Documentation for ResultsHandler

Provides all functions that operate on calculated results. Acting as the IO layer for the results object, the ResultsHandler can load, handle, and store results on its own.
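
For orientation, a minimal usage sketch (the import path follows the source file location stated below; the results file path is an example placeholder):

from photonai.processing.results_handler import ResultsHandler

# Create an empty handler and load a previously saved results file.
handler = ResultsHandler()
handler.load_from_file('./my_analysis/photon_result_file.json')

# Summary table with one row per outer fold plus an 'Overall' row.
performance = handler.get_performance_table()
print(performance)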

Source code in photonai/processing/results_handler.py
class ResultsHandler:
    """
    Provides all functions that operate on calculated results.
    Acting as the IO layer for the results object, the ResultsHandler
    can load, handle, and store results on its own.

    """
    def __init__(self, results_object: MDBHyperpipe = None, output_settings=None):
        """
        Initialize the object.

        Parameters:
            results_object:
                All results are stored here.
                An initial setting is not necessary,
                because a later loading via file or MongoDB is possible.

            output_settings (OutputSettings):
                Setting for creation and storage of the results_object.

        """
        self.results = results_object
        self.output_settings = output_settings

    def load_from_file(self, results_file: str):
        """
        Read results_file from json into MDBHyperpipe object self.results.

        Parameters:
            results_file:
                Full path to json file.

        """
        with open(results_file, 'r') as json_file:
            self.results = MDBHyperpipe.from_document(json.load(json_file))

    def load_from_mongodb(self, mongodb_connect_url: str, pipe_name: str):
        """
        Read results_file from MongoDB into MDBHyperpipe object self.results.

        Parameters:
            mongodb_connect_url:
                MongoDB connection string.

            pipe_name:
                Name of the stored hyperpipe.

        """
        connect(mongodb_connect_url, alias="photon_core")
        results = list(MDBHyperpipe.objects.raw({'name': pipe_name}))
        if len(results) == 1:
            self.results = results[0]
        elif len(results) > 1:
            self.results = MDBHyperpipe.objects.order_by([("computation_start_time", DESCENDING)]).raw({'name': pipe_name}).first()
            warn_text = 'Found multiple hyperpipes with that name. Returning most recent one.'
            logger.warning(warn_text)
            warnings.warn(warn_text)
        else:
            raise FileNotFoundError('Could not load hyperpipe from MongoDB.')

    @staticmethod
    def get_methods() -> list:
        """
        This function returns a list of all methods available for ResultsHandler.

        Returns:
            List of all available methods.

        """
        methods_list = [s for s in dir(ResultsHandler) if '__' not in s]
        return methods_list

    def get_performance_table(self):
        """This function returns a summary table of the overall results.

        ToDo: add best_config information!
        """
        res_tab = pd.DataFrame()
        for i, folds in enumerate(self.results.outer_folds):
            # add best config infos
            res_tab.loc[i, 'best_config'] = str(folds.best_config.human_readable_config)

            # add fold index
            res_tab.loc[i, 'fold'] = folds.fold_nr

            # add sample size infos
            res_tab.loc[i, 'n_train'] = folds.best_config.best_config_score.number_samples_training
            res_tab.loc[i, 'n_validation'] = folds.best_config.best_config_score.number_samples_validation

            # add performance metrics
            d = folds.best_config.best_config_score.validation.metrics
            for key, value in d.items():
                res_tab.loc[i, key] = value

        # add row with overall info
        res_tab.loc[i + 1, 'n_validation'] = np.sum(res_tab['n_validation'])
        for key, value in d.items():
            m = res_tab.loc[:, key]
            res_tab.loc[i+1, key] = np.mean(m)
            res_tab.loc[i + 1, key + '_sem'] = sem(m)   # standard error of the mean
        res_tab.loc[i + 1, 'best_config'] = 'Overall'
        return res_tab

    def get_performance_outer_folds(self):
        performances = dict()
        for metric in self.results.outer_folds[0].best_config.best_config_score.validation.metrics.keys():
            performances[metric] = list()
        for i, fold in enumerate(self.results.outer_folds):
            for metric, value in fold.best_config.best_config_score.validation.metrics.items():
                performances[metric].append(value)
        return performances

    def get_config_evaluations(self) -> dict:
        """
        Return the test performance of every tested configuration in every outer fold.

        Returns:
            Test performance of every configuration.

        """
        config_performances = list()
        maximum_fold = None
        for outer_fold in self.results.outer_folds:
            if maximum_fold is None or len(outer_fold.tested_config_list) > maximum_fold:
                maximum_fold = len(outer_fold.tested_config_list)

        for outer_fold in self.results.outer_folds:
            performance = dict()
            for metric in self.results.hyperpipe_info.metrics:
                performance[metric] = list()

            for i in range(maximum_fold):
                # for config in outer_fold.tested_config_list:
                for metric in self.results.hyperpipe_info.metrics:
                    if i >= len(outer_fold.tested_config_list):
                        performance[metric].append(np.nan)
                        continue
                    config = outer_fold.tested_config_list[i]
                    if config.config_failed:
                        performance[metric].append(np.nan)
                    else:
                        for item in config.metrics_test:
                            if (item.operation == 'mean') and (item.metric_name == metric):
                                performance[metric].append(item.value)
            config_performances.append(performance)

        config_performances_dict = dict()
        for metric in self.results.hyperpipe_info.metrics:
            config_performances_dict[metric] = list()
            for fold in config_performances:
                config_performances_dict[metric].append(fold[metric])

        return config_performances_dict

    def get_minimum_config_evaluations(self):
        config_evaluations = self.get_config_evaluations()
        minimum_config_evaluations = dict()

        for metric, evaluations in config_evaluations.items():
            minimum_config_evaluations[metric] = list()
            greater_is_better = Scorer.greater_is_better_distinction(metric)

            for fold in evaluations:
                fold_evaluations = list()

                if greater_is_better:
                    for i, config in enumerate(fold):
                        if i == 0:
                            last_config = config
                        else:
                            if config > last_config:
                                last_config = config
                        fold_evaluations.append(last_config)
                else:
                    last_config = np.inf
                    for i, config in enumerate(fold):
                        if i == 0:
                            last_config = config
                        else:
                            if config < last_config:
                                last_config = config
                        fold_evaluations.append(last_config)
                minimum_config_evaluations[metric].append(fold_evaluations)

        return minimum_config_evaluations

    def get_learning_curves(self, config_nr, outer_fold_nr, save):
        """This function gets the learning curves out of the result tree.
        It returns the learning curves as a pandas dataframe.
        If save = True it saves the learning curves as a csv file.

        """
        cuts = self.results.hyperpipe_info.learning_curves_cut.values[1:] + [1.]
        fold_num = len(self.results.outer_folds[0].tested_config_list[config_nr - 1].inner_folds)
        idx = pd.MultiIndex.from_product([cuts, [i + 1 for i in range(fold_num)]], names=['Cut', 'Inner Fold Nr.'])
        col = pd.MultiIndex.from_product([self.results.hyperpipe_info.metrics, ['test', 'train']])
        data = {}
        for metric in self.results.hyperpipe_info.metrics:
            config = self.results.outer_folds[outer_fold_nr - 1].tested_config_list[config_nr - 1]
            for t in [1, 2]:
                curves = []
                for cut_nr, cut in enumerate(cuts):
                    curves += [config.inner_folds[fold].learning_curves[cut_nr][t][metric] for fold in range(fold_num)]
                data.update({(metric, ['test', 'train'][t-1]): curves})
        curves = pd.DataFrame(data, index=idx, columns=col)
        if save:
            curves.to_csv(self._save_prep_learning_curves('lc_outer_fold_%d_config_%d.csv' % (outer_fold_nr, config_nr)))
        return curves

    def plot_curves(self, curves: pd.DataFrame, title: str = 'Learning Curves'):
        """This function plots the learning curves.

        Parameters:
            curves:
                Dataframe with multi-index: (run - fraction of data)
                columns: at least (metric, train/test) floats

            title:
                Subtitle of plot.

        """
        metrics = self.results.hyperpipe_info.metrics
        fig, axes = plt.subplots(1, len(metrics), figsize=(len(metrics) * 4., 4.))
        if len(metrics) == 1:
            axes = [axes]
        cuts = curves.index.get_level_values(0)
        col_template = tuple(curves.columns[0])
        # iterate only over first 2 entries [(metric, train/test)], example 'mean' as third

        for metric, ax in zip(metrics, axes):
            for subset in ['test', 'train']:
                sns.lineplot(x=cuts, y=curves[(metric, subset)+col_template[2:]], label=metric + '_' + subset, ax=ax)
            ax.set(xlabel='Fraction of Train Data used', ylabel='Metric Value')
            ax.legend(fontsize='small')
        plt.suptitle(title)
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])

        return fig

    def plot_learning_curves_config(self, config_nr, outer_fold_nr, save, show=False):
        """This function gets the learning curves for a specific config nr. and outer fold nr. and plots them
        If config_nr = -1 it gets the best config of the outer fold
        If save = True the plot is saved
        If show = True the plot is shown
        """
        if config_nr == -1:
            config_nr = self.results.best_config.config_nr
        curves = self.get_learning_curves(config_nr, outer_fold_nr, save)
        curves.columns = curves.columns.to_flat_index()
        fig = self.plot_curves(curves, 'Learning Curves (Outer Fold Nr.%d Config Nr.%d)' % (outer_fold_nr, config_nr))
        if save:
            plt.savefig(self._save_prep_learning_curves('lc_outer_fold_%d_config_%d.png' % (outer_fold_nr, config_nr)))
        if show:
            plt.show()
        plt.close()

    def plot_learning_curves_outer_fold(self, outer_fold_nr, config_nr_list=None, save=True, show=False):
        """This function gets the learning curves for a list of configs in a specific outer fold and plots them
        For each config the mean of the learning curves of all inner folds is used
        If config_nr = -1 it gets the best config of the outer fold
        If save = True the plot is saved
        If show = True the plot is shown
        """
        if config_nr_list is None:
            config_nr_list = np.arange(1, len(self.results.outer_folds[outer_fold_nr - 1].tested_config_list) + 1)
        elif -1 in config_nr_list:
            # replace the -1 placeholder with the best config number, avoiding duplicates
            best_config_nr = self.results.best_config.config_nr
            config_nr_list = [nr for nr in config_nr_list if nr != best_config_nr]
            config_nr_list = [best_config_nr if nr == -1 else nr for nr in config_nr_list]
        curves_list = []
        for config_nr in config_nr_list:
            curves = self.get_learning_curves(config_nr, outer_fold_nr, save)
            curves_list.append(curves.groupby(level=0).agg(['mean']))
        curves_configs = pd.concat(curves_list, axis=0, names=["Config Nr."], keys=config_nr_list)
        curves_configs.columns = curves_configs.columns.to_flat_index()
        curves_configs = curves_configs.swaplevel()
        fig = self.plot_curves(curves_configs, 'Learning Curves (Outer Fold Nr.%d)' % outer_fold_nr)
        if save:
            curves_configs.to_csv(self._save_prep_learning_curves('lc_outer_fold_{}.csv'.format(outer_fold_nr)))
            plt.savefig(self._save_prep_learning_curves('lc_outer_fold_{}.png'.format(outer_fold_nr)))
        if show:
            plt.show()
        plt.close()

    def _save_prep_learning_curves(self, file_name):
        path = self.results.output_folder + '/learning_curves/'
        if not os.path.exists(path):
            os.makedirs(path)
        return os.path.join(path, file_name)

    def save_all_learning_curves(self):
        for outer_fold_nr in range(1, len(self.results.outer_folds) + 1):
            for config_nr in range(1, len(self.results.outer_folds[0].tested_config_list) + 1):
                self.plot_learning_curves_config(config_nr, outer_fold_nr, save=True)

    def plot_optimizer_history(self, metric,
                               title: str = 'Optimizer History',
                               type: str = 'plot',
                               reduce_scatter_by: Union[int, str] = 'auto',
                               file: str = None):
        """
        :param metric: specify metric that has been stored within the PHOTONAI results tree
        :param type: 'plot' or 'scatter'
        :param reduce_scatter_by: integer or string ('auto'), reduce the number of points plotted by scatter
        :param file: specify a filename if you want to save the plot
        :return:
        """

        if metric not in self.results.hyperpipe_info.metrics:
            raise ValueError('Metric "{}" not stored in results tree'.format(metric))

        config_evaluations = self.get_config_evaluations()
        minimum_config_evaluations = self.get_minimum_config_evaluations()

        # handle different lengths
        min_corresponding = len(min(config_evaluations[metric], key=len))
        config_evaluations_corres = [configs[:min_corresponding] for configs in config_evaluations[metric]]
        minimum_config_evaluations_corres = [configs[:min_corresponding]
                                             for configs in minimum_config_evaluations[metric]]

        mean = np.nanmean(np.asarray(config_evaluations_corres), axis=0)
        mean_min = np.nanmean(np.asarray(minimum_config_evaluations_corres), axis=0)

        greater_is_better = Scorer.greater_is_better_distinction(metric)
        if greater_is_better:
            caption = 'Maximum'
        else:
            caption = 'Minimum'

        plt.figure()
        if type == 'plot':
            plt.plot(np.arange(0, len(mean)), mean, '-', color='gray', label='Mean Performance')

        elif type == 'scatter':
            # now do smoothing
            if isinstance(reduce_scatter_by, str):
                if reduce_scatter_by != 'auto':
                    msg = '{} is not a valid smoothing_kernel specifier. ' \
                          'Falling back to "auto".'.format(reduce_scatter_by)
                    logger.warning(msg)
                    warnings.warn(msg)

                # if auto, then calculate size of reduce_scatter_by so that 75 points on x remain
                # smallest reduce_scatter_by should be 1
                reduce_scatter_by = max([np.floor(min_corresponding / 75).astype(int), 1])

            if reduce_scatter_by > 1:
                plt.plot([], [], ' ', label="scatter reduced by factor {}".format(reduce_scatter_by))

            for i, fold in enumerate(config_evaluations[metric]):
                # add a few None so that list can be divided by smoothing_kernel
                remaining = len(fold) % reduce_scatter_by
                if remaining:
                    fold.extend([np.nan] * (reduce_scatter_by - remaining))
                # calculate mean over every n named_steps so that plot is less cluttered
                reduced_fold = np.nanmean(np.asarray(fold).reshape(-1, reduce_scatter_by), axis=1)
                reduced_xfit = np.arange(reduce_scatter_by / 2, len(fold), step=reduce_scatter_by)
                if i == len(config_evaluations[metric])-1:
                    plt.scatter(reduced_xfit, np.asarray(reduced_fold), color='gray', alpha=0.5, label='Performance', marker='.')
                else:
                    plt.scatter(reduced_xfit, np.asarray(reduced_fold), color='gray', alpha=0.5, marker='.')
        else:
            raise ValueError('Please specify either "plot" or "scatter".')

        plt.plot(np.arange(0, len(mean_min)), mean_min, '-', color='black', label='Mean {} Performance'.format(caption))

        for i, fold in enumerate(minimum_config_evaluations[metric]):
            xfit = np.arange(0, len(fold))
            plt.plot(xfit, fold, '-', color='black', alpha=0.5)

        plt.ylabel(metric.replace('_', ' '))
        plt.xlabel('No of Evaluations')

        plt.legend()
        plt.title(title)
        if file:
            plt.savefig(file)
        else:
            file = os.path.join(self.results.output_folder, "optimizer_history.png")
            plt.savefig(file)
        plt.close()

    def get_importance_scores(self):
        """
        This function returns the importance scores for the best configuration of each outer fold.
        """
        imps = []
        for i, fold in enumerate(self.results.outer_folds):
            imps.append(fold.best_config.best_config_score.feature_importances)
        return imps

    @staticmethod
    def collect_fold_lists(score_info_list, fold_nr, predictions_filename=''):
        if len(score_info_list) > 0:
            fold_nr_array = []
            collectables = {'y_pred': [], 'y_true': [], 'indices': [], 'probabilities': []}

            for i, score_info in enumerate(score_info_list):
                for collectable_key, collectable_list in collectables.items():
                    if getattr(score_info, collectable_key) is not None and len(
                            getattr(score_info, collectable_key)) > 0:
                        collectables[collectable_key].extend(list(getattr(score_info, collectable_key)))
                    else:
                        collectables[collectable_key].extend(list(np.full((len(score_info.y_true)), np.nan)))
                fold_nr_array.extend(list(np.ones((len(score_info.y_true),)) * fold_nr[i]))

            # enable nd y_pred support
            if len(collectables["y_pred"]) > len(collectables["y_true"]):
                tmp_collectables_y_pred = collectables["y_pred"]
                headers = collectables["y_pred"][0]
                for i, header in enumerate(list(headers)):
                    collectables[header] = [x[i] for x in tmp_collectables_y_pred if x != tmp_collectables_y_pred[0]]

            collectables["fold"] = fold_nr_array
            # convert to pandas dataframe to use their sorting algorithm
            save_df = pd.DataFrame(collectables)
            sorted_df = save_df.sort_values(by='indices')

            if predictions_filename != '':
                sorted_df.to_csv(predictions_filename, index=None)

            return sorted_df.to_dict('list')

    def get_mean_train_predictions(self, filename=''):
        """
         This function returns the MEAN predictions, true targets, and fold index
         for the TRAINING Set of the best configuration of each outer fold.
         """
        if self.results is None:
            raise ValueError("Result tree information is needed but results attribute of object is None.")

        score_info_list = list()
        fold_nr_list = list()
        for outer_fold in self.results.outer_folds:
            score_info_list.append(outer_fold.best_config.best_config_score.training)
            fold_nr_list.append(outer_fold.fold_nr)
        infos = self.collect_fold_lists(score_info_list, fold_nr_list, filename)
        infos = {key: np.array(value) for key, value in infos.items()}
        num_items = np.unique(infos["indices"])
        mean_pred = np.zeros(num_items.shape)
        y_true = np.zeros(num_items.shape)
        for i in num_items:
            idx = (infos["indices"] == i)
            mean_pred[i] = np.mean(infos["y_pred"][idx])
            y_true[i] = infos["y_true"][idx][0]
        return {'y_true': y_true, 'y_pred': mean_pred, 'indices': num_items}

    def get_test_predictions(self, filename=''):
        """
        This function returns the predictions, true targets, and fold index
        for the best configuration of each outer fold.
        """
        if self.results is None:
            raise ValueError("Result tree information is needed but results attribute of object is None.")

        score_info_list = list()
        fold_nr_list = list()
        for outer_fold in self.results.outer_folds:
            score_info_list.append(outer_fold.best_config.best_config_score.validation)
            fold_nr_list.append(outer_fold.fold_nr)
        return self.collect_fold_lists(score_info_list, fold_nr_list, filename)

    def get_validation_predictions(self, outer_fold_nr=0, config_no=0, filename=''):
        """
        This function returns the predictions, probabilities, true targets, fold and index
        for the config_nr of the given outer_fold
        """
        score_info_list = list()
        fold_nr_list = list()

        if self.results is None:
            raise ValueError("Result tree information is needed but results attribute of object is None.")

        # Todo: find config by config_id
        for inner_fold in self.results.outer_folds[outer_fold_nr].tested_config_list[config_no].inner_folds:
            score_info_list.append(inner_fold.validation)
            fold_nr_list.append(inner_fold.fold_nr)

        return self.collect_fold_lists(score_info_list, fold_nr_list, filename)

    def eval_mean_time_components(self, write_results=True, plotly_return=False):
        """
            This function creates charts and tables from the time monitoring data.
        """
        result_dict = {}
        caching = False
        default_dict = {'total_seconds': 0,
                        'total_items_processed': 0,
                        'mean_seconds_per_config': 0,
                        'mean_seconds_per_item': 0}

        # sum up times per element, 1. per config, and 2. in total
        for outer_fold in self.results.outer_folds:
            for config_nr, config in enumerate(outer_fold.tested_config_list):
                tmp_config_dict = {}
                # resort time entries for each element so that is has the following structure
                # element_name -> fit/transform/predict -> (seconds, nr_items)
                for inner_fold in config.inner_folds:
                    for time_key, time_values in inner_fold.time_monitor.items():
                        for value_item in time_values:
                            name, time, nr_items = value_item[0], value_item[1], value_item[2]
                            if name not in tmp_config_dict:
                                tmp_config_dict[name] = {}
                            if time_key not in tmp_config_dict[name]:
                                tmp_config_dict[name][time_key] = []
                            tmp_config_dict[name][time_key].append((time, nr_items))

                # calculate mean time per config and absolute time
                for element_name, element_time_dict in tmp_config_dict.items():
                    for element_time_key, element_time_list in element_time_dict.items():
                        if element_time_key == "transform_cached":
                            caching = True
                        mean_time = np.mean([i[0] for i in element_time_list])
                        total_time = np.sum([i[0] for i in element_time_list])
                        total_items_processed = np.sum([i[1] for i in element_time_list])

                        if element_name not in result_dict:
                            result_dict[element_name] = {}
                        if element_time_key not in result_dict[element_name]:
                            result_dict[element_name][element_time_key] = dict(default_dict)

                        result_dict[element_name][element_time_key]['total_seconds'] += total_time
                        result_dict[element_name][element_time_key]['total_items_processed'] += total_items_processed
                        mean_time_per_config = result_dict[element_name][element_time_key]['mean_seconds_per_config']
                        tmp_total_mean = ((mean_time_per_config * config_nr) + mean_time) / (config_nr + 1)
                        result_dict[element_name][element_time_key]['mean_seconds_per_config'] = tmp_total_mean
                        tmp_mean_per_item = result_dict[element_name][element_time_key]['total_seconds'] / \
                                            result_dict[element_name][element_time_key]['total_items_processed']
                        result_dict[element_name][element_time_key]['mean_seconds_per_item'] = tmp_mean_per_item

        format_str = '{:06.6f}'
        if caching:
            # in case we used caching add transform_cached and transform_computed values to transform_total
            for name, sub_result_dict in result_dict.items():
                if "transform_cached" in sub_result_dict:
                    result_dict[name]["transform"] = dict(default_dict)
                    for value_dict in sub_result_dict.values():
                        for info in value_dict.keys():
                            result_dict[name]["transform"][info] = result_dict[name]["transform_cached"][info]
                            # in case everything's been in the cache we have no computation
                            if "transform_computed" in sub_result_dict:
                                result_dict[name]["transform"][info] += result_dict[name]["transform_computed"][info]
                    if "transform_computed" in sub_result_dict:
                        # calculate a ratio, if caching was helpful and how much of the time it saved
                        result_dict[name]["cache_ratio"] = result_dict[name]["transform_cached"]["total_seconds"] / \
                                                           result_dict[name]["transform_computed"]["total_seconds"]

            # in case of caching we have different plot plus a different csv file
            csv_keys = ["fit", "transform", "transform_computed", "transform_cached", "predict"]
            csv_titles = csv_keys
            plot_list = ["fit", "transform", "transform_cached"]
            method_list = ["fit", "transform_computed", "transform_cached", "predict"]
        else:
            csv_keys = ["fit", "transform_computed", "predict"]
            csv_titles = ["fit", "transform", "predict"]
            plot_list = ["fit", "transform_computed"]
            method_list = ["fit", "transform_computed", "predict"]

        # write csv file with time analysis
        if write_results:
            sub_keys = ["total_seconds", "mean_seconds_per_config", "mean_seconds_per_item"]
            csv_filename = os.path.join(self.results.output_folder, 'time_monitor.csv')
            with open(csv_filename, 'w') as csvfile:
                writer = csv.writer(csvfile)
                header1 = [""]
                for k_name in csv_titles:
                    header1.extend([k_name, "", ""])
                header2 = ["Element"] + (sub_keys * len(csv_titles))
                if caching:
                    header1.append("")
                    header2.append("cache_ratio")
                writer.writerow(header1)
                writer.writerow(header2)
                for item, item_dict in result_dict.items():
                    row = [item]
                    for time_key in csv_keys:
                        for sub_key in sub_keys:
                            if time_key in item_dict:
                                row.append(format_str.format(item_dict[time_key][sub_key]))
                            else:
                                row.append('')
                    if caching:
                        if "cache_ratio" in item_dict:
                            row.append(item_dict["cache_ratio"])
                    writer.writerow(row)

        # plot figure
        # TODO: use PiePlotlyPlot class without circular imports
        plotly_dict = {'layout': {'title': 'Time Monitor Pie Chart',
                                  'showlegend': True,
                                  'height': 600,
                                  'annotations': []},
                       'data': []
                       }

        def append_plotly(labels, values, name, colors, domain):
            """
            helper function (temporary, see TODO above)
            """
            plotly_dict["data"].append({'labels': labels,
                                        'values': values,
                                        'type': 'pie',
                                        'name': name,
                                        'marker': {'colors': colors},
                                        'domain': domain,
                                        'hoverinfo': 'label+percent',
                                        'textposition': 'inside'})
            plotly_dict['layout']['annotations'].append({
                "x": np.mean(domain["x"]),
                "y": (domain["y"][1]),
                "font": {
                    "size": 16
                },
                "text": name,
                "xref": "paper",
                "yref": "paper",
                "xanchor": "center",
                "yanchor": "bottom",
                "showarrow": False
            })

        def eval_mean_time_autopct(values):
            def my_autopct(pct):
                total = sum(values)
                if pct/total >= 1:
                    return str(round(pct, 1))+"%"
                else:
                    return None

            return my_autopct

        # Create nxm sub plots
        cpl = len(plot_list)
        gs = matplotlib.gridspec.GridSpec(int((cpl-1)/3)+2, min(cpl, 3))
        legend_theme = plt.get_cmap('Set3')
        legend_theme2 = plt.get_cmap('tab10')

        element_names = [name for name, element in result_dict.items()]

        fig = plt.figure(figsize=(10, 7), dpi=160)
        colors = [legend_theme(1. * i / len(element_names)) for i in range(len(element_names))]
        for i, k in enumerate(plot_list):
            ax = plt.subplot(gs[int(i/3), i % 3])
            ax.set_prop_cycle("color", colors)
            data = [element[k]["total_seconds"] if k in element else 0 for name, element in result_dict.items()]
            data_sum = sum(data)
            if data_sum == 0:
                data_sum = 1
            values = [val/data_sum for val in data]
            patches, _, _ = plt.pie(values,
                                    shadow=True,
                                    startangle=90,
                                    autopct=eval_mean_time_autopct(data),
                                    pctdistance=0.7)
            plt.axis('equal')
            plt.title(k)
            append_plotly(labels=[str(d) for d in element_names],
                          values=values,
                          name=k,
                          colors=colors,
                          domain={'x': [i/len(plot_list), (i+1)/len(plot_list)], 'y': [0.55, 1]})

        plt.legend(
            loc='upper left',
            labels=['%s' % l for l in element_names],
            prop={'size': 10},
            bbox_to_anchor=(0.0, 1),
            bbox_transform=fig.transFigure
        )

        # add another plot for the comparison of the fit/transform/predict methods
        ax2 = plt.subplot(gs[int(i/3)+1, :])
        colors = [legend_theme2(1. * i / len(data)) for i in range(len(method_list))]
        ax2.set_prop_cycle("color", colors)
        data = []
        for k in method_list:
            data.append(np.sum([element[k]["total_seconds"] for name, element in result_dict.items() if k in element]))
        patches_an, _, _ = plt.pie([val/sum(data) for val in data],
                                   shadow=True,
                                   startangle=90,
                                   pctdistance=0.7,
                                   autopct=eval_mean_time_autopct(data))

        append_plotly(labels=method_list, values=[val / sum(data) for val in data], name="methods",
                      colors=colors, domain={'x': [0, 1], 'y': [0, 0.45]})

        plt.axis('equal')
        plt.title("methods")
        plt.legend(
            loc='lower left',
            labels=['%s' % l for l in method_list],
            prop={'size': 10},
            bbox_transform=fig.transFigure
        )

        # for only one legend
        #fig.legend(patches+patches_an, element_names+method_list, prop={'size': 10}, loc='lower left')

        if write_results:
            plt.savefig(os.path.join(self.results.output_folder, 'time_monitor_pie.png'))
        plt.close()
        if plotly_return:
            str_fig = "var layout =" + str(plotly_dict["layout"]) + ";"
            str_fig += "var data = " + str(plotly_dict["data"]) + ";"
            str_fig += "Plotly.newPlot('" + "time_monitor_pie_id" + "',data, layout);"
            return str_fig.replace("False", "false").replace("True", "true")

    def save(self):

        if self.output_settings.mongodb_connect_url:
            connect(self.output_settings.mongodb_connect_url, alias='photon_core')
            logger.info('Write results to mongodb...')
            try:
                self.results.save()
            except DocumentTooLarge:
                logger.error('Could not save document into MongoDB: Document too large')
        if self.output_settings.save_output:
            logger.info("Writing results to project folder...")
            self.write_result_tree_to_file()

    def save_backmapping(self, filename: str, backmapping):
        try:
            if isinstance(backmapping, list):
                backmapping = np.asarray(backmapping)

            try:
                from nibabel.nifti1 import Nifti1Image
                if isinstance(backmapping, Nifti1Image):
                    backmapping.to_filename(os.path.join(self.results.output_folder, filename + '.nii.gz'))
            except ImportError:
                pass
            finally:
                if isinstance(backmapping, np.ndarray):
                    if backmapping.size > 1000:
                        np.savez(os.path.join(self.results.output_folder, filename + '.npz'), backmapping)
                    else:
                        np.savetxt(os.path.join(self.results.output_folder, filename + '.csv'), backmapping, delimiter=',')
                else:
                    with open(os.path.join(self.results.output_folder, filename + '.p'), 'wb') as f:
                        pickle.dump(backmapping, f)
        except Exception as e:
            logger.error("Could not save backmapped feature importances.")
            logger.error(e)

    def write_convenience_files(self):
        if self.output_settings.save_output:
            logger.info("Writing summary file, plots and prediction csv to result folder ...")
            self.write_summary()
            self.write_predictions_file()

    def convert_to_json_serializable(self, value):
        if isinstance(value, (int, np.int32, np.int64)):
            return int(value)
        if isinstance(value, (float, np.float32, np.float64)):
            if self.output_settings.reduce_space:
                return round(float(value), 3)
            return float(value)
        else:
            return json_util.default(value)

    def write_result_tree_to_file(self):
        try:
            local_file = os.path.join(self.results.output_folder, 'photon_result_file.json')
            result = self.round_floats(self.results.to_son().to_dict())

            with open(local_file, 'w') as outfile:
                json.dump(result, outfile, default=self.convert_to_json_serializable)
        except OSError as e:
            logger.error("Could not write results to local file")
            logger.error(str(e))

    @classmethod
    def round_floats(cls, d):
        # recursive method for rounding all floats in result.json
        result = {}
        if isinstance(d, dict):
            for key, value in d.items():
                value = cls.round_floats(value)
                result.update({key: value})
            return result
        elif isinstance(d, list):
            return [cls.round_floats(val) for val in d]
        elif isinstance(d, float):
            return round(d, 6)
        else:
            return d

    def get_best_config_inner_fold_predictions(self, filename=''):
        score_info_list = []
        fold_nr = []
        for inner_fold in self.results.best_config.inner_folds:
            score_info_list.append(inner_fold.validation)
            fold_nr.append(inner_fold.fold_nr)
        return self.collect_fold_lists(score_info_list, fold_nr, filename)

    def write_predictions_file(self):
        if self.output_settings.save_output:
            filename = os.path.join(self.output_settings.results_folder, 'best_config_predictions.csv')
            # usually we write the predictions for the outer fold
            if not self.output_settings.save_predictions_from_best_config_inner_folds:
                return self.get_test_predictions(filename)
            # in case no outer folds exist, we write the inner_fold predictions
            else:
                return self.get_best_config_inner_fold_predictions(filename)

    def _get_best_outer_fold_configs_per_estimator(self) -> dict:
        # 1. find out which estimators there are
        last_element_name_identifier, last_element_dict = list(self.results.hyperpipe_info.elements.items())[-1]
        no_switch_found = False
        if not ":" in last_element_name_identifier:
            no_switch_found = True

        last_element_base_element, last_element_name = last_element_name_identifier.split(":")
        if not last_element_base_element == "SWITCH":
            no_switch_found = True

        if no_switch_found:
            logger.info("Could not identify switch at the end of the pipeline. Estimator Comparison aborted.")
            return

        # generate config key by switch name
        search_key = last_element_name + "__" + "estimator_name"
        estimator_list = last_element_dict.keys()
        best_configs_from_estimators = dict()
        for estimator in estimator_list:
            best_configs_from_estimators[estimator] = list()

        # 2. iterate list and filter configs
        for outer_fold in self.results.outer_folds:
            for estimator_name in estimator_list:
                try:
                    best_estimator_config = outer_fold.get_optimum_config(
                        metric=self.results.hyperpipe_info.best_config_metric,
                        maximize_metric=self.results.hyperpipe_info.
                        maximize_best_config_metric,
                        dict_filter=(search_key, estimator_name))
                    best_configs_from_estimators[estimator_name].append(best_estimator_config)
                except Warning as w:
                    logger.info("Could not find best config for estimator {} "
                                "in outer fold {}".format(estimator_name, outer_fold.fold_nr))

        return best_configs_from_estimators

    def get_n_best_validation_configs_per_estimator(self, n=10, estimator_names=None) -> dict:
        best_configs_from_estimator = self._get_best_outer_fold_configs_per_estimator()
        if estimator_names:
            all_estimators = list(best_configs_from_estimator.keys())
            for name in all_estimators:
                if name not in estimator_names:
                    del best_configs_from_estimator[name]

        best_n_config_dict = dict()
        for estimator_name, estimator_list in best_configs_from_estimator.items():
            if n > len(estimator_list):
                n_configs_per_estimator = [c.config_dict for c in estimator_list]
            else:
                sort_order = np.argsort([c.get_test_metric(self.results.hyperpipe_info.best_config_metric, 'mean')
                                         for c in estimator_list])
                n_configs_per_estimator = [estimator_list[idx].config_dict for idx in sort_order[:n]]
            best_n_config_dict[estimator_name] = n_configs_per_estimator
            print_config_list_table(estimator_name, n_configs_per_estimator)

        return best_n_config_dict

    def get_mean_of_best_validation_configs_per_estimator(self, write_to_file=False):

        best_configs_from_estimators = self._get_best_outer_fold_configs_per_estimator()

        # get mean values for each metric for each estimator config list
        estimator_performance_values = dict()
        for estimator_name, estimator_config_list in best_configs_from_estimators.items():
            estimator_performance_values[estimator_name] = dict()
            for metric in self.results.hyperpipe_info.metrics:
                performance_values = [c.get_test_metric(metric, 'mean')
                                      for c in estimator_config_list]
                estimator_performance_values[estimator_name][metric] = np.mean(performance_values)
        output = print_estimator_metrics(estimator_performance_values, self.results.hyperpipe_info.metrics, True)
        if write_to_file:
            text_file = open(os.path.join(self.output_settings.results_folder,
                                          "mean_best_estimator_performance.txt"), "w")
            text_file.write(output)
            text_file.close()
        return output

    def text_summary(self):
        def divider(header):
            return header.ljust(101, '=')

        output_string = divider("ANALYSIS INFORMATION ")

        elapsed_time = self.results.computation_end_time - self.results.computation_start_time
        output_string += """ 
Project Folder: {},
Computation Time: {} - {}
Duration: {}
Optimized for: {}
Hyperparameter Optimizer: {}

""".format(self.output_settings.results_folder,
           self.results.computation_start_time,
           self.results.computation_end_time,
           elapsed_time,
           self.results.hyperpipe_info.best_config_metric,
           self.results.hyperpipe_info.optimization["Optimizer"])

        output_string += divider("DUMMY RESULTS ")
        output_string += """
{}

""".format(print_metrics("DUMMY", self.results.dummy_estimator.get_test_metric(operation='mean'), summary=True))

        output_string += divider("AVERAGE PERFORMANCE ACROSS OUTER FOLDS ")

        test_metrics = self.results.get_test_metric_dict()
        train_metrics = self.results.get_train_metric_dict()
        output_string += """
{}

""".format(self.print_table_for_performance_overview(train_metrics, test_metrics))

        output_string += divider("BEST HYPERPARAMETER CONFIGURATION ")
        output_string += """
{}

""".format(json.dumps(self.results.best_config.human_readable_config, indent=4, sort_keys=True))

        output_string += """
{}

""".format(print_outer_folds(self.results.hyperpipe_info.metrics, self.results.outer_folds, summary=True))
        output_string += divider("PHOTONAI {} ".format(__version__))

        if self.output_settings.results_folder is not None:
            output_string += "\nYour results are stored in " + self.output_settings.results_folder + "\n"
            output_string += "Go to https://explorer.photon-ai.com and upload your photon_result_file.json " \
                             "for convenient result visualization! \n"
            output_string += "For more info and documentation visit https://www.photon-ai.com"

        if self.output_settings.save_output:
            try:
                summary_filename = os.path.join(self.output_settings.results_folder, 'photon_summary.txt')
                text_file = open(summary_filename, "w")
                text_file.write(output_string)
                text_file.close()
            except OSError as e:
                logger.error("Could not write summary file")
                logger.error(str(e))

            return output_string

    @staticmethod
    def print_table_for_performance_overview(metric_dict_train, metric_dict_test):
        x = PrettyTable()
        x.field_names = ["Metric Name", "Training Mean", "Training Std", "Test Mean", "Test Std"]
        for element_key, element_dict in metric_dict_train.items():
            x.add_row([element_key, element_dict["mean"], element_dict["std"],
                       metric_dict_test[element_key]["mean"], metric_dict_test[element_key]["std"]])
        return x
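
Beyond the individual methods documented below, a typical post-analysis workflow might look like the following sketch (assuming `handler` holds a loaded results tree; the metric name 'accuracy' and the output file name are illustrative assumptions):

import os

# Aggregated performance and per-fold metric values.
summary = handler.get_performance_table()
outer_fold_metrics = handler.get_performance_outer_folds()

# Optimizer history for one of the computed metrics
# (must be contained in handler.results.hyperpipe_info.metrics).
handler.plot_optimizer_history('accuracy', type='scatter',
                               file=os.path.join('.', 'optimizer_history.png'))

# Test-set predictions of the best configuration per outer fold.
predictions = handler.get_test_predictions()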

__init__(self, results_object=None, output_settings=None) special

Initialize the object.

Parameters:

    results_object (MDBHyperpipe, default None):
        All results are stored here. An initial value is not required,
        because the results can be loaded later from file or MongoDB.

    output_settings (OutputSettings, default None):
        Settings for creation and storage of the results_object.
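
Both construction patterns, sketched below; the results file path is an example, and the in-memory variant assumes an MDBHyperpipe results tree is already available (e.g. from a fitted Hyperpipe):

from photonai.processing.results_handler import ResultsHandler

# Start empty and attach the results tree later via file or MongoDB.
handler = ResultsHandler()
handler.load_from_file('photon_result_file.json')

# Alternatively, wrap an MDBHyperpipe object that is already in memory:
# handler = ResultsHandler(results_object=my_results_tree)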

get_config_evaluations(self)

Return the test performance of every tested configuration in every outer fold.

Returns:

    dict:
        Test performance of every configuration.
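
A short sketch of the returned structure (assuming `handler` holds a loaded results tree):

config_evals = handler.get_config_evaluations()

# Structure: {metric_name: [fold_1_list, fold_2_list, ...]}, where each inner
# list holds one test value per tested configuration (np.nan for failed or
# missing configurations).
for metric, folds in config_evals.items():
    print(metric, '- outer folds:', len(folds),
          '- configs in first fold:', len(folds[0]))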

Source code in photonai/processing/results_handler.py
def get_config_evaluations(self) -> dict:
    """
    Return the test performance of every tested configuration in every outer fold.

    Returns:
        Test performance of every configuration.

    """
    config_performances = list()
    maximum_fold = None
    for outer_fold in self.results.outer_folds:
        if maximum_fold is None or len(outer_fold.tested_config_list) > maximum_fold:
            maximum_fold = len(outer_fold.tested_config_list)

    for outer_fold in self.results.outer_folds:
        performance = dict()
        for metric in self.results.hyperpipe_info.metrics:
            performance[metric] = list()

        for i in range(maximum_fold):
            # for config in outer_fold.tested_config_list:
            for metric in self.results.hyperpipe_info.metrics:
                if i >= len(outer_fold.tested_config_list):
                    performance[metric].append(np.nan)
                    continue
                config = outer_fold.tested_config_list[i]
                if config.config_failed:
                    performance[metric].append(np.nan)
                else:
                    for item in config.metrics_test:
                        if (item.operation == 'mean') and (item.metric_name == metric):
                            performance[metric].append(item.value)
        config_performances.append(performance)

    config_performances_dict = dict()
    for metric in self.results.hyperpipe_info.metrics:
        config_performances_dict[metric] = list()
        for fold in config_performances:
            config_performances_dict[metric].append(fold[metric])

    return config_performances_dict

get_methods() staticmethod

This function returns a list of all methods available for ResultsHandler.

Returns:

    list:
        List of all available methods.
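
For quick interactive exploration, the method names can be listed without instantiating the class:

from photonai.processing.results_handler import ResultsHandler

for method_name in ResultsHandler.get_methods():
    print(method_name)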

Source code in photonai/processing/results_handler.py
@staticmethod
def get_methods() -> list:
    """
    This function returns a list of all methods available for ResultsHandler.

    Returns:
        List of all available methods.

    """
    methods_list = [s for s in dir(ResultsHandler) if '__' not in s]
    return methods_list

get_performance_table(self)

This function returns a summary table of the overall results.

ToDo: add best_config information!
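
A small sketch of working with the returned table (assuming `handler` holds a loaded results tree; the CSV path is an example):

perf = handler.get_performance_table()

# One row per outer fold plus a final 'Overall' row holding the mean and the
# standard error of the mean ('<metric>_sem' columns) for each metric.
print(perf[['fold', 'n_train', 'n_validation']])
perf.to_csv('performance_table.csv', index=False)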

Source code in photonai/processing/results_handler.py
def get_performance_table(self):
    """This function returns a summary table of the overall results.

    ToDo: add best_config information!
    """
    res_tab = pd.DataFrame()
    for i, folds in enumerate(self.results.outer_folds):
        # add best config infos
        res_tab.loc[i, 'best_config'] = str(folds.best_config.human_readable_config)

        # add fold index
        res_tab.loc[i, 'fold'] = folds.fold_nr

        # add sample size infos
        res_tab.loc[i, 'n_train'] = folds.best_config.best_config_score.number_samples_training
        res_tab.loc[i, 'n_validation'] = folds.best_config.best_config_score.number_samples_validation

        # add performance metrics
        d = folds.best_config.best_config_score.validation.metrics
        for key, value in d.items():
            res_tab.loc[i, key] = value

    # add row with overall info
    res_tab.loc[i + 1, 'n_validation'] = np.sum(res_tab['n_validation'])
    for key, value in d.items():
        m = res_tab.loc[:, key]
        res_tab.loc[i+1, key] = np.mean(m)
        res_tab.loc[i + 1, key + '_sem'] = sem(m)   # standard error of the mean
    res_tab.loc[i + 1, 'best_config'] = 'Overall'
    return res_tab

load_from_file(self, results_file)

Read results_file from json into MDBHyperpipe object self.results.

Parameters:

    results_file (str, required):
        Full path to the json file.
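
Minimal example; the file name matches what write_result_tree_to_file produces, the folder is an assumption:

handler = ResultsHandler()
handler.load_from_file('./my_analysis/photon_result_file.json')
print(type(handler.results))  # MDBHyperpipe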

load_from_mongodb(self, mongodb_connect_url, pipe_name)

Read results_file from MongoDB into MDBHyperpipe object self.results.

Parameters:

    mongodb_connect_url (str, required):
        MongoDB connection string.

    pipe_name (str, required):
        Name of the stored hyperpipe.
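
Minimal example; the connection string and pipeline name are placeholders. If several hyperpipes share the name, the most recent one is returned with a warning; if none is found, a FileNotFoundError is raised:

handler = ResultsHandler()
handler.load_from_mongodb(mongodb_connect_url='mongodb://localhost:27017/photon_results',
                          pipe_name='my_analysis')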