import numpy as np import math import random import time class ContinuousFunction: """ This is a class that represents the function we are trying to optimize. eval_function is a lambda function representing the function we're optimizing, and bounds represents the min and max allowed value in each dimension (so, this assumes a rectangular boundary) """ def __init__(self, eval_function, bounds): self.eval_function = eval_function self.bounds = bounds """ Return a random point within the bounds """ def random(self): pt = [] for (_min, _max) in self.bounds: diff = _max - _min pt.append(random.random() * diff + _min) return pt """ Check if a point is within the bounds """ def in_bounds(self, point): return all( p >= bnd[0] and p <= bnd[1] for (p, bnd) in zip(point, self.bounds) ) """ Plug the point into the function to find its value at this point """ def score(self, point): return self.eval_function(*point) class Particle: """ This class represents a single particle that will move around the search space """ def __init__(self, initial_position, dimension, max_speed): """ initial_position = starting position of the particle dimension = how many dimensions the problem has, e.g. f(x,y,z) is 3 max_speed = the largest speed the particle is allowed to have """ self.dimension = dimension # this uses a big library called "numpy" that does fast vector # computation self.position = np.array(initial_position) # velocity starts at 0 self.velocity = np.array([0]*self.dimension) # will keep track of best solution seen so far self.best_seen = None self.max_speed = max_speed def advance_position(self, global_best, alpha, beta, gamma): """ Moves the particle one step forward by calculating a new velocity depending on past velocity, best solution THIS particle has ever seen, and best solution ANY particle has ever seen. """ # generate two random vectors r1 and r2 r1 = np.random.random_sample((self.dimension,)) r2 = np.random.random_sample((self.dimension,)) # compute next velocity according to the formula from class self.velocity = ( alpha * self.velocity + beta * r1 * (self.best_seen - self.position) + gamma * r2 * (global_best - self.position) ) # compute the speed of the particle # if v is the velocity vector, then speed is its 2-norm: # sqrt(v[0]^2 + v[1]^2 + ...) speed = np.linalg.norm(self.velocity) # if the particle is going to fast, scale it's velocity vector # down so it's new speed is exactly self.max_speed if speed > self.max_speed: self.velocity *= self.max_speed/speed # move the particle according to its new velocity self.position += self.velocity # note that this code makes no effort to stop the particle from going # out of bounds, which is bad! def set_best(self, best, score): """ set "best" with score "score" as the best seen so far """ self.best_seen = best.copy() self.best_seen_score = score class PSOManager: """ This is a class that handles the main PSO routine, including managing all of the particles """ def __init__( self, space, dimension, N, max_speed=0.01, alpha=0.9, beta=1, gamma=1, maximization=True ): """ space = object representing the problem in this case a ContinuousFunction objec dimension = dimension of the problem N = number of particles maximization = true if maximizing, false if minimizing """ self.space = space self.dimension = dimension self.N = N self.maximization = maximization # Create a list of random particles self.particles = [ Particle(space.random(), dimension, max_speed) for _ in range(N) ] self.alpha = alpha self.beta = beta self.gamma = gamma self.global_best_sol = None self.global_best_score = None self.set_global_best() def is_better(self, new_score, old_score): """ return if new_score is better than old_score (depends if we're maximizing or minimizing) """ if self.maximization: return new_score > old_score return new_score < old_score def set_global_best(self): """ Loops over each particle and computes its score. If it's the best that particle has even seen, tells the particle to store it. If it's the best any particle has even seen, stores it for itself """ for particle in self.particles: score = self.space.score(particle.position) if particle.best_seen is None or self.is_better(score, particle.best_seen_score): particle.set_best(particle.position, score) if ( self.global_best_score is None or self.is_better(score, self.global_best_score) ): self.global_best_score = score self.global_best_sol = particle.position.copy() def advance(self): """ Moves all the particles, then checks for new best scores """ for particle in self.particles: particle.advance_position(self.global_best_sol, self.alpha, self.beta, self.gamma) self.set_global_best() eval_function = lambda x,y: -(math.sin(x)*math.sin(x**2/math.pi)**20 + math.sin(y)*math.sin(2*y**2/math.pi)**20) bounds = [[0, 4], [0, 4]] cns_func = ContinuousFunction(eval_function, bounds) PSO = PSOManager(cns_func, 2, 100, alpha=0.9, beta=1, gamma=1, max_speed=0.1, maximization=True) gen = 0 while True: gen += 1 PSO.advance() print(f"Generation {gen}: Best score = {PSO.global_best_score} for sol = {PSO.global_best_sol}")