Source code for desdeo_mcdm.interactive.NautilusNavigator

from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from desdeo_mcdm.interactive.InteractiveMethod import InteractiveMethod
from desdeo_tools.interaction.request import BaseRequest, SimplePlotRequest
from desdeo_tools.scalarization.ASF import PointMethodASF
from desdeo_tools.scalarization.Scalarizer import DiscreteScalarizer
from desdeo_tools.solver.ScalarSolver import DiscreteMinimizer


[docs]class NautilusNavigatorStopRequest(BaseRequest): """Request to stop navigation and return the solution found (or the currently reachable solutions, if stopped before the navigation ends. """ def __init__( self, reachable_idx: List[int], pareto_front: np.ndarray, decision_variables: Optional[np.ndarray] = None, ): message = ( "These are the current objectives vectors (and decision vectors, if supplied)" "reachable from the current navigation point." ) if decision_variables is not None: reachable_decision = decision_variables[reachable_idx] else: reachable_decision = None content = { "message": message, "objective_vectors": pareto_front[reachable_idx], "decision_vectors": reachable_decision, "reachable_idx": reachable_idx, } super().__init__("classification_preference", "no_interaction", content=content)
[docs]class NautilusNavigatorRequest(BaseRequest): """Request to handle interactions with NAUTILUS Navigator. See the NautilusNavigator class for further details. """ def __init__( self, ideal: np.ndarray, nadir: np.ndarray, reachable_lb: np.ndarray, reachable_ub: np.ndarray, user_bounds: List[float], reachable_idx: List[int], step_number: int, steps_remaining: int, distance: float, allowed_speeds: List[int], current_speed: int, navigation_point: np.ndarray, ): msg = ( # TODO: Be more specific... "Please supply aspirations levels for each objective between " "the upper and lower bounds as `reference_point`. Specify a " "speed between 1-5 as `speed`. If going to a previous step is " "desired, please set `go_to_previous` to True, otherwise it should " "be False. " "Bounds for one or more objectives may also be specified as 'user_bounds'; when navigating," "the value of the objectives present in the navigation points will not exceed the values" "specified in 'user_bounds'." "Lastly, if stopping is desired, `stop` should be True, " "otherwise it should be set to False." ) content = { "message": msg, "ideal": ideal, "nadir": nadir, "reachable_lb": reachable_lb, "reachable_ub": reachable_ub, "user_bounds": user_bounds, "reachable_idx": reachable_idx, "step_number": step_number, "steps_remaining": steps_remaining, "distance": distance, "allowed_speeds": allowed_speeds, "current_speed": current_speed, "navigation_point": navigation_point, } super().__init__("reference_point_preference", "required", content=content) @classmethod
[docs] def init_with_method(cls, method): return cls( method._ideal, method._nadir, method._reachable_lb, method._reachable_ub, method._user_bounds, method._reachable_idx, method._step_number, method._steps_remaining, method._distance, method._allowed_speeds, method._current_speed, method._navigation_point, )
[docs] def validator(self, response: Dict) -> None: if "reference_point" not in response: raise NautilusNavigatorException("'reference_point' entry missing.") if "speed" not in response: raise NautilusNavigatorException("'speed' entry missing.") if "go_to_previous" not in response: raise NautilusNavigatorException("'go_to_previous' entry missing.") if "user_bounds" not in response: raise NautilusNavigatorException("'user_bounds' entry missing") if "stop" not in response: raise NautilusNavigatorException("'stop' entry missing.") ref_point = response["reference_point"] try: if np.any(ref_point < self._content["ideal"]) or np.any( ref_point > self._content["nadir"] ): raise NautilusNavigatorException( f"The given reference point {ref_point} " "must be between the ranges imposed by the ideal and nadir points." ) except Exception as e: raise NautilusNavigatorException( f"An exception rose when validating the given reference point {ref_point}.\n" f"Previous exception: {type(e)}: {str(e)}." ) speed = response["speed"] try: if int(speed) not in self._content["allowed_speeds"]: raise NautilusNavigatorException(f"Invalid speed: {speed}.") except Exception as e: raise NautilusNavigatorException( f"An exception rose when validating the given speed {speed}.\n" f"Previous exception: {type(e)}: {str(e)}." ) if not type(response["go_to_previous"]) == bool: raise ( f"Non boolean value {response['go_to_previous']} " f"found for 'go_to_previous' when validating the response." ) # This ensures that Nones are converted to np.nan user_bounds = np.array(response["user_bounds"], dtype=float) try: if len(user_bounds) != self._content["ideal"].size: raise NautilusNavigatorException( f"The given user bounds '{user_bounds}' has mismatching dimensions compared " f"to the ideal point '{self._content['ideal']}'." ) """ if np.any(user_bounds < self._content["reachable_lb"]) or np.any( user_bounds > self._content["reachable_ub"] ): raise NautilusNavigatorException( f"The given user bounds '{user_bounds}' has one or more elements that are outside the region " f"bounded by the reachable upper and lower bounds of the current navigation points. " f"Current lower bounds: {self._content['reachable_lb']} " f"Current upper bounds: {self._content['reachable_ub']}." ) """ except Exception as e: raise NautilusNavigatorException( f"An exception rose when validating the given user bounds " f"{user_bounds}.\n" f"Previous exception: {type(e)}: {str(e)}." ) if not type(response["stop"]) == bool: raise ( f"Non boolean value {response['stop']} " f"found for 'go_to_previous' when validating the response." )
@BaseRequest.response.setter def response(self, response: Dict): self.validator(response) self._response = response
[docs]class NautilusNavigatorException(Exception): """Raised when an exception related to NAUTILUS Navigator is encountered. """ pass
[docs]class NautilusNavigator(InteractiveMethod): """Implementations of the NAUTILUS Navigator algorithm. Args: pareto_front (np.ndarray): A two dimensional numpy array representing a Pareto front with objective vectors on each of its rows. ideal (np.ndarray): The ideal objective vector of the problem being represented by the Pareto front. nadir (np.ndarray): The nadir objective vector of the problem being represented by the Pareto front. decision_variables (Optional[np.ndarray]): Two dimensinoal numpy array of decision variables that can be optionally supplied. The i'th vector in decision_variables should result in the i'th objective vector in pareto_front. Defaults to None. Raises: NautilusNavigatorException: One or more dimension mismatches are encountered among the supplies arguments. """ def __init__( self, pareto_front: np.ndarray, ideal: np.ndarray, nadir: np.ndarray, decision_variables: Optional[np.ndarray] = None, ): if not pareto_front.ndim == 2: raise NautilusNavigatorException( "The supplied Pareto front should be a two dimensional array. Found " f" number of dimensions {pareto_front.ndim}." ) if decision_variables is not None: # check correct dimensino of decision_variables, if supplied. if decision_variables.shape[0] != pareto_front.shape[0]: raise NautilusNavigatorException( "The supplied decision variables must be as many as the objective " "vectors in the supplied Pareto front. Dimension of variables " f"{decision_variables.shape[0]}; dimension of Pareto front " f"{pareto_front.shape[0]}." ) self._decision_variables = decision_variables if not ideal.shape[0] == pareto_front.shape[1]: raise NautilusNavigatorException( "The Pareto front must consist of objective vectors with the " "same number of objectives as defined in the ideal and nadir " "points." ) if not ideal.shape == nadir.shape: raise NautilusNavigatorException( "The dimensions of the ideal and nadir point do not match." ) self._ideal = ideal self._nadir = nadir # in objective space! self._pareto_front = pareto_front # bounds of the reachable region self._reachable_ub = self._nadir self._reachable_lb = self._ideal # user given bounds, defaults to none self._user_bounds = np.repeat(np.nan, self._ideal.size) # currently reachable solution as a list of indices of the Pareto front self._reachable_idx = list(range(0, self._pareto_front.shape[0])) # current iteration step number self._step_number = 1 # iterations left self._steps_remaining = 100 # L2 distance to the supplied Pareto front self._distance = 0 self._allowed_speeds = [1, 2, 3, 4, 5] self._current_speed = None self._reference_point = None self._navigation_point = self._nadir self._projection_index = None
[docs] def start(self) -> NautilusNavigatorRequest: """Returns the first Request object to begin iterating. Returns: NautilusNavigatorRequest: The Request. """ return NautilusNavigatorRequest.init_with_method(self)
[docs] def iterate(self, request: NautilusNavigatorRequest) -> NautilusNavigatorRequest: """Perform the next logical step based on the response in the Request. """ reqs = self.handle_request(request) return reqs
[docs] def handle_request( self, request: NautilusNavigatorRequest ) -> Union[NautilusNavigatorRequest, NautilusNavigatorStopRequest]: """Handle the Request and its contents. Args: request (NautilusNavigatorRequest): A Request with a defined response. Returns: NautilusNavigatorRequest: Some of the contents of the response are invalid. """ preference_point = request.response["reference_point"] speed = request.response["speed"] go_to_previous = request.response["go_to_previous"] # ensure Nones are converted to NaN user_bounds = np.array(request.response["user_bounds"], dtype=float) stop = request.response["stop"] reachable_idx = request.content["reachable_idx"] if stop: return NautilusNavigatorStopRequest( reachable_idx, self._pareto_front, self._decision_variables ) if go_to_previous: step_number = request.content["step_number"] nav_point = request.content["navigation_point"] lower_bounds = request.content["reachable_lb"] upper_bounds = request.content["reachable_ub"] distance = request.content["distance"] steps_remaining = request.content["steps_remaining"] return self.update( preference_point, speed, go_to_previous, stop, step_number, nav_point, lower_bounds, upper_bounds, user_bounds, reachable_idx, distance, steps_remaining, ) else: return self.update( preference_point, speed, go_to_previous, stop, user_bounds=user_bounds )
[docs] def update( self, ref_point: np.ndarray, speed: int, go_to_previous: bool, stop: bool, step_number: Optional[int] = None, nav_point: Optional[np.ndarray] = None, lower_bounds: Optional[np.ndarray] = None, upper_bounds: Optional[np.ndarray] = None, user_bounds: Optional[np.ndarray] = None, reachable_idx: Optional[List[int]] = None, distance: Optional[float] = None, steps_remaining: Optional[int] = None, ) -> Optional[NautilusNavigatorRequest]: """Update the internal state of self. Args: ref_point (np.ndarray): A reference point given by a decision maker. speed (int): An integer value between 1-5 indicating the navigation speed. go_to_previous (bool): If True, the parameters indicate the state of a previous state, and the request is handled accordingly. stop (bool): If the navigation should stop. If True, returns a request with self's current state. step_number (Optional[int], optional): Current step number, or previous step number if go_to_previous is True. Defaults to None. nav_point (Optional[np.ndarray], optional): The current navigation point. Relevant if go_to_previous is True. Defaults to None. lower_bounds (Optional[np.ndarray], optional): Lower bounds of the reachable objective vector values. Relevant if go_to_previous is True. Defaults to None. upper_bounds (Optional[np.ndarray], optional): Upper bounds of the reachable objective vector values. Relevant if go_to_previous is True. Defaults to None. user_bounds (Optional[np.ndarray], optional): The user given bounds for each objective. The reachable lower limit with attempt to not exceed the given bounds for each objective value. reachable_idx (Optional[List[int]], optional): Indices of the reachable Pareto optimal solutions. Relevant if go_to_previous is True. Defaults to None. distance (Optional[float], optional): Distance to the Pareto optimal front. Relevant if go_to_previous is True. Defaults to None. steps_remaining (Optional[int], optional): Remaining steps in the navigation. Relevant if go_to_previous is True. Defaults to None. Returns: NautilusNavigatorRequest: Some of the given parameters are erroneous. """ # if stop navigation, return request with current state if stop: return NautilusNavigatorRequest.init_with_method(self) # go to a previous state elif go_to_previous: self._step_number = step_number self._navigation_point = nav_point self._reachable_lb = lower_bounds self._reachable_ub = upper_bounds self._user_bounds = user_bounds self._reachable_idx = reachable_idx self._distance = distance self._steps_remaining = steps_remaining return NautilusNavigatorRequest.init_with_method(self) # compute a new navigation point closer to the Pareto front and the # bounds of the reachable Pareto optimal region. elif self._step_number == 1 or not np.allclose( ref_point, self._reference_point ): if self._step_number == 1: self._current_speed = speed proj_i = self.solve_nautilus_asf_problem( self._pareto_front, self._reachable_idx, ref_point, self._ideal, self._nadir, self._user_bounds, ) self._reference_point = ref_point self._projection_index = proj_i new_nav = self.calculate_navigation_point( self._pareto_front[self._projection_index], self._navigation_point, self._steps_remaining, ) self._navigation_point = new_nav self._user_bounds = user_bounds new_lb, new_ub = self.calculate_bounds( self._pareto_front[self._reachable_idx], self._navigation_point, self._user_bounds, self._reachable_lb, self._reachable_ub, ) self._reachable_lb = new_lb self._reachable_ub = new_ub new_dist = self.calculate_distance( self._navigation_point, self._pareto_front[self._projection_index], self._nadir, ) self._distance = new_dist new_reachable = self.calculate_reachable_point_indices( self._pareto_front, self._reachable_lb, self._reachable_ub, ) self._reachable_idx = new_reachable # If stop, do not update steps if self._steps_remaining == 1: # stop return NautilusNavigatorRequest.init_with_method(self) self._step_number += 1 self._steps_remaining -= 1 return NautilusNavigatorRequest.init_with_method(self)
[docs] def calculate_reachable_point_indices( self, pareto_front: np.ndarray, lower_bounds: np.ndarray, upper_bounds: np.ndarray, ) -> List[int]: """Calculate the indices of the reachable Pareto optimal solutions based on lower and upper bounds. Returns: List[int]: List of the indices of the reachable solutions. """ low_idx = np.all(pareto_front >= lower_bounds, axis=1) up_idx = np.all(pareto_front <= upper_bounds, axis=1) reachable_idx = np.argwhere(low_idx & up_idx).squeeze() return reachable_idx
@staticmethod
[docs] def solve_nautilus_asf_problem( pareto_f: np.ndarray, subset_indices: List[int], ref_point: np.ndarray, ideal: np.ndarray, nadir: np.ndarray, user_bounds: np.ndarray, ) -> int: """Forms and solves the achievement scalarizing function to find the closest point on the Pareto optimal front to the given reference point. Args: pareto_f (np.ndarray): The whole Pareto optimal front. subset_indices ([type]): Indices of the currently reachable solutions. ref_point (np.ndarray): The reference point indicating a decision maker's preference. ideal (np.ndarray): Ideal point. nadir (np.ndarray): Nadir point. user_bounds (np.ndarray): Bounds given by the user (the DM) for each objective,which should not be exceeded. A 1D array where NaN's indicate 'no bound is given' for the respective objective value. Returns: int: Index of the closest point according the minimized value of the ASF. """ asf = PointMethodASF(nadir, ideal) scalarizer = DiscreteScalarizer(asf, {"reference_point": ref_point}) solver = DiscreteMinimizer(scalarizer) # Copy the front and filter out the reachable solutions. # If user bounds are given, filter out solutions outside the those bounds. # Infeasible solutions on the pareto font are set to be NaNs. tmp = np.copy(pareto_f) mask = np.zeros(tmp.shape[0], dtype=bool) mask[subset_indices] = True tmp[~mask] = np.nan # indices of solutions with one or more objective value exceeding the user bounds. bound_mask = np.any(tmp > user_bounds, axis=1) tmp[bound_mask] = np.nan res = solver.minimize(tmp) return res["x"]
[docs] def calculate_navigation_point( self, projection: np.ndarray, nav_point: np.ndarray, steps_remaining: int, ) -> np.ndarray: """Calculate a new navigation point based on the projection of the preference point to the Pareto optimal front. Args: projection (np.ndarray): The point on the Pareto optimal front closest to the preference point given by a decision maker. nav_point (np.ndarray): The previous navigation point. steps_remaining (int): How many steps are remaining in the navigation. Returns: np.ndarray: The new navigation point. """ new_nav_point = ((steps_remaining - 1) / steps_remaining) * nav_point + ( 1 / steps_remaining ) * projection return new_nav_point
@staticmethod
[docs] def calculate_bounds( pareto_front: np.ndarray, nav_point: np.ndarray, user_bounds: np.ndarray, previous_lb: np.ndarray, previous_ub: np.ndarray, ) -> Tuple[np.ndarray, np.ndarray]: """Calculate the new bounds of the reachable points on the Pareto optimal front from a navigation point. Args: pareto_front (np.ndarray): The Pareto optimal front. nav_point (np.ndarray): The current navigation point. user_bounds (np.ndarray): Bounds given by the user (the DM) for each objective,which should not be exceeded. A 1D array where NaN's indicate 'no bound is given' for the respective objective value. previous_lb (np.ndarray): If no new lower bound can be found for an objective, this value is used. previous_ub (np.ndarray): If no new upper bound can be found for an objective, this value is used. Returns: Tuple[np.ndarray, np.ndarray]: The lower and upper bounds. """ # make sure the front is at least 2D _pareto_front = np.atleast_2d(pareto_front) new_lower_bounds = np.zeros(_pareto_front.shape[1]) new_upper_bounds = np.zeros(_pareto_front.shape[1]) # discard solutions that breach the given user bounds by setting them to NaN user_bounds_mask = np.any(_pareto_front > user_bounds, axis=1) _pareto_front[user_bounds_mask] = np.nan # TODO: vectorize this loop for r in range(_pareto_front.shape[1]): mask = np.zeros(_pareto_front.shape[1], dtype=bool) mask[r] = True subject_to = _pareto_front[:, ~mask].reshape( (_pareto_front.shape[0], _pareto_front.shape[1] - 1) ) con_mask = np.all(subject_to <= nav_point[~mask], axis=1) if _pareto_front[con_mask, mask].size != 0: min_val = np.nanmin(_pareto_front[con_mask, mask]) max_val = np.nanmax(_pareto_front[con_mask, mask]) else: min_val = previous_lb[r] max_val = previous_ub[r] new_lower_bounds[r] = min_val new_upper_bounds[r] = max_val return new_lower_bounds, new_upper_bounds
[docs] def calculate_distance( self, nav_point: np.ndarray, projection: np.ndarray, nadir: np.ndarray ) -> float: """Calculate the distance to the Pareto optimal front from a navigation point. The distance is calculated to the supplied projection which is assumed to lay on the front. Args: nav_point (np.ndarray): The navigation point. projection (np.ndarray): The point of the Pareto optimal front the distance is calculated to. nadir (np.ndarray): The nadir point of the Pareto optimal set. Returns: float: The distance. """ nom = np.linalg.norm(nav_point - nadir) denom = np.linalg.norm(projection - nadir) dist = (nom / denom) * 100 return dist
if __name__ == "__main__": # front = np.array([[1, 2, 3], [2, 3, 4], [2, 2, 3], [3, 2, 1]], dtype=float) # ideal = np.zeros(3) # nadir = np.ones(3) * 5
[docs] f1 = np.linspace(1, 100, 50)
f2 = f1[::-1] ** 2 front = np.stack((f1, f2)).T ideal = np.min(front, axis=0) nadir = np.max(front, axis=0) method = NautilusNavigator((front), ideal, nadir) req = method.start() print(req.content["reachable_lb"]) print(req.content["navigation_point"]) print(req.content["reachable_ub"]) response = { "reference_point": np.array([50, 6000]), "speed": 5, "go_to_previous": False, "stop": False, "user_bounds": [None, None], } req.response = response req = method.iterate(req) req.response = response req1 = req import time while req.content["steps_remaining"] > 1: time.sleep(1 / req.content["current_speed"]) req = method.iterate(req) req.response = response print(req.content["steps_remaining"]) print(req.content["reachable_lb"]) print(req.content["navigation_point"]) print(req.content["reachable_ub"]) print(req.content["user_bounds"]) req1.response["go_to_previous"] = True req = method.iterate(req1) req.response = response req.response["go_to_previous"] = False while req.content["steps_remaining"] > 1: time.sleep(1 / req.content["current_speed"]) req = method.iterate(req) req.response = response print(req.content["steps_remaining"]) print(req.content["reachable_lb"]) print(req.content["navigation_point"]) print(req.content["reachable_ub"]) print(req)