Source code for desdeo_mcdm.interactive.ReferencePointMethod

from typing import Callable, Dict, List, Optional, Union

import numpy as np
from desdeo_mcdm.interactive.InteractiveMethod import InteractiveMethod
from desdeo_problem.problem import DiscreteDataProblem, MOProblem, VectorObjective, _ScalarObjective, variable_builder
from desdeo_tools.interaction.request import BaseRequest
from desdeo_tools.scalarization import ReferencePointASF
from desdeo_tools.scalarization.Scalarizer import DiscreteScalarizer, Scalarizer
from desdeo_tools.solver.ScalarSolver import DiscreteMinimizer, ScalarMethod, ScalarMinimizer
from scipy.optimize import differential_evolution

"""
Reference Point Method (RPM)
"""


[docs]class RPMException(Exception): """ Raised when an exception related to Reference Point Method (RFM) is encountered. """ pass
[docs]def validate_reference_point(ref_point: np.ndarray, ideal: np.ndarray, nadir: np.ndarray) -> None: """ Validate Decion maker's reference point. Args: ref_point (np.ndarray): Reference point. ideal (np.ndarray): Ideal vector. nadir (np.ndarray): Nadir vector. Raises: RPMException: In case reference point is invalid. """ if not ideal.shape == ref_point.shape: raise RPMException("The dimension of the ideal and reference point do not match.") if all(np.less(nadir, ref_point)): raise RPMException("Reference point cannot be worse than nadir point.") # or can it?
[docs]class RPMInitialRequest(BaseRequest): """ A request class to handle the Decision Maker's initial preferences for the first iteration round. """ def __init__(self, ideal: np.ndarray, nadir: np.ndarray) -> None: """ Initialize with ideal and nadir vectors. Args: ideal (np.ndarray): Ideal vector. nadir (np.ndarray): Nadir vector. """ self.n_objectives = len(ideal) self._ideal = ideal self._nadir = nadir msg = "Please specify a reference point as 'reference_point'." content = { "message": msg, "ideal": ideal, "nadir": nadir, } super().__init__("reference_point_preference", "required", content=content) @classmethod
[docs] def init_with_method(cls, method: InteractiveMethod): """ Initialize request with given instance of ReferencePointMethod. Args: method (ReferencePointMethod): Instance of ReferencePointMethod-class. Returns: RPMInitialRequest: Initial request. """ return cls(method._ideal, method._nadir)
@BaseRequest.response.setter def response(self, response: Dict) -> None: """ Set the Decision Maker's response information for initial request. Args: response (Dict): The Decision Maker's response. Raises: RPMException: In case reference point is missing. """ if "reference_point" not in response: msg = "Reference point missing. Please specify a reference point as 'reference_point." raise RPMException(msg) else: validate_reference_point(response["reference_point"], self._ideal, self._nadir) self._response = response
[docs]class RPMRequest(BaseRequest): """ A request class to handle the Decision Maker's preferences after the first iteration round. Args: f_current (np.ndarray): Current solution. f_additionals (np.ndarray): Additional solutions. ideal (np.ndarray): Idea vector. nadir (np.ndarray): Nadir vector. """ def __init__(self, f_current: np.ndarray, f_additionals: np.ndarray, ideal: np.ndarray, nadir: np.ndarray,) -> None: self._f_current = f_current self._f_additionals = f_additionals self._ideal = ideal self._nadir = nadir msg = ( "In case you are satisfied with one of the solutions, please state: " "1. 'satisfied' as 'True'." "2. 'solution_index' as the index number of the solution you choose, so that first solution has index " "number of 0, second 1 and so on." "Otherwise, please state 'satisfied' as 'False and specify a new reference point as 'reference_point'." ) content = {"message": msg, "current_solution": f_current, "additional_solutions": f_additionals} super().__init__("reference_point_preference", "required", content=content) @BaseRequest.response.setter def response(self, response: Dict) -> None: """ Set the Decision Maker's response information for request. Args: response (Dict): The Decision Maker's response. Raises: RPMException: In case response is invalid. """ if "satisfied" in response and response["satisfied"]: if not response["solution_index"] and not response["solution_index"] == 0: raise RPMException( "If you are satisfied with one of the solutions, please specify the index of the " "solution as 'solution_index'." ) if not (0 <= response["solution_index"] <= self._f_current.shape[0]): msg = ( "Solution index must range from 0 to number of objectives - 1 '{}'. " "Given solution index: '{}." ).format(self._f_current.shape[0], response["solution_index"]) raise RPMException(msg) else: if "reference_point" not in response: raise RPMException("New reference point information missing. Please specify it as 'reference_point'.") else: validate_reference_point(response["reference_point"], self._ideal, self._nadir) self._response = response
[docs]class RPMStopRequest(BaseRequest): """ A request class to handle termination. """ def __init__(self, x_h: np.ndarray, f_h: np.ndarray) -> None: """ Initialize termination request with final solution and objective vector. Args: x_h (np.ndarray): Solution (decision variables). f_h (np.ndarray): Objective vector. """ msg = "Final solution found." content = {"message": msg, "solution": x_h, "objective_vector": f_h} super().__init__("print", "no_interaction", content=content)
[docs]class ReferencePointMethod(InteractiveMethod): """ Implements the Reference Point Method as presented in |Wierzbicki_1982|. In the Reference Point Method, the Decision Maker (DM) specifies **desirable aspiration levels** for objective functions. Vectors formed of these aspiration levels are then used to derive scalarizing functions having minimal values at weakly, properly or Pareto optimal solutions. It is important that reference points are intuitive and easy for the DM to specify, their consistency is not an essential requirement. Before the solution process starts, some information is given to the DM about the problem. If possible, the ideal objective vector and the (approximated) nadir objective vector are presented. At each iteration, the DM is asked to give desired aspiration levels for the objective functions. Using this information to formulate a **reference point**, achievement function is minimized and a (weakly, properly or) Pareto optimal solution is obtained. This solution is then presented to the DM. In addition, k other (weakly, properly or) Pareto optimal solutions are calculated using **perturbed reference points**, where k is the number of objectives in the problem. The alternative solutions are also presented to the DM. If (s)he finds any of the k + 1 solutions satisfactory, the solution process is ended. Otherwise, the DM is asked to present a new reference point and the iteration described above is repeated. The idea in perturbed reference points is that the DM gets **better understanding** of the possible solutions around the current solution. If the reference point is far from the Pareto optimal set, the DM gets a **wider** description of the Pareto optimal set and if the reference point is near the Pareto optimal set, then a **finer** description of the Pareto optimal set is given. In this method, the DM has to specify aspiration levels and compare objective vectors. The DM is **free to change** her/his mind during the process and can direct the solution process without being forced to understand complicated concepts and their meaning. On the other hand, the method does not necessarily help the DM to find more satisfactory solutions. Args: problem (MOProblem): Problem to be solved. ideal (np.ndarray): The ideal objective vector of the problem. nadir (np.ndarray): The nadir objective vector of the problem. This may also be the "worst" objective vector provided by the Decision Maker if the approximation of Nadir vector is not applicable or if the Decision Maker wishes to provide even worse objective vector than what the approximated Nadir vector is. epsilon (float): A small number used in calculating the utopian point. epsilon (float): A small number used in calculating the utopian point. Default value is 1e-6. objective_names (Optional[List[str]], optional): Names of the objectives. The length of the list must match the number of elements in ideal vector. minimize (Optional[List[int]], optional): Multipliers for each objective. '-1' indicates maximization and '1' minimization. Defaults to all objective values being minimized. Raises: RPMException: Dimensions of ideal, nadir, objective_names, and minimize-list do not match. """ def __init__( self, problem: Union[MOProblem, DiscreteDataProblem], ideal: np.ndarray, nadir: np.ndarray, epsilon: float = 1e-6, objective_names: Optional[List[str]] = None, minimize: Optional[List[int]] = None, ): if not ideal.shape == nadir.shape: raise RPMException("The dimensions of the ideal and nadir point do not match.") if objective_names: if not len(objective_names) == ideal.shape[0]: raise RPMException( "The supplied objective names must have a leangth equal to " "the number of objectives." ) self._objective_names = objective_names else: self._objective_names = [f"f{i + 1}" for i in range(ideal.shape[0])] if minimize: if not len(objective_names) == ideal.shape[0]: raise RPMException("The minimize list must have " "as many elements as there are objectives.") self._minimize = minimize else: self._minimize = [1 for _ in range(ideal.shape[0])] self._ideal = ideal self._nadir = nadir self._utopian = ideal - epsilon self._n_objectives = self._ideal.shape[0] # current iteration step number self._h = 1 # solutions in decision and objective space, distances and referation points for each iteration self._xs = [None] * 10 self._fs = [None] * 10 self._ds = [None] * 10 self._qs = [None] * 10 # perturbed reference points self._pqs = [None] * 10 # additional solutions self._axs = [None] * 10 self._afs = [None] * 10 # current reference point self._q: Union[None, np.ndarray] = None # weighting vector for achievement function self._w: np.ndarray = [] self._problem = problem if isinstance(problem, MOProblem): # initialize method with MOProblem self._objectives: Callable = lambda x: self._problem.evaluate(x).objectives self._variable_bounds: Union[np.ndarray, None] = problem.get_variable_bounds() self._variable_vectors = None self._constraints: Optional[Callable] = lambda x: self._problem.evaluate(x).constraints # evolutionary method for minimizing self._method_de: ScalarMethod = ScalarMethod( lambda x, _, **y: differential_evolution(x, **y), method_args={"disp": False, "polish": False, "tol": 0.000001, "popsize": 10, "maxiter": 50000}, use_scipy=True, ) else: # Initialize the method with DiscreteData self._objectives = problem.objectives self._variable_bounds = None # TODO: check me self._variable_vectors = problem.decision_variables self._constraints = None # TODO: check me self._method_de = "discrete"
[docs] def start(self) -> RPMInitialRequest: """ Start the solution process with initializing the first request. Returns: RPMInitialRequest: Initial request. """ return RPMInitialRequest.init_with_method(self)
[docs] def iterate( self, request: Union[RPMInitialRequest, RPMRequest, RPMStopRequest] ) -> Union[RPMRequest, RPMStopRequest]: """ Perform the next logical iteration step based on the given request type. Args: request (Union[RPMInitialRequest, RPMRequest]): Either initial or intermediate request. Returns: Union[RPMRequest, RPMStopRequest]: A new request with content depending on the Decision Maker's preferences. """ if type(request).__name__ == RPMInitialRequest.__name__: return self.handle_initial_request(request) elif type(request).__name__ == RPMRequest.__name__: return self.handle_request(request) else: # if stop request, do nothing return request
[docs] def handle_initial_request(self, request: RPMInitialRequest) -> RPMRequest: """ Handles the initial request by parsing the response appropriately. Args: request (RPMInitialRequest): Initial request including the Decision Maker's initial preferences. Returns: RPMRequest: New request with updated solution process information. """ # set initial referation point self._qs[self._h] = request.response["reference_point"] self._q = self._qs[self._h] # set weighting vector self._w = self._q / (self._utopian - self._nadir) # set initial values for decision variables if isinstance(self._problem, MOProblem): x0 = self._problem.get_variable_upper_bounds() / 2 else: # discrete case x0 = self._variable_vectors[0] # this is ignored in the discrete case # solve the ASF-problem result = self.solve_asf( self._q, x0, self._w, self._nadir, self._utopian, self._objectives, self._variable_vectors, self._variable_bounds, method=self._method_de, ) # update current solution and objective function values if isinstance(self._problem, MOProblem): self._xs[self._h] = result["x"] self._fs[self._h] = self._objectives(self._xs[self._h])[0] else: # discrete case self._xs[self._h] = self._variable_vectors[result["x"]] self._fs[self._h] = self._objectives[result["x"]] # calculate perturbed reference points self._pqs[self._h] = self.calculate_prp(self._q, self._fs[self._h]) # calculate n other solutions with perturbed reference points results_additional = [ self.solve_asf( pqi, x0, self._w, self._nadir, self._utopian, self._objectives, self._variable_vectors, self._variable_bounds, self._method_de, ) for pqi in self._pqs[self._h] ] # store results into arrays if isinstance(self._problem, MOProblem): self._axs[self._h] = [result["x"] for result in results_additional] self._afs[self._h] = [self._objectives(xs_i)[0] for xs_i in self._axs[self._h]] else: self._axs[self._h] = [self._variable_vectors[result["x"]] for result in results_additional] self._afs[self._h] = [self._objectives[result["x"]] for result in results_additional] # return the information from iteration round to be shown to the DM. return RPMRequest(self._fs[self._h], self._afs[self._h], self._ideal, self._nadir)
[docs] def handle_request(self, request: RPMRequest) -> Union[RPMRequest, RPMStopRequest]: """ Handle the Decision Maker's requests after the first iteration round, so-called **intermediate requests.** Args: request (RPMRequest): Intermediate request including the Decision Maker's response. Returns: Union[RPMRequest, RPMStopRequest]: In case last iteration, request to stop the solution process. Otherwise, new request with updated solution process information. """ resp: dict = request.response # end solution finding process if "satisfied" in resp and resp["satisfied"]: if resp["solution_index"] == 0: # "original" solution return RPMStopRequest(self._xs[self._h], self._fs[self._h]) else: # additional solution return RPMStopRequest( self._axs[self._h][resp["solution_index"] - 1], self._afs[self._h][resp["solution_index"] - 1] ) # continue with new reference point given by the DM else: self._h += 1 if len(self._qs) - self._h <= 2: # "expand" space on arrays extra_space = [None] * 10 self._qs = np.array(np.concatenate((self._qs, extra_space), axis=None), dtype=object) self._xs = np.array(np.concatenate((self._xs, extra_space), axis=None), dtype=object) self._fs = np.array(np.concatenate((self._fs, extra_space), axis=None), dtype=object) self._pqs = np.array(np.concatenate((self._pqs, extra_space), axis=None), dtype=object) self._axs = np.array(np.concatenate((self._axs, extra_space), axis=None), dtype=object) self._afs = np.array(np.concatenate((self._afs, extra_space), axis=None), dtype=object) # set new reference point self._qs[self._h] = resp["reference_point"] self._q = self._qs[self._h] # set weighting vector self._w = self._q / (self._utopian - self._nadir) # set initial values for decision variables if isinstance(self._problem, MOProblem): x0 = self._problem.get_variable_upper_bounds() / 2 else: # discrete case x0 = self._variable_vectors[0] # this is ignored in the discrete case # solve the ASF-problem result = self.solve_asf( self._q, x0, self._w, self._nadir, self._utopian, self._objectives, self._variable_vectors, self._variable_bounds, method=self._method_de, ) # update current solution and objective function values if isinstance(self._problem, MOProblem): self._xs[self._h] = result["x"] self._fs[self._h] = self._objectives(self._xs[self._h])[0] else: # discrete case self._xs[self._h] = self._variable_vectors[result["x"]] self._fs[self._h] = self._objectives[result["x"]] # calculate perturbed reference points self._pqs[self._h] = self.calculate_prp(self._q, self._fs[self._h]) # calculate n other solutions with perturbed reference points results_additional = [ self.solve_asf( pqi, x0, self._w, self._nadir, self._utopian, self._objectives, self._variable_vectors, self._variable_bounds, self._method_de, ) for pqi in self._pqs[self._h] ] # store results into arrays if isinstance(self._problem, MOProblem): self._axs[self._h] = [result["x"] for result in results_additional] self._afs[self._h] = [self._objectives(xs_i)[0] for xs_i in self._axs[self._h]] else: self._axs[self._h] = [self._variable_vectors[result["x"]] for result in results_additional] self._afs[self._h] = [self._objectives[result["x"]] for result in results_additional] # return the information from iteration round to be shown to the DM. return RPMRequest(self._fs[self._h], self._afs[self._h], self._ideal, self._nadir)
[docs] def calculate_prp(self, ref_point: np.ndarray, f_current: np.ndarray) -> np.ndarray: """ Calculate perturbed reference points. Args: ref_point (np.ndarray): Current reference point. f_current (np.ndarray): Current solution. Returns: np.ndarray: Perturbed reference points. """ # distance d = np.linalg.norm(np.atleast_2d(ref_point - f_current)) # unit vectors ei = np.array([np.zeros(len(ref_point))]) es = np.repeat(ei, len(ref_point), axis=0) for i, j in enumerate(es): for ind, _ in enumerate(j): if ind == i: j[ind] = 1 return ref_point + (d * es)
[docs] def solve_asf( self, ref_point: np.ndarray, x0: np.ndarray, preferential_factors: np.ndarray, nadir: np.ndarray, utopian: np.ndarray, objectives: Callable, variable_vectors: Optional[np.ndarray] = None, variable_bounds: Optional[np.ndarray] = None, method: Union[ScalarMethod, str, None] = None, ) -> dict: """ Solve Achievement scalarizing function. Args: ref_point (np.ndarray): Reference point. x0 (np.ndarray): Initial values for decision variables. preferential_factors (np.ndarray): Preferential factors on how much would the Decision Maker wish to improve the values of each objective function. nadir (np.ndarray): Nadir vector. utopian (np.ndarray): Utopian vector. objectives (np.ndarray): The objective function values for each input vector. variable_bounds (Optional[np.ndarray)]: Lower and upper bounds of each variable as a 2D numpy array. If undefined variables, None instead. method (Union[ScalarMethod, str, None]): The optimization method the scalarizer should be minimized with. Returns: dict: A dictionary with at least the following entries: 'x' indicating the optimal variables found, 'fun' the optimal value of the optimized function, and 'success' a boolean indicating whether the optimization was conducted successfully. """ if method != "discrete": # non discrete case if variable_bounds is None: # set all bounds as [-inf, inf] variable_bounds = np.array([[-np.inf, np.inf]] * x0.shape[0]) # scalarize problem using reference point asf = ReferencePointASF(preferential_factors, nadir, utopian, rho=1e-4) asf_scalarizer = Scalarizer( evaluator=objectives, scalarizer=asf, scalarizer_args={"reference_point": ref_point} ) # minimize minimizer = ScalarMinimizer(asf_scalarizer, variable_bounds, method=method) return minimizer.minimize(x0) else: # discrete case # scalarize problem using reference point asf = ReferencePointASF(preferential_factors, nadir, utopian, rho=1e-4) asf_scalarizer = DiscreteScalarizer(asf, scalarizer_args={"reference_point": ref_point}) # minimize the discrete problem minimizer = DiscreteMinimizer(asf_scalarizer) res = minimizer.minimize(objectives) return res
# testing the method if __name__ == "__main__": print("Reference point method") # Objectives
[docs] def f1(xs): xs = np.atleast_2d(xs) return -4.07 - 2.27 * xs[:, 0]
def f2(xs): xs = np.atleast_2d(xs) return ( -2.60 - 0.03 * xs[:, 0] - 0.02 * xs[:, 1] - (0.01 / (1.39 - xs[:, 0] ** 2)) - (0.30 / (1.39 - xs[:, 1] ** 2)) ) def f3(xs): xs = np.atleast_2d(xs) return -8.21 + (0.71 / (1.09 - xs[:, 0] ** 2)) def f4(xs): xs = np.atleast_2d(xs) return -0.96 + (0.96 / (1.09 - xs[:, 1] ** 2)) def objectives(xs): return np.stack((f1(xs), f2(xs), f3(xs), f4(xs))).T obj1 = _ScalarObjective("obj1", f1) obj2 = _ScalarObjective("obj2", f2) obj3 = _ScalarObjective("obj3", f3) obj4 = _ScalarObjective("obj4", f4) objkaikki = VectorObjective("obj", objectives) # variables var_names = ["x1", "x2"] # Make sure that the variable names are meaningful to you. initial_values = np.array([0.5, 0.5]) lower_bounds = [0.3, 0.3] upper_bounds = [1.0, 1.0] bounds = np.stack((lower_bounds, upper_bounds)) variables = variable_builder(var_names, initial_values, lower_bounds, upper_bounds) # problem prob = MOProblem(objectives=[obj1, obj2, obj3, obj4], variables=variables) # objectives "seperately" # solved in Nautilus.py ideal = np.array([-6.34, -3.44487179, -7.5, 0]) nadir = np.array([-4.751, -2.86054116, -0.32111111, 9.70666666]) # start solving method = ReferencePointMethod(problem=prob, ideal=ideal, nadir=nadir) # Pareto optimal solution: [-6.30, -3.26, -2.60, 3.63] print("Let's start solving\n") req = method.start() rp = np.array([-5.0, -3.0, -3.0, 5.0]) req.response = { "reference_point": rp, } print(f"Step number: {method._h}") # 1 - continue with same preferences req = method.iterate(req) step = 1 print("\nStep number: ", method._h) print("Reference point: ", rp) print("Pareto optimal solution: ", req.content["current_solution"]) print("Additional solutions: ", req.content["additional_solutions"]) while step < 4: step += 1 rp = np.array([np.random.uniform(i, n) for i, n in zip(ideal, nadir)]) req.response = {"reference_point": rp, "satisfied": False} req = method.iterate(req) print("\nStep number: ", method._h) print("Reference point: ", rp) print("Pareto optimal solution: ", req.content["current_solution"]) print("Additional solutions: ", req.content["additional_solutions"]) req.response = {"satisfied": True, "solution_index": 0} req = method.iterate(req) print(req.content)