from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from desdeo_mcdm.interactive.InteractiveMethod import InteractiveMethod
from desdeo_tools.interaction.request import BaseRequest
from sklearn.cluster import KMeans
from sklearn.metrics import pairwise_distances_argmin_min
[docs]class ENautilusException(Exception):
"""Raised when an exception related to ENautilus is encountered.
"""
pass
[docs]class ENautilusInitialRequest(BaseRequest):
""" A request class to handle the initial preferences.
"""
def __init__(self, ideal: np.ndarray, nadir: np.ndarray):
msg = (
"Please specify the number of iterations as 'n_iterations' to be carried out, and how many intermediate "
"points to show as 'n_points'."
)
content = {
"message": msg,
"ideal": ideal,
"nadir": nadir,
}
super().__init__("reference_point_preference", "required", content=content)
[docs] def validator(self, response: Dict) -> None:
if "n_iterations" not in response:
raise ENautilusException("'n_iterations' entry missing")
if "n_points" not in response:
raise ENautilusException("'n_points' entry missing")
n_iterations = response["n_iterations"]
if not isinstance(n_iterations, int) or int(n_iterations) < 1:
raise ENautilusException(
"'n_iterations' must be a positive integer greater than zero"
)
n_points = response["n_points"]
if not isinstance(n_points, int) or int(n_points) < 1:
raise ENautilusException(
"'n_points' must be a positive integer greater than zero"
)
@classmethod
[docs] def init_with_method(cls, method):
return cls(method._ideal, method._nadir)
@BaseRequest.response.setter
def response(self, response: Dict):
self.validator(response)
self._response = response
[docs]class ENautilusRequest(BaseRequest):
"""A request class to handle the intermediate requests.
"""
def __init__(
self,
ideal: np.ndarray,
nadir: np.ndarray,
points: np.ndarray,
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
n_iterations_left: int,
distances: np.ndarray,
):
self._max_index = len(np.squeeze(points))
msg = """Please select the most preferred solution by index as 'preferred_point_index'.
The number of remaining iterations may also be changed by
setting 'change_remaining' to True and
supplying the desried number of remaining iterations as
'new_iterations_left'.
If you wish to step back, then set 'step_back' to True. When stepping back to a
previous iteration, that iteration's intermediate solutions should be supplied alongside
the associated distances and upper and lower bounds as 'prev_distances',
'prev_solutions', 'prev_lower_bounds', and
'prev_upper_bounds'. The number of remaining iterations should be supplied as
well when stepping back as 'iterations_left'.
When 'step_back' is true, 'preferred_point_index' and 'change_remaining' are ignored.
"""
content = {
"message": msg,
"ideal": ideal,
"nadir": nadir,
"points": points,
"lower_bounds": lower_bounds,
"upper_bounds": upper_bounds,
"n_iterations_left": n_iterations_left,
"distances": distances,
}
super().__init__("reference_point_preference", "required", content=content)
[docs] def validator(self, response: Dict) -> None:
if (
"preferred_point_index" not in response
or "step_back" not in response
or "change_remaining" not in response
):
raise ENautilusException(
"'preferred_point_index', 'step_back', and 'change_remaining' must be specified."
)
if not response["step_back"]:
pref_point_index = response["preferred_point_index"]
if pref_point_index < 0 or pref_point_index > self._max_index:
raise ENautilusException("The given index is out of bounds.")
if response["change_remaining"]:
if "iterations_left" not in response:
raise ENautilusException(
"When 'change_remaining' is True, 'iterations_left' must be specified."
)
else:
# stepping back
# check that the previous solution is given alongside its bounds.
if "prev_solutions" not in response:
raise ENautilusException("'prev_solutions' entry missing.")
if "prev_lower_bounds" not in response:
raise ENautilusException("'prev_lower_bounds' entry missing.")
if "prev_upper_bounds" not in response:
raise ENautilusException("'prev_upper_bounds' entry missing.")
if "iterations_left" not in response:
raise ENautilusException(
"When stepping back 'iterations_left' must be specified."
)
if "prev_distances" not in response:
raise ENautilusException("'prev_distances' entry missing.")
# TODO: if issues arise, checking the dimensions of the prev_* entries can
# be beneficial.
@BaseRequest.response.setter
def response(self, response: Dict):
self.validator(response)
self._response = response
[docs]class ENautilusStopRequest(BaseRequest):
"""A request class to handle termination.
"""
def __init__(
self, preferred_point: np.ndarray, solution: Optional[np.ndarray] = None
):
msg = "Most preferred solution found."
content = {"message": msg, "objective": preferred_point, "solution": solution}
super().__init__("print", "no_interaction", content=content)
[docs]class ENautilus(InteractiveMethod):
def __init__(
self,
pareto_front: np.ndarray,
ideal: np.ndarray,
nadir: np.ndarray,
objective_names: Optional[List[str]] = None,
variables: Optional[np.ndarray] = None,
):
"""
Args:
pareto_front (np.ndarray): A two dimensional numpy array
representing a Pareto front with objective vectors on each of its
rows.
ideal (np.ndarray): The ideal objective vector of the problem
being represented by the Pareto front.
nadir (np.ndarray): The nadir objective vector of the problem
being represented by the Pareto front.
objective_names (Optional[List[str]], optional): Names of the
objectives. List must match the number of columns in
pareto_front. Defaults to 'f1', 'f2', 'f3', ...
variables (Optional[np.ndarray], optional): The decision variables
of the objective vectors in pareto_front. The i'th variable vector
in variables corresponds to the i'th objective vector in
pareto_front. Defaults to None.
Raises:
ENavigatorException: One or more dimension mismatches are
encountered among the supplies arguments.
"""
if not pareto_front.ndim == 2:
raise ENautilusException(
"The supplied Pareto front should be a two dimensional array. Found "
f" number of dimensions {pareto_front.ndim}."
)
if not ideal.shape[0] == pareto_front.shape[1]:
raise ENautilusException(
"The Pareto front must consist of objective vectors with the "
"same number of objectives as defined in the ideal and nadir "
"points."
)
if not ideal.shape == nadir.shape:
raise ENautilusException(
"The dimensions of the ideal and nadir point do not match."
)
if objective_names:
if not len(objective_names) == ideal.shape[0]:
raise ENautilusException(
"The supplied objective names must have a length equal to "
"the number of objectives."
)
self._objective_names = objective_names
else:
self._objective_names = [f"f{i+1}" for i in range(ideal.shape[0])]
self._ideal = ideal
self._nadir = nadir
self._variables = variables
# in objective space!
self._pareto_front = pareto_front
# bounds of the reachable region
self._reachable_ub = self._nadir
self._reachable_lb = self._ideal
# currently reachable solution as a list of indices of the Pareto front
self._reachable_idx = list(range(0, self._pareto_front.shape[0]))
self._preferred_point = None
self._projection_index = None
self._n_points = None
self._n_iterations_left = None
[docs] def start(self) -> ENautilusInitialRequest:
return ENautilusInitialRequest.init_with_method(self)
[docs] def iterate(
self, request: Union[ENautilusInitialRequest, ENautilusRequest]
) -> Union[ENautilusRequest, ENautilusStopRequest]:
"""Perform the next logical iteration step based on the given request type.
"""
if type(request).__name__ == ENautilusInitialRequest.__name__:
return self.handle_initial_request(request)
elif type(request).__name__ == ENautilusRequest.__name__:
return self.handle_request(request)
else:
# if stop request, do nothing
return request
[docs] def handle_initial_request(
self, request: ENautilusInitialRequest
) -> ENautilusRequest:
"""Handles the initial request by parsing the response appropriately.
"""
self._n_iterations_left = request.response["n_iterations"]
self._n_points = request.response["n_points"]
# self._intermediate_points = np.repeat(np.atleast_2d(self._nadir), self._n_points, axis=0)
self._preferred_point = self._nadir
zbars = self.calculate_representative_points(
self._pareto_front, self._reachable_idx, self._n_points
)
zs = self.calculate_intermediate_points(
self._preferred_point, zbars, self._n_iterations_left
)
new_lower_bounds, new_upper_bounds = self.calculate_bounds(
self._pareto_front, zs
)
distances = self.calculate_distances(zs, zbars, self._nadir)
return ENautilusRequest(
self._ideal,
self._nadir,
zs,
new_lower_bounds,
new_upper_bounds,
self._n_iterations_left,
distances,
)
[docs] def handle_request(
self, request: ENautilusRequest
) -> Union[ENautilusRequest, ENautilusStopRequest]:
"""Handles the intermediate requests.
"""
if not request.response["step_back"]:
preferred_point_index = request.response["preferred_point_index"]
self._preferred_point = request.content["points"][preferred_point_index]
if self._n_iterations_left <= 1:
self._n_iterations_left = 0
# if self._variables is defined: first, find the index of
# self._preferred point in self._pareto_front. Second,
# return the variable vector at the found position. Otherwise,
# do not return any variable vectors.
if self._variables is not None:
idx = np.linalg.norm(
np.abs(self._pareto_front - self._preferred_point), axis=1
).argmin()
solution = self._variables[idx]
else:
solution = None
return ENautilusStopRequest(self._preferred_point, solution)
self._reachable_lb = request.content["lower_bounds"][preferred_point_index]
self._reachable_ub = request.content["upper_bounds"][preferred_point_index]
self._reachable_idx = self.calculate_reachable_point_indices(
self._pareto_front, self._reachable_lb, self._reachable_ub
)
if not request.response["change_remaining"]:
# decrement iterations left
self._n_iterations_left -= 1
else:
self._n_iterations_left = request.response["iterations_left"]
# Start again
zbars = self.calculate_representative_points(
self._pareto_front, self._reachable_idx, self._n_points
)
zs = self.calculate_intermediate_points(
self._preferred_point, zbars, self._n_iterations_left
)
new_lower_bounds, new_upper_bounds = self.calculate_bounds(
self._pareto_front, zs
)
distances = self.calculate_distances(zs, zbars, self._nadir)
# stepping back
else:
zs = request.response["prev_solutions"]
new_lower_bounds = request.response["prev_lower_bounds"]
new_upper_bounds = request.response["prev_upper_bounds"]
self._n_iterations_left = request.response["iterations_left"]
distances = request.response["prev_distances"]
return ENautilusRequest(
self._ideal,
self._nadir,
zs,
new_lower_bounds,
new_upper_bounds,
self._n_iterations_left,
distances,
)
[docs] def calculate_representative_points(
self, pareto_front: np.ndarray, subset_indices: List[int], n_points: int
) -> np.ndarray:
"""Calculates the most representative points on the Pareto front. The points are clustered using k-means.
Args:
pareto_front (np.ndarray): The Pareto front.
subset_indices (List[int]): A list of indices representing the
subset of the points on the Pareto front for which the
representative points should be calculated.
n_points (int): The number of representative points to be calculated.
Returns:
np.ndarray: A 2D array of the most representative points. If the
subset of Pareto efficient points is less than n_points, returns
the subset of the Pareto front.
"""
if len(np.atleast_1d(subset_indices)) > n_points:
kmeans = KMeans(n_clusters=n_points)
kmeans.fit(pareto_front[subset_indices])
closest, _ = pairwise_distances_argmin_min(
kmeans.cluster_centers_, pareto_front[subset_indices]
)
zbars = pareto_front[subset_indices][closest]
else:
zbars = pareto_front[subset_indices]
return zbars
[docs] def calculate_bounds(
self, pareto_front: np.ndarray, intermediate_points: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""Calculate the new bounds of the reachable points on the Pareto
optimal front from each of the intermediate points.
Args:
pareto_front (np.ndarray): The Pareto optimal front.
intermediate_points (np.ndarray): The current intermediate points as a 2D array.
Returns:
Tuple[np.ndarray, np.ndarray]: The lower and upper bounds for each of the intermediate points.
"""
_pareto_front = np.atleast_2d(pareto_front)
n_points = np.atleast_2d(intermediate_points).shape[0]
new_lower_bounds = np.zeros((n_points, _pareto_front.shape[1]))
new_upper_bounds = np.zeros((n_points, _pareto_front.shape[1]))
for i, point in enumerate(np.atleast_2d(intermediate_points)):
# TODO: vectorize this loop
for r in range(_pareto_front.shape[1]):
mask = np.zeros(_pareto_front.shape[1], dtype=bool)
mask[r] = True
subject_to = _pareto_front[:, ~mask].reshape(
(_pareto_front.shape[0], _pareto_front.shape[1] - 1)
)
con_mask = np.all(subject_to <= point[~mask], axis=1)
min_val = np.min(_pareto_front[con_mask, mask])
max_val = np.max(_pareto_front[con_mask, mask])
new_lower_bounds[i, r] = min_val
new_upper_bounds[i, r] = max_val
return new_lower_bounds, new_upper_bounds
[docs] def calculate_distances(
self, intermediate_points: np.ndarray, zbars: np.ndarray, nadir: np.ndarray
) -> np.ndarray:
distances = np.linalg.norm(
np.atleast_2d(intermediate_points) - nadir, axis=1
) / np.linalg.norm(np.atleast_2d(zbars) - nadir, axis=1)
"""Calculates the distance to the Pareto front for each intermediate
point given utilizing representative points representing the
intermediate points.
Args:
intermediate_points (np.ndarray): The intermediate points, 2D array.
zbars (np.ndarray): The representative points corresponding to the intermediate points, 2D array.
nadir (np.ndarray): The nadir point, 1D array.
Returns:
np.ndarray: The distances calculated for each intermediate point to the Pareto front.
"""
return distances * 100
[docs] def calculate_reachable_point_indices(
self,
pareto_front: np.ndarray,
lower_bounds: np.ndarray,
upper_bounds: np.ndarray,
) -> List[int]:
"""Calculate the indices of the reachable Pareto optimal solutions
based on lower and upper bounds.
Returns:
List[int]: List of the indices of the reachable solutions.
"""
low_idx = np.all(pareto_front >= lower_bounds, axis=1)
up_idx = np.all(pareto_front <= upper_bounds, axis=1)
reachable_idx = np.argwhere(low_idx & up_idx).squeeze()
return reachable_idx