# Source code for metacluster.metacluster
#!/usr/bin/env python
# Created by "Thieu" at 05:36, 28/07/2023 ----------%
# Email: nguyenthieu2102@gmail.com %
# Github: https://github.com/thieu1995 %
# --------------------------------------------------%
import time
from pathlib import Path
import pandas as pd
import numpy as np
from metacluster.utils import mealpy_util as mu, cluster, validator
from metacluster.utils.io_util import write_dict_to_csv
from metacluster.utils.visualize_util import export_boxplot_figures, export_convergence_figures
class MetaCluster:
    """
    Defines a MetaCluster class that holds all Metaheuristic-based K-Center Clustering methods

    Parameters
    ----------
    list_optimizer: list, tuple, default = None
        List of strings that represent class optimizer or list of instance of Optimizer class from Mealpy library.
        Current supported optimizers, please check it here: https://github.com/thieu1995/mealpy
        If a custom optimizer is passed, make sure it is an instance of `Optimizer` class.
        Please use this to get supported optimizers: MetaCluster.get_support(name="optimizer")

    list_paras: list, tuple, default=None
        List of dictionaries that present the parameters of each Optimizer class.
        You can set it to None to use all of default parameters in Mealpy library.

    list_obj: list, tuple, default=None
        List of strings that represent objective name.
        Current supported objectives, please check it here: https://github.com/thieu1995/permetrics
        Please use this to get supported objectives: MetaCluster.get_support(name="obj")

    n_trials: int, default=5
        The number of runs for each optimizer for each objective

    seed: int, default=20
        Determines random number generation for the whole program. Use an int to make the randomness deterministic.

    Examples
    --------
    The following example shows how to compare several optimizers and objectives with the MetaCluster class

    >>> from metacluster import get_dataset, MetaCluster
    >>> from sklearn.preprocessing import MinMaxScaler
    >>>
    >>> scaler = MinMaxScaler(feature_range=(0, 1))
    >>> data = get_dataset("aniso")
    >>> data.X = scaler.fit_transform(data.X)
    >>>
    >>> # Get all supported methods and print them out
    >>> MetaCluster.get_support(name="all")
    >>>
    >>> list_optimizer = ["BaseFBIO", "OriginalGWO", "OriginalSMA"]
    >>> list_paras = [
    >>>     {"name": "FBIO", "epoch": 10, "pop_size": 30},
    >>>     {"name": "GWO", "epoch": 10, "pop_size": 30},
    >>>     {"name": "SMA", "epoch": 10, "pop_size": 30}
    >>> ]
    >>> list_obj = ["BHI", "MIS", "XBI"]
    >>> list_metric = ["BRI", "DBI", "DRI", "DI", "KDI"]
    >>> model = MetaCluster(list_optimizer=list_optimizer, list_paras=list_paras, list_obj=list_obj, n_trials=3)
    >>> model.execute(data=data, cluster_finder="elbow", list_metric=list_metric, save_path="history", verbose=False)
    >>> model.save_boxplots()
    >>> model.save_convergences()
    """

    # Maps every user-facing option name to what the library supports.
    # "cluster_finder" maps a short name to the name of a function looked up on the `cluster` module.
    SUPPORT = {
        "cluster_finder": {"elbow": "get_clusters_by_elbow", "gap": "get_clusters_by_gap_statistic",
                           "silhouette": "get_clusters_by_silhouette_score", "davies_bouldin": "get_clusters_by_davies_bouldin",
                           "calinski_harabasz": "get_clusters_by_calinski_harabasz", "bayesian_ìnormation": "get_clusters_by_bic",
                           "all_min": "get_clusters_all_min", "all_max": "get_clusters_all_max",
                           "all_mean": "get_clusters_all_mean", "all_majority": "get_clusters_all_majority"},
        "obj": cluster.get_all_clustering_metrics(),
        "metrics": cluster.get_all_clustering_metrics(),
        "optimizer": list(mu.get_all_optimizers(verbose=False).keys())
    }

    # Base names (without extension) of the CSV files written by `execute`.
    FILENAME_LABELS = "result_labels"
    FILENAME_METRICS = "result_metrics"
    FILENAME_METRICS_MEAN = "result_metrics_mean"
    FILENAME_METRICS_STD = "result_metrics_std"
    FILENAME_CONVERGENCES = "result_convergences"
    # Separator used to pack a whole sequence (labels, convergence curve) into one CSV cell.
    HYPHEN_SYMBOL = "="

    def __init__(self, list_optimizer=None, list_paras=None, list_obj=None, n_trials=5, seed=20):
        # Arguments are only stored here; they are validated and resolved in `execute`.
        self.list_optimizer = list_optimizer
        self.list_paras = list_paras
        self.list_obj = list_obj
        self.n_trials = n_trials
        self.seed = seed

    @staticmethod
    def get_support(name="all", verbose=True):
        """
        Return (and optionally print) the options supported by MetaCluster.

        Parameters
        ----------
        name : str, default="all"
            One of the keys of `MetaCluster.SUPPORT`, or "all" to get the whole dictionary.

        verbose : bool, default=True
            Print the supported options to the console when True.

        Returns
        -------
        The whole `SUPPORT` dictionary when `name="all"`, otherwise the entry stored under `name`.

        Raises
        ------
        ValueError
            If `name` is neither "all" nor a key of `SUPPORT`.
        """
        if name == "all":
            if verbose:
                for key, value in MetaCluster.SUPPORT.items():
                    print(f"Supported methods for '{key}' are: ")
                    print(value)
            return MetaCluster.SUPPORT
        if name in MetaCluster.SUPPORT:
            if verbose:
                print(f"Supported methods for '{name}' are: ")
                print(MetaCluster.SUPPORT[name])
            return MetaCluster.SUPPORT[name]
        raise ValueError(f"MetaCluster doesn't support {name}.")

    def _set_list_function(self, list_obj=None, name="objectives"):
        """
        Keep only the names in `list_obj` that are keys of `SUPPORT["obj"]`.

        Unsupported names are reported on the console and dropped. `name` is only used in that message.
        Returns the filtered list, or None when `list_obj` is not a list, tuple or numpy array.
        """
        if not isinstance(list_obj, (list, tuple, np.ndarray)):
            # Nothing usable was passed; callers decide how to treat a missing list.
            return None
        supported = list(self.SUPPORT["obj"].keys())
        accepted, rejected = [], []
        for obj in list_obj:
            (accepted if obj in supported else rejected).append(obj)
        if rejected:
            print(f"MetaCluster doesn't support {name}: {rejected}")
        return accepted

    def _set_list_optimizer(self, list_optimizer=None, list_paras=None):
        """
        Resolve `list_optimizer` into optimizer instances and store them on the instance.

        Strings are looked up with `mu.get_optimizer_by_class` and built from the matching parameter dict
        (or with `epoch=250, pop_size=20` when that entry is not a dict). Instances of `mu.Optimizer` are
        re-configured in place with their parameter dict.

        Raises
        ------
        ValueError
            If `list_optimizer` is not a list/tuple, or `list_paras` has a different length.
        TypeError
            If an item of `list_optimizer` is neither a string nor an `mu.Optimizer` instance.
        """
        if not isinstance(list_optimizer, (list, tuple)):
            raise ValueError("list_optimizers should be a list or tuple.")
        if not isinstance(list_paras, (list, tuple)):
            # One independent dict per optimizer; `[{}] * n` would alias a single dict n times.
            list_paras = [{} for _ in list_optimizer]
        elif len(list_paras) != len(list_optimizer):
            raise ValueError("list_paras should be a list with the same length as list_optimizer")
        list_opts = []
        for opt, paras in zip(list_optimizer, list_paras):
            if isinstance(opt, str):
                opt_class = mu.get_optimizer_by_class(opt)
                if isinstance(paras, dict):
                    list_opts.append(opt_class(**paras))
                else:
                    list_opts.append(opt_class(epoch=250, pop_size=20))
            elif isinstance(opt, mu.Optimizer):
                if isinstance(paras, dict):
                    if "name" in paras:
                        opt.name = paras["name"]
                    # "name" is applied separately above, so it is left out of set_parameters.
                    # A filtered copy is used so the caller's dictionary is not modified.
                    opt.set_parameters({key: value for key, value in paras.items() if key != "name"})
                list_opts.append(opt)
            else:
                raise TypeError("optimizer needs to set as a string and supported by Mealpy library.")
        self.list_optimizer = list_opts
        self.list_paras = list_paras

    def __run__(self, optimizer, problem, mode="single", n_workers=2, termination=None):
        """
        Solve `problem` with `optimizer` and return a dict holding the best fitness,
        the decoded best solution ("center_weights") and the global-best convergence history.
        """
        # NOTE(review): the same `self.seed` is passed on every call, so repeated trials of one
        # optimizer may produce identical results if the optimizer reseeds on each solve() - confirm.
        optimizer.solve(problem, mode=mode, n_workers=n_workers, termination=termination, seed=self.seed)
        return {
            "best_fitness": optimizer.g_best.target.fitness,
            "best_solution": optimizer.problem.decode_solution(optimizer.g_best.solution)["center_weights"],
            "convergence": optimizer.history.list_global_best_fit
        }

    def execute(self, data=None, cluster_finder="elbow", list_metric=None, save_path="history",
                verbose=True, mode='single', n_workers=None, termination=None):
        """
        Run every optimizer on every objective for `n_trials` trials and write the results to CSV files
        in the folder `{save_path}/{dataset_name}/`.

        Parameters
        ----------
        data : instance of Data class, default=None
            The instance of Data class, make sure you have at least matrix feature X. The target labels y (Optional).
            Also make sure your matrix X is normalized or standardized

        cluster_finder : str, default="elbow".
            The method to find the optimal number of clusters in data. The supported methods are:
            ["elbow", "gap", "silhouette", "davies_bouldin", "calinski_harabasz", "bayesian_ìnormation", "all_min", "all_max", "all_mean", "all_majority"].
            The method has prefixes `all` means that it will try all other methods and get the statistical number of clusters.
            For example, `all_min`, takes the minimum K found from all tried methods. `all_mean`, takes the average K found from all tried methods.
            This parameter is only used when `data.y` is None. If you pass labels `y` to `data`. This method will be turned off.
            The number of clusters will be determined by number of unique labels in `y`.

        list_metric : list, default=None
            List of performance metrics that supported by the library: https://github.com/thieu1995/permetrics
            To get the supported metrics, please use: MetaCluster.get_support(), supported obj are supported metrics

        save_path : str, default="history"
            The path to the folder that hold results

        verbose : bool, default = True
            Controls verbosity of output for each training process of each optimizer.

        mode : str, default = 'single'
            The mode used in Optimizer belongs to Mealpy library. Parallel: 'process', 'thread'; Sequential: 'swarm', 'single'.

            - 'process': The parallel mode with multiple cores run the tasks
            - 'thread': The parallel mode with multiple threads run the tasks
            - 'swarm': The sequential mode that no effect on updating phase of other agents
            - 'single': The sequential mode that effect on updating phase of other agents, default

        n_workers : int or None, default = None
            The number of workers (cores or threads) used in Optimizer (effect only on parallel mode)

        termination : dict or None, default = None
            The termination dictionary or an instance of Termination class. It is for Optimizer belongs to Mealpy library.

        Raises
        ------
        ValueError
            If `data` is None, or if `list_obj` given to the constructor is not a list, tuple or numpy array.
        """
        if data is None:
            raise ValueError("data should be an instance of Data class with at least the feature matrix X.")

        ## Set up optimizer and objectives
        self._set_list_optimizer(self.list_optimizer, self.list_paras)
        self.list_obj = self._set_list_function(self.list_obj, name="objectives")
        if self.list_obj is None:
            raise ValueError("list_obj should be a list, tuple or numpy array of supported objectives.")
        self.list_metric = self._set_list_function(list_metric, name="metrics")

        if data.y is not None:
            n_clusters = len(np.unique(data.y))
        else:
            self.cluster_finder = validator.check_str("cluster_finder", cluster_finder, list(self.SUPPORT["cluster_finder"].keys()))
            n_clusters = getattr(cluster, self.SUPPORT["cluster_finder"][self.cluster_finder])(data.X)
        log_to = "console" if verbose else "None"

        # One (lb, ub) pair per feature, repeated once for each cluster center.
        lb = np.min(data.X, axis=0).tolist() * n_clusters
        ub = np.max(data.X, axis=0).tolist() * n_clusters
        bound = mu.FloatVar(lb=lb, ub=ub, name="center_weights")

        ## Check parent directories
        self.save_path = f"{save_path}/{data.get_name()}"
        Path(self.save_path).mkdir(parents=True, exist_ok=True)

        for opt in self.list_optimizer:
            for obj in self.list_obj:
                list_dict = []
                minmax = self.SUPPORT["obj"][obj]
                for trial in range(1, self.n_trials + 1):
                    print(f"MetaCluster are working on: optimizer={opt.get_name()}, obj={obj}, trial={trial}")
                    problem = mu.KCentersClusteringProblem(bounds=bound, minmax=minmax, data=data, obj_name=obj, log_to=log_to)

                    start_time = time.perf_counter()
                    result = self.__run__(opt, problem, mode=mode, n_workers=n_workers, termination=termination)
                    # Elapsed wall-clock time of this trial (not the raw counter reading).
                    time_run = round(time.perf_counter() - start_time, 5)

                    y_pred = problem.get_y_pred(data.X, result["best_solution"])
                    y_pred = self.HYPHEN_SYMBOL.join(map(str, y_pred))  # Convert all labels to single string to save to csv file.
                    conv = self.HYPHEN_SYMBOL.join(map(str, result["convergence"]))
                    dict_metrics = problem.get_metrics(result["best_solution"], self.list_metric)

                    ## Save result_labels.csv file
                    dict1 = {"optimizer": opt.get_name(), "obj": obj, "n_clusters": n_clusters, "y_pred": y_pred}
                    write_dict_to_csv(dict1, save_path=self.save_path, file_name=self.FILENAME_LABELS)

                    ## Save result_metrics.csv file
                    dict2 = {"optimizer": opt.get_name(), "obj": obj, "trial": trial, "n_clusters": n_clusters, "time_run": time_run}
                    dict3 = {**dict2, **dict_metrics}
                    write_dict_to_csv(dict3, save_path=self.save_path, file_name=self.FILENAME_METRICS)

                    ## Collect results for metrics-mean and metrics-std
                    dict4 = {"time_run": time_run, **dict_metrics}
                    list_dict.append(dict4)

                    ## Save result_convergence.csv
                    dict5 = {"optimizer": opt.get_name(), "obj": obj, "trial": trial, "n_clusters": n_clusters, "fitness": conv}
                    write_dict_to_csv(dict5, save_path=self.save_path, file_name=self.FILENAME_CONVERGENCES)

                ## Save result_metrics_mean.csv and result_metrics_std.csv file
                df0 = pd.DataFrame(list_dict)
                dict_mean = df0.mean().to_dict()
                dict_std = df0.std().to_dict()
                dict_mean = {"optimizer": opt.get_name(), "obj": obj, "n_clusters": n_clusters, **dict_mean}
                dict_std = {"optimizer": opt.get_name(), "obj": obj, "n_clusters": n_clusters, **dict_std}
                write_dict_to_csv(dict_mean, save_path=self.save_path, file_name=self.FILENAME_METRICS_MEAN)
                write_dict_to_csv(dict_std, save_path=self.save_path, file_name=self.FILENAME_METRICS_STD)

    @staticmethod
    def _get_figure_size(n_models):
        """Return the default (width, height) in pixels of a boxplot figure for `n_models` compared models."""
        if n_models <= 3:
            figure_size = (450, 550)
        elif n_models <= 5:
            figure_size = (600, 550)
        elif n_models <= 7:
            figure_size = (750, 550)
        elif n_models <= 9:
            figure_size = (900, 550)
        else:
            figure_size = (1050, 550)
        return figure_size

    def save_boxplots(self, figure_size=None, xlabel="Optimizer", list_ylabel=None, title="Boxplot of comparison models",
                      show_legend=True, show_mean_only=False, exts=(".png", ".pdf"), file_name="boxplot"):
        """
        All boxplots figures will be saved in the same folder of: {save_path}/{dataset_name}/

        This method reads the metrics CSV file written by `execute`, so `execute` has to be called first.

        Parameters
        ----------
        figure_size : list, tuple, np.ndarray, None, default=None
            The size for saved figures. `None` means it will automatically set for you.
            Or you can pass (width, height) of figure based on pixel (100px to 1500px)

        xlabel : str, default="Optimizer"
            The label for x coordinate of boxplot figures.

        list_ylabel : list, tuple, np.ndarray, None, default=None
            The label for y coordinate of boxplot figures. Each boxplot corresponding to each metric in list_metric parameter,
            therefore, if you wish to change to y label, you need to pass a list of string represent all metrics in order of list_metric.
            None means it will use the name of metrics as the label

        title : str, default="Boxplot of comparison models"
            The title of figures, it should be the same for all objectives since we have y coordinate already difference.

        show_legend : bool, default=True
            Show the legend or not. For boxplots we can turn on or off this option, but not for convergence chart.

        show_mean_only : bool, default=False
            You can show the mean value only or you can show all mean, std, median of the box by this parameter

        exts : list, tuple, np.ndarray, default=(".png", ".pdf")
            List of extensions of the figures. It is for multiple purposes such as latex (need ".pdf" format), word (need ".png" format).

        file_name : str, default="boxplot"
            The prefix for filenames that will be saved.

        Raises
        ------
        ValueError
            If `figure_size` does not have exactly two items, or `list_ylabel` does not have one label per metric.
        """
        if isinstance(figure_size, (list, tuple, np.ndarray)):
            if len(figure_size) != 2:
                raise ValueError("figure size should have length of 2 indicate width and height of the figure.")
        else:
            figure_size = self._get_figure_size(len(self.list_optimizer))

        if isinstance(list_ylabel, (list, tuple, np.ndarray)):
            # Labels are indexed by metric below, so there must be exactly one label per metric.
            if len(list_ylabel) != len(self.list_metric):
                raise ValueError("list_ylabel should have the same length as list_metric.")
        else:
            list_ylabel = self.list_metric.copy()

        for idx_metric, metric in enumerate(self.list_metric):
            df = pd.read_csv(f"{self.save_path}/{self.FILENAME_METRICS}.csv", usecols=["optimizer", "obj", metric])
            for obj in self.list_obj:
                df_draw = df[df["obj"] == obj][["optimizer", metric]]
                export_boxplot_figures(df_draw, figure_size=figure_size, xlabel=xlabel, ylabel=f"{list_ylabel[idx_metric]} value",
                                       title=title, show_legend=show_legend, show_mean_only=show_mean_only, exts=exts,
                                       file_name=f"{file_name}-{obj}-{metric}", save_path=self.save_path)

    def save_convergences(self, figure_size=None, xlabel="Epoch", list_ylabel=None,
                          title="Convergence chart of comparison models", exts=(".png", ".pdf"), file_name="convergence"):
        """
        All convergence figures will be saved in the same folder of: {save_path}/{dataset_name}/

        This method reads the convergence CSV file written by `execute`, so `execute` has to be called first.
        For every objective it draws one chart per trial plus one chart of the mean over all trials.

        Parameters
        ----------
        figure_size : list, tuple, np.ndarray, None, default=None
            The size for saved figures. `None` means it will automatically set for you.
            Or you can pass (width, height) of figure based on pixel (100px to 1500px)

        xlabel : str, default="Epoch"
            The label for x coordinate of convergence figures.

        list_ylabel : list, tuple, np.ndarray, None, default=None
            The label for y coordinate of convergence figures. Each convergence corresponding to each objective in list_obj,
            therefore, if you wish to change to y label, you need to pass a list of string represent all objectives in order of list_obj.
            None means it will use the name of objectives as the label

        title : str, default="Convergence chart of comparison models"
            The title of figures, it should be the same for all objectives since we have y coordinate already difference.

        exts : list, tuple, np.ndarray, default=(".png", ".pdf")
            List of extensions of the figures. It is for multiple purposes such as latex (need ".pdf" format), word (need ".png" format).

        file_name : str, default="convergence"
            The prefix for filenames that will be saved.

        Raises
        ------
        ValueError
            If `figure_size` does not have exactly two items, or `list_ylabel` does not have one label per objective.
        """
        if isinstance(figure_size, (list, tuple, np.ndarray)):
            if len(figure_size) != 2:
                raise ValueError("figure size should have length of 2 indicate width and height of the figure.")
        else:
            figure_size = (700, 500)
            if len(self.list_optimizer) >= 7:
                figure_size = (850, 550)

        if isinstance(list_ylabel, (list, tuple, np.ndarray)):
            if len(list_ylabel) != len(self.list_obj):
                raise ValueError("list_ylabel should have the same length as list_obj.")
        else:
            list_ylabel = self.list_obj.copy()

        df = pd.read_csv(f"{self.save_path}/{self.FILENAME_CONVERGENCES}.csv", usecols=["optimizer", "obj", "trial", "fitness"])
        for idx_obj, obj in enumerate(self.list_obj):
            ## Draw convergence for single trial
            for trial in range(1, self.n_trials + 1):
                df_draw = df[(df["obj"] == obj) & (df["trial"] == trial)][["optimizer", "fitness"]]
                df_draw.set_index("optimizer", inplace=True)
                # Each cell holds a whole curve packed as one string; unpack it into a float array.
                dict_draw = {key: np.array(value.split(self.HYPHEN_SYMBOL), dtype=float)
                             for key, value in df_draw.to_dict()["fitness"].items()}
                df_draw = pd.DataFrame(dict_draw)
                export_convergence_figures(df_draw, figure_size=figure_size, xlabel=xlabel, ylabel=f"{list_ylabel[idx_obj]} fitness value",
                                           title=title, exts=exts, file_name=f"{file_name}-{obj}-{trial}", save_path=self.save_path)

            ## Draw mean convergence of all trials
            df_draw = df[df["obj"] == obj][["optimizer", 'fitness']]
            dict_mean = {}
            for opt_name, packed in df_draw.values.tolist():
                curve = np.array(packed.split(self.HYPHEN_SYMBOL), dtype=float)
                dict_mean.setdefault(opt_name, []).append(curve)
            # Average the curves of all trials of the same optimizer, epoch by epoch.
            dict_mean = {key: np.mean(value, axis=0) for key, value in dict_mean.items()}
            df_draw = pd.DataFrame(dict_mean)
            export_convergence_figures(df_draw, figure_size=figure_size, xlabel=xlabel, ylabel=f"Average {obj} value",
                                       title=title, exts=exts, file_name=f"{file_name}-{obj}-mean", save_path=self.save_path)