Documentation for Hyperpipe

The PHOTONAI Hyperpipe class creates a custom machine learning pipeline. In addition, it defines the relevant analysis parameters, such as the cross-validation scheme, the hyperparameter optimization strategy, and the performance metrics of interest.

So-called PHOTONAI PipelineElements can be added to the Hyperpipe, each of them being a data-processing method or a learning algorithm. By choosing and combining data-processing methods or algorithms and arranging them with the PHOTONAI classes, both simple and complex pipeline architectures can be designed rapidly.

The PHOTONAI Hyperpipe automates the nested training, test, and hyperparameter optimization procedures.

The Hyperpipe:

  • monitors the nested cross-validated training and test procedure,
  • communicates with the hyperparameter optimization strategy,
  • streams information between the pipeline elements,
  • logs all results obtained and evaluates the performance,
  • guides the hyperparameter optimization process via a so-called best config metric, which is used to select the best-performing hyperparameter configuration.

Attributes:

optimum_pipe (PhotonPipeline):
    An sklearn pipeline object that is fitted to the training data according to the best hyperparameter configuration found. Currently, we don't create an ensemble of the best hyperparameter configs across folds. We find the best config by comparing the test error across outer folds; the hyperparameter config of the best fold is used as the optimal model and is then trained on the complete set.

best_config (dict):
    Dictionary containing the hyperparameters of the best configuration, given in the sklearn interface of model_name__parameter_name: parameter_value.

results (MDBHyperpipe):
    Object containing all information about the performed hyperparameter search. Holds the training and test metrics for all outer folds, inner folds and configurations, as well as additional information.

elements (list):
    Contains all PipelineElement or Hyperpipe objects that are added to the pipeline.
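
For illustration, assuming a fitted Hyperpipe instance named hyperpipe (as in the example below), best_config holds the winning configuration in sklearn's double-underscore parameter encoding; the value shown here is a placeholder:

``` python
# hypothetical output; the actual value depends on the search
print(hyperpipe.best_config)
# {'SVC__C': 42.1}
```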

Examples:

from photonai.base import Hyperpipe, PipelineElement
from photonai.optimization import FloatRange
from sklearn.model_selection import ShuffleSplit, KFold
from sklearn.datasets import load_breast_cancer

hyperpipe = Hyperpipe('myPipe',
                      optimizer='random_grid_search',
                      optimizer_params={'limit_in_minutes': 5},
                      outer_cv=ShuffleSplit(test_size=0.2, n_splits=3),
                      inner_cv=KFold(n_splits=10, shuffle=True),
                      metrics=['accuracy', 'precision', 'recall', "f1_score"],
                      best_config_metric='accuracy',
                      eval_final_performance=True,
                      verbosity=0)

hyperpipe += PipelineElement("SVC", hyperparameters={"C": FloatRange(1, 100)})

X, y = load_breast_cancer(return_X_y=True)
hyperpipe.fit(X, y)
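
Once the fit has finished, the trained optimum pipe can be used directly; a minimal sketch continuing the example above:

``` python
# predictions from the optimum pipe (trained with the best configuration found)
y_pred = hyperpipe.predict(X)

# the underlying fitted sklearn-style pipeline object
best_model = hyperpipe.optimum_pipe
```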
Source code in photonai/base/hyperpipe.py
class Hyperpipe(BaseEstimator):
    """The PHOTONAI Hyperpipe class creates a custom
    machine learning pipeline. In addition it defines
    the relevant analysis’ parameters such as the
    cross-validation scheme, the hyperparameter optimization
    strategy, and the performance metrics of interest.

    So-called PHOTONAI PipelineElements can be added to
    the Hyperpipe, each of them being a data-processing
    method or a learning algorithm. By choosing and
    combining data-processing methods or algorithms
    and arranging them with the PHOTONAI classes,
    both simple and complex pipeline architectures can be designed rapidly.

    The PHOTONAI Hyperpipe automates the nested training,
    test, and hyperparameter optimization procedures.

    The Hyperpipe:

    - monitors the nested cross-validated training
        and test procedure,
    - communicates with the hyperparameter optimization
        strategy,
    - streams information between the pipeline elements,
    - logs all results obtained and evaluates the performance,
    - guides the hyperparameter optimization process via
        a so-called best config metric, which is used to select
        the best-performing hyperparameter configuration.

    Attributes:
        optimum_pipe (PhotonPipeline):
            An sklearn pipeline object that is fitted to the training data
            according to the best hyperparameter configuration found.
            Currently, we don't create an ensemble of the best hyperparameter
            configs across folds. We find the best config by comparing
            the test error across outer folds; the hyperparameter config of the best
            fold is used as the optimal model and is then trained on the complete set.

        best_config (dict):
            Dictionary containing the hyperparameters of the
            best configuration. Contains the parameters in the sklearn
            interface of model_name__parameter_name: parameter_value.

        results (MDBHyperpipe):
            Object containing all information about the
            performed hyperparameter search. Holds the training and test
            metrics for all outer folds, inner folds
            and configurations, as well as additional information.

        elements (list):
            Contains all PipelineElement or Hyperpipe
            objects that are added to the pipeline.

    Example:
        ``` python
        from photonai.base import Hyperpipe, PipelineElement
        from photonai.optimization import FloatRange
        from sklearn.model_selection import ShuffleSplit, KFold
        from sklearn.datasets import load_breast_cancer

        hyperpipe = Hyperpipe('myPipe',
                              optimizer='random_grid_search',
                              optimizer_params={'limit_in_minutes': 5},
                              outer_cv=ShuffleSplit(test_size=0.2, n_splits=3),
                              inner_cv=KFold(n_splits=10, shuffle=True),
                              metrics=['accuracy', 'precision', 'recall', "f1_score"],
                              best_config_metric='accuracy',
                              eval_final_performance=True,
                              verbosity=0)

        hyperpipe += PipelineElement("SVC", hyperparameters={"C": FloatRange(1, 100)})

        X, y = load_breast_cancer(return_X_y=True)
        hyperpipe.fit(X, y)
        ```

    """
    def __init__(self, name: Optional[str],
                 inner_cv: Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits] = None,
                 outer_cv: Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits, None] = None,
                 optimizer: str = 'grid_search',
                 optimizer_params: dict = None,
                 metrics: Optional[List[Union[Scorer.Metric_Type, str]]] = None,
                 best_config_metric: Optional[Union[Scorer.Metric_Type, str]] = None,
                 eval_final_performance: bool = None,
                 use_test_set: bool = True,
                 test_size: float = 0.2,
                 project_folder: str = '',
                 calculate_metrics_per_fold: bool = True,
                 calculate_metrics_across_folds: bool = False,
                 random_seed: int = None,
                 verbosity: int = 0,
                 learning_curves: bool = False,
                 learning_curves_cut: FloatRange = None,
                 output_settings: OutputSettings = None,
                 performance_constraints: list = None,
                 permutation_id: str = None,
                 cache_folder: str = None,
                 nr_of_processes: int = 1,
                 allow_multidim_targets: bool = False):
        """
        Initialize the object.

        Parameters:
            name:
                Name of hyperpipe instance.

            inner_cv:
                Cross validation strategy to test hyperparameter configurations, generates the validation set.

            outer_cv:
                Cross validation strategy to use for the hyperparameter search itself, generates the test set.

            optimizer:
                Hyperparameter optimization algorithm.

                - In case a string literal is given:
                    - "grid_search": Optimizer that iteratively tests all possible hyperparameter combinations.
                    - "random_grid_search": A variation of the grid search optimization that randomly picks
                        hyperparameter combinations from all possible hyperparameter combinations.
                    - "sk_opt": Scikit-Optimize based on theories of bayesian optimization.
                    - "random_search": randomly chooses hyperparameter from grid-free domain.
                    - "smac": SMAC based on theories of bayesian optimization.
                    - "nevergrad": Nevergrad based on theories of evolutionary learning.

                - In case an object is given:
                    expects the object to have the following methods:
                    - `ask`: returns a hyperparameter configuration in the form of a dictionary containing
                        key->value pairs in the sklearn parameter encoding `model_name__parameter_name: parameter_value`
                    - `prepare`: takes a list of pipeline elements and their particular hyperparameters to prepare the
                                 hyperparameter space
                    - `tell`: gets a tested config and the respective performance in order to
                        calculate a smart next configuration to process

            metrics:
                Metrics that should be calculated for training, validation, and test set.
                Use the pre-imported metrics from sklearn and photonai, or register your own.

                - Metrics for `classification`:
                    - `accuracy`: sklearn.metrics.accuracy_score
                    - `matthews_corrcoef`: sklearn.metrics.matthews_corrcoef
                    - `confusion_matrix`: sklearn.metrics.confusion_matrix,
                    - `f1_score`: sklearn.metrics.f1_score
                    - `hamming_loss`: sklearn.metrics.hamming_loss
                    - `log_loss`: sklearn.metrics.log_loss
                    - `precision`: sklearn.metrics.precision_score
                    - `recall`: sklearn.metrics.recall_score
                - Metrics for `regression`:
                    - `mean_squared_error`: sklearn.metrics.mean_squared_error
                    - `mean_absolute_error`: sklearn.metrics.mean_absolute_error
                    - `explained_variance`: sklearn.metrics.explained_variance_score
                    - `r2`: sklearn.metrics.r2_score
                - Other metrics
                    - `pearson_correlation`: photon_core.framework.Metrics.pearson_correlation
                    - `variance_explained`:  photon_core.framework.Metrics.variance_explained_score
                    - `categorical_accuracy`: photon_core.framework.Metrics.categorical_accuracy_score

            best_config_metric:
                The metric that should be maximized or minimized in order to choose
                the best hyperparameter configuration.

            eval_final_performance:
                DEPRECATED! Use "use_test_set" instead!

            use_test_set:
                Whether the metrics should be calculated on the test set;
                otherwise the test set is separated but not used.

            project_folder:
                The output folder in which all files generated by the
                PHOTONAI project are saved.

            test_size:
                The fraction of the data that is left out if no outer_cv is given and
                use_test_set is set to True.

            calculate_metrics_per_fold:
                If True, the metrics are calculated for each inner_fold.
                If False, calculate_metrics_across_folds must be True.

            calculate_metrics_across_folds:
                If True, the metrics are calculated across all inner_fold.
                If False, calculate_metrics_per_fold must be True.

            random_seed:
                Random Seed.

            verbosity:
                The level of verbosity: 0 is least talkative and
                gives only warnings and errors, 1 adds info and 2 adds debug.

            learning_curves:
                Enables learning curve procedure. Evaluate learning process over
                different sizes of input. Depends on learning_curves_cut.

            learning_curves_cut:
                The tested relative cuts for data size.

            performance_constraints:
                Objects that indicate whether a configuration should
                be tested further, e.g. when a config's inner folds do
                not perform better than the dummy performance.

            permutation_id:
                String identifier for permutation tests.

            cache_folder:
                Folder path for multi-processing.

            nr_of_processes:
                Determines the number of outer folds that are computed simultaneously.

            allow_multidim_targets:
                Allows multidimensional targets.

        """

        self.name = re.sub(r'\W+', '', name)

        if eval_final_performance is not None:
            depr_warning = "Hyperpipe parameter eval_final_performance is deprecated. It's called use_test_set now."
            use_test_set = eval_final_performance
            logger.warning(depr_warning)
            raise DeprecationWarning(depr_warning)

        # ====================== Cross Validation ===========================
        # check if both calculate_metrics_per_fold and calculate_metrics_across_folds are False
        if not calculate_metrics_across_folds and not calculate_metrics_per_fold:
            raise NotImplementedError("Apparently, you've set calculate_metrics_across_folds=False and "
                                      "calculate_metrics_per_fold=False. In this case PHOTONAI does not calculate "
                                      "any metrics which doesn't make any sense. Set at least one to True.")
        if inner_cv is None:
            msg = "PHOTONAI requires an inner_cv split. Please enable inner cross-validation. " \
                  "As exmaple: Hyperpipe(...inner_cv = KFold(n_splits = 3), ...). " \
                  "Ensure you import the cross_validation object first."
            logger.error(msg)
            raise AttributeError(msg)

        # use default cut 'FloatRange(0, 1, 'range', 0.2)' if learning_curves = True but learning_curves_cut is None
        if learning_curves and learning_curves_cut is None:
            learning_curves_cut = FloatRange(0, 1, 'range', 0.2)
        elif not learning_curves and learning_curves_cut is not None:
            learning_curves_cut = None

        self.cross_validation = Hyperpipe.CrossValidation(inner_cv=inner_cv,
                                                          outer_cv=outer_cv,
                                                          use_test_set=use_test_set,
                                                          test_size=test_size,
                                                          calculate_metrics_per_fold=calculate_metrics_per_fold,
                                                          calculate_metrics_across_folds=calculate_metrics_across_folds,
                                                          learning_curves=learning_curves,
                                                          learning_curves_cut=learning_curves_cut)

        # ====================== Data ===========================
        self.data = Hyperpipe.Data(allow_multidim_targets=allow_multidim_targets)

        # ====================== Output Folder and Log File Management ===========================
        if output_settings:
            self.output_settings = output_settings
        else:
            self.output_settings = OutputSettings()

        if project_folder == '':
            self.project_folder = os.getcwd()
        else:
            self.project_folder = project_folder

        self.output_settings.set_project_folder(self.project_folder)

        # update output options to add pipe name and timestamp to results folder
        self._verbosity = 0
        self.verbosity = verbosity
        self.output_settings.set_log_file()

        # ====================== Result Logging ===========================
        self.results_handler = None
        self.results = None
        self.best_config = None

        # ====================== Pipeline ===========================
        self.elements = []
        self._pipe = None
        self.optimum_pipe = None
        self.preprocessing = None

        # ====================== Performance Optimization ===========================
        if optimizer_params is None:
            optimizer_params = {}
        self.optimization = Optimization(metrics=metrics,
                                         best_config_metric=best_config_metric,
                                         optimizer_input=optimizer,
                                         optimizer_params=optimizer_params,
                                         performance_constraints=performance_constraints)

        # self.optimization.sanity_check_metrics()

        # ====================== Caching and Parallelization ===========================
        self.nr_of_processes = nr_of_processes
        if cache_folder:
            self.cache_folder = os.path.join(cache_folder, self.name)
        else:
            self.cache_folder = None

        # ====================== Internals ===========================

        self.permutation_id = permutation_id
        self.allow_multidim_targets = allow_multidim_targets
        self.is_final_fit = False

        # ====================== Random Seed ===========================
        self.random_state = random_seed
        if random_seed is not None:
            import random
            random.seed(random_seed)

    # ===================================================================
    # Helper Classes
    # ===================================================================

    class CrossValidation:

        def __init__(self, inner_cv, outer_cv,
                     use_test_set, test_size,
                     calculate_metrics_per_fold,
                     calculate_metrics_across_folds,
                     learning_curves,
                     learning_curves_cut):
            self.inner_cv = inner_cv
            self.outer_cv = outer_cv
            self.use_test_set = use_test_set
            self.test_size = test_size

            self.learning_curves = learning_curves
            self.learning_curves_cut = learning_curves_cut

            self.calculate_metrics_per_fold = calculate_metrics_per_fold
            # Todo: if self.outer_cv is LeaveOneOut: Set calculate metrics across folds to True -> Print
            self.calculate_metrics_across_folds = calculate_metrics_across_folds

            self.outer_folds = None
            self.inner_folds = dict()

    def __str__(self):
        return "Hyperpipe {}".format(self.name)

    class Data:

        def __init__(self, X=None, y=None, kwargs=None, allow_multidim_targets=False):
            self.X = X
            self.y = y
            self.kwargs = kwargs
            self.allow_multidim_targets = allow_multidim_targets

        def input_data_sanity_checks(self, data, targets, **kwargs):
            # ==================== SANITY CHECKS ===============================
            # 1. Convert to numpy arrays
            # 2. erase all NaN targets

            logger.info("Checking input data...")
            self.X = data
            self.y = targets
            self.kwargs = kwargs

            try:
                if self.X is None:
                    raise ValueError("(Input-)data is a NoneType.")
                if self.y is None:
                    raise ValueError("(Input-)target is a NoneType.")

                shape_x = np.shape(self.X)
                shape_y = np.shape(self.y)
                if not self.allow_multidim_targets:
                    if len(shape_y) != 1:
                        if len(np.shape(np.squeeze(self.y))) == 1:
                            # use np.squeeze for non 1D targets.
                            self.y = np.squeeze(self.y)
                            shape_y = np.shape(self.y)
                            msg = "y has been automatically squeezed. If this is not your intention, block this " \
                                  "with Hyperpipe(allow_multidim_targets = True"
                            logger.warning(msg)
                            warnings.warn(msg)
                        else:
                            raise ValueError(
                                "Target is not one-dimensional. Multidimensional targets can cause problems"
                                "with sklearn metrics. Please override with "
                                "Hyperpipe(allow_multidim_targets = True).")
                if not shape_x[0] == shape_y[0]:
                    raise IndexError(
                        "Size of targets mismatch to size of the data: " + str(shape_x[0]) + " - " + str(shape_y[0]))
            except IndexError as ie:
                logger.error("IndexError: " + str(ie))
                raise ie
            except ValueError as ve:
                logger.error("ValueError: " + str(ve))
                raise ve
            except Exception as e:
                logger.error("Error: " + str(e))
                raise e

            # be compatible with lists of (image) files
            if isinstance(self.X, list):
                self.X = np.asarray(self.X)
            elif isinstance(self.X, (pd.DataFrame, pd.Series)):
                self.X = self.X.to_numpy()
            if isinstance(self.y, list):
                self.y = np.asarray(self.y)
            elif isinstance(self.y, pd.Series) or isinstance(self.y, pd.DataFrame):
                self.y = self.y.to_numpy()

            # first, erase all rows where y is NaN if preprocessing has not done it already
            try:
                nans_in_y = np.isnan(self.y)
                nr_of_nans = len(np.where(nans_in_y == 1)[0])
                if nr_of_nans > 0:
                    logger.info("You have {} Nans in your target vector, "
                                "PHOTONAI erases every data item that has a Nan Target".format(str(nr_of_nans)))
                    self.X = self.X[~nans_in_y]
                    self.y = self.y[~nans_in_y]
                    new_kwargs = dict()
                    for name, element_list in kwargs.items():
                        new_kwargs[name] = element_list[~nans_in_y]
                    self.kwargs = new_kwargs

            except Exception as e:
                # This is only for convenience so if it fails then never mind
                logger.error("Removing Nans in target vector failed: " + str(e))
                pass

            logger.info("Running analysis with " + str(self.y.shape[0]) + " samples.")

    # ===================================================================
    # Properties and Helper
    # ===================================================================
    @property
    def estimation_type(self):
        estimation_type = getattr(self.elements[-1], '_estimator_type', None)
        if estimation_type is None:
            raise NotImplementedError("Last element in Hyperpipe should be an estimator.")
        else:
            return estimation_type

    @property
    def verbosity(self):
        return self._verbosity

    @verbosity.setter
    def verbosity(self, value):
        self._verbosity = value
        self.output_settings.verbosity = self._verbosity
        self.output_settings.set_log_level()

    @staticmethod
    def disable_multiprocessing_recursively(pipe):
        if isinstance(pipe, (Stack, Branch, Switch, Preprocessing)):
            if hasattr(pipe, 'nr_of_processes'):
                pipe.nr_of_processes = 1
            for child in pipe.elements:
                if isinstance(child, Branch):
                    Hyperpipe.disable_multiprocessing_recursively(child)
                elif hasattr(child, 'base_element'):
                    Hyperpipe.disable_multiprocessing_recursively(child.base_element)
        elif isinstance(pipe, PhotonPipeline):
            for name, child in pipe.named_steps.items():
                Hyperpipe.disable_multiprocessing_recursively(child)
        else:
            if hasattr(pipe, 'nr_of_processes'):
                pipe.nr_of_processes = 1

    @staticmethod
    def recursive_cache_folder_propagation(element, cache_folder, inner_fold_id):
        if isinstance(element, (Switch, Stack, Preprocessing)):
            for child in element.elements:
                Hyperpipe.recursive_cache_folder_propagation(child, cache_folder, inner_fold_id)

        elif isinstance(element, Branch):
            # in case it's a Branch, we create a cache subfolder and propagate it to every child
            if cache_folder:
                cache_folder = os.path.join(cache_folder, element.name)
            Hyperpipe.recursive_cache_folder_propagation(element.base_element, cache_folder, inner_fold_id)
            # Hyperpipe.prepare_caching(element.base_element.cache_folder)

        elif isinstance(element, PhotonPipeline):
            element.fold_id = inner_fold_id
            element.cache_folder = cache_folder

            # pipe.caching is automatically set to True or False by .cache_folder setter

            for name, child in element.named_steps.items():
                # we need to check if any element is Branch, Stack or Switch
                Hyperpipe.recursive_cache_folder_propagation(child, cache_folder, inner_fold_id)

        # else: if it's a simple PipelineElement, then we just don't do anything

    # ===================================================================
    # Pipeline Setup
    # ===================================================================

    def __iadd__(self, pipe_element: PipelineElement):
        """
        Add an element to the machine learning pipeline.
        Returns self.

        Parameters:
            pipe_element:
                The object to add to the machine learning pipeline,
                being either a transformer or an estimator.

        """
        if isinstance(pipe_element, Preprocessing):
            self.preprocessing = pipe_element
        elif isinstance(pipe_element, CallbackElement):
            pipe_element.needs_y = True
            self.elements.append(pipe_element)
        else:
            if isinstance(pipe_element, PipelineElement) or issubclass(type(pipe_element), PhotonNative):
                self.elements.append(pipe_element)
            else:
                raise TypeError("Element must be of type Pipeline Element")
        return self

    def add(self, pipe_element: PipelineElement):
        """
        Add an element to the machine learning pipeline.
        Returns self.

        Parameters:
            pipe_element:
                The object to add to the machine learning pipeline,
                being either a transformer or an estimator.

        """
        return self.__iadd__(pipe_element)

    # ===================================================================
    # Workflow Setup
    # ===================================================================
    def _prepare_dummy_estimator(self):
        self.results.dummy_estimator = MDBDummyResults()

        if self.estimation_type == 'regressor':
            self.results.dummy_estimator.strategy = 'mean'
            return DummyRegressor(strategy=self.results.dummy_estimator.strategy)
        elif self.estimation_type == 'classifier':
            self.results.dummy_estimator.strategy = 'most_frequent'
            return DummyClassifier(strategy=self.results.dummy_estimator.strategy)
        else:
            logger.info('Estimator does not specify whether it is a regressor or classifier. '
                        'DummyEstimator step skipped.')
            return

    def __get_pipeline_structure(self, pipeline_elements):
        element_list = dict()
        for p_el in pipeline_elements:
            if not hasattr(p_el, 'name'):
                raise Warning('Strange Pipeline Element found that has no name. Type: {}'.format(type(p_el)))
            if hasattr(p_el, 'elements'):
                child_list = self.__get_pipeline_structure(p_el.elements)
                identifier = p_el.name
                if hasattr(p_el, "identifier"):
                    identifier = p_el.identifier + identifier
                    element_list[identifier] = child_list
            else:
                if hasattr(p_el, 'base_element'):
                    element_list[p_el.name] = str(type(p_el.base_element))
                else:
                    element_list[p_el.name] = str(type(p_el))
        return element_list

    def _prepare_result_logging(self, start_time):

        self.results = MDBHyperpipe(name=self.name, version=__version__)
        self.results.hyperpipe_info = MDBHyperpipeInfo()

        # in case use_test_set is false, we have no outer fold predictions
        if not self.cross_validation.use_test_set:
            self.output_settings.save_predictions_from_best_config_inner_folds = True
        self.results_handler = ResultsHandler(self.results, self.output_settings)

        self.results.computation_start_time = start_time
        self.results.hyperpipe_info.estimation_type = self.estimation_type
        self.results.output_folder = self.output_settings.results_folder

        if self.permutation_id is not None:
            self.results.permutation_id = self.permutation_id

        # save wizard information to PHOTONAI db in order to map results to the wizard design object
        if self.output_settings and hasattr(self.output_settings, 'wizard_object_id'):
            if self.output_settings.wizard_object_id:
                self.name = self.output_settings.wizard_object_id
                self.results.name = self.output_settings.wizard_object_id
                self.results.wizard_object_id = ObjectId(self.output_settings.wizard_object_id)
                self.results.wizard_system_name = self.output_settings.wizard_project_name
                self.results.user_id = self.output_settings.user_id
        self.results.outer_folds = []
        self.results.hyperpipe_info.elements = self.__get_pipeline_structure(self.elements)
        self.results.hyperpipe_info.eval_final_performance = self.cross_validation.use_test_set
        self.results.hyperpipe_info.best_config_metric = self.optimization.best_config_metric
        self.results.hyperpipe_info.metrics = self.optimization.metrics
        self.results.hyperpipe_info.learning_curves_cut = self.cross_validation.learning_curves_cut
        self.results.hyperpipe_info.maximize_best_config_metric = self.optimization.maximize_metric

        # optimization
        def _format_cross_validation(cv):
            if cv:
                string = "{}(".format(cv.__class__.__name__)
                for key, val in cv.__dict__.items():
                    string += "{}={}, ".format(key, val)
                return string[:-2] + ")"
            else:
                return "None"

        self.results.hyperpipe_info.cross_validation = \
            {'OuterCV': _format_cross_validation(self.cross_validation.outer_cv),
             'InnerCV': _format_cross_validation(self.cross_validation.inner_cv)}
        self.results.hyperpipe_info.data = {'X_shape': self.data.X.shape, 'y_shape': self.data.y.shape}
        self.results.hyperpipe_info.optimization = {'Optimizer': self.optimization.optimizer_input_str,
                                                    'OptimizerParams': str(self.optimization.optimizer_params),
                                                    'BestConfigMetric': self.optimization.best_config_metric}

        # add json file of hyperpipe attributes
        try:
            json_transformer = JsonTransformer()
            json_transformer.to_json_file(self, self.output_settings.results_folder+"/hyperpipe_config.json")
        except:
            msg = "JsonTransformer was unable to create the .json file."
            logger.warning(msg)
            warnings.warn(msg)

    def _finalize_optimization(self):
        # ==================== EVALUATING RESULTS OF HYPERPARAMETER OPTIMIZATION ===============================
        # 1. computing average metrics
        # 2. finding overall best config
        # 3. training model with best config
        # 4. persisting best model
        logger.clean_info('')
        logger.stars()
        logger.photon_system_log("Finished all outer fold computations.")
        logger.info("Now analysing the final results...")

        # compute dummy metrics
        logger.info("Computing dummy metrics...")
        config_item = MDBConfig()
        dummy_results = [outer_fold.dummy_results for outer_fold in self.results.outer_folds]
        config_item.inner_folds = [f for f in dummy_results if f is not None]
        if len(config_item.inner_folds) > 0:
            self.results.dummy_estimator.metrics_train, self.results.dummy_estimator.metrics_test = \
                MDBHelper.aggregate_metrics_for_inner_folds(config_item.inner_folds, self.optimization.metrics)

        logger.info("Computing mean and std for all outer fold metrics...")
        # Compute all final metrics
        self.results.metrics_train, self.results.metrics_test = \
            MDBHelper.aggregate_metrics_for_outer_folds(self.results.outer_folds, self.optimization.metrics)

        # Find best config across outer folds
        logger.info("Find best config across outer folds...")
        best_config = self.optimization.get_optimum_config_outer_folds(self.results.outer_folds)
        self.best_config = best_config.config_dict
        self.results.best_config = best_config

        # save results again
        self.results.computation_end_time = datetime.datetime.now()
        self.results.computation_completed = True
        logger.info("Save final results...")
        self.results_handler.save()

        logger.info("Prepare Hyperpipe.optimum pipe with best config..")
        # set self to best config
        self.optimum_pipe = self._pipe
        self.optimum_pipe.set_params(**self.best_config)

        if self.output_settings.generate_best_model:
            logger.info("Fitting best model...")
            # set self to best config
            self.optimum_pipe = self._pipe
            self.optimum_pipe.set_params(**self.best_config)

            # set caching
            # we want caching disabled in general but still want to do single subject caching
            self.recursive_cache_folder_propagation(self.optimum_pipe, self.cache_folder, 'fixed_fold_id')
            self.optimum_pipe.caching = False

            # disable multiprocessing when fitting optimum pipe
            # (otherwise inverse_transform won't work for BrainAtlas/Mask)
            self.disable_multiprocessing_recursively(self.optimum_pipe)

            self.optimum_pipe.fit(self.data.X, self.data.y, **self.data.kwargs)

            # Before saving the optimum pipe, add preprocessing without multiprocessing
            self.disable_multiprocessing_recursively(self.preprocessing)
            self.optimum_pipe.add_preprocessing(self.preprocessing)

            # Now truly set to no caching (including single_subject_caching)
            self.recursive_cache_folder_propagation(self.optimum_pipe, None, None)

            if self.output_settings.save_output:
                try:
                    pretrained_model_filename = os.path.join(self.output_settings.results_folder,
                                                             'photon_best_model.photon')
                    PhotonModelPersistor.save_optimum_pipe(self.optimum_pipe, pretrained_model_filename)
                    logger.info("Saved best model to file.")
                except Exception as e:
                    logger.info("Could not save best model to file")
                    logger.error(str(e))

                # get feature importances of optimum pipe
                logger.info("Mapping back feature importances...")
                feature_importances = self.optimum_pipe.feature_importances_

                if not feature_importances:
                    logger.info("No feature importances available for {}!".format(self.optimum_pipe.elements[-1][0]))
                else:
                    self.results.best_config_feature_importances = feature_importances

                    # write backmapping file only if optimum_pipe's inverse_transform works completely.
                    # restriction: only a faulty inverse_transform is considered; missing ones are ignored.
                    with warnings.catch_warnings(record=True) as w:
                        # get backmapping
                        backmapping, _, _ = self.optimum_pipe.\
                            inverse_transform(np.array(feature_importances).reshape(1, -1), None)

                        if not any("The inverse transformation is not possible for" in s
                                   for s in [e.message.args[0] for e in w]):
                            # save backmapping
                            self.results_handler.save_backmapping(
                                filename='optimum_pipe_feature_importances_backmapped', backmapping=backmapping)
                        else:
                            logger.info('Could not save feature importance: backmapping NOT successful.')

                # save learning curves
                if self.cross_validation.learning_curves:
                    self.results_handler.save_all_learning_curves()

        logger.info("Summarizing results...")

        logger.info("Write predictions to files...")
        # write all convenience files (summary, predictions_file and plots)
        self.results_handler.write_predictions_file()

        logger.info("Write summary...")
        logger.stars()
        logger.photon_system_log("")
        logger.photon_system_log(self.results_handler.text_summary())

    def preprocess_data(self):
        # if there is a preprocessing pipeline, we apply it first.
        if self.preprocessing is not None:
            logger.info("Applying preprocessing steps...")
            self.preprocessing.fit(self.data.X, self.data.y, **self.data.kwargs)
            self.data.X, self.data.y, self.data.kwargs = self.preprocessing.transform(self.data.X, self.data.y,
                                                                                      **self.data.kwargs)

    def _prepare_pipeline(self):
        self._pipe = Branch.prepare_photon_pipe(self.elements)
        self._pipe = Branch.sanity_check_pipeline(self._pipe)
        if self.random_state:
            self._pipe.random_state = self.random_state

    # ===================================================================
    # sklearn interfaces
    # ===================================================================

    @staticmethod
    def fit_outer_folds(outer_fold_computer, X, y, kwargs):
        outer_fold_computer.fit(X, y, **kwargs)
        return

    def fit(self, data: np.ndarray, targets: np.ndarray, **kwargs):
        """
        Starts the hyperparameter search and/or fits the pipeline to the data and targets.

        Manages the nested cross validated hyperparameter search:

        1. Filters the data according to the filter strategy and the imbalanced_data_strategy,
        2. requests new configurations from the hyperparameter search strategy, the optimizer,
        3. initializes the testing of a specific configuration,
        4. communicates the result to the optimizer,
        5. repeats 2-4 until optimizer delivers no more configurations to test
        6. finally searches for the best config in all tested configs,
        7. trains the pipeline with the best config and evaluates the performance on the test set

        Parameters:
            data:
                The array-like training and test data with shape=[N, D],
                where N is the number of samples and D is the number of features.

            targets:
                The truth array-like values with shape=[N],
                where N is the number of samples.

            **kwargs:
                Keyword arguments, passed to OuterFoldManager.fit.


        Returns:
            Fitted Hyperpipe.

        """
        # switch to result output folder
        start = datetime.datetime.now()
        self.output_settings.update_settings(self.name, start.strftime("%Y-%m-%d_%H-%M-%S"))

        logger.photon_system_log('=' * 101)
        logger.photon_system_log('PHOTONAI ANALYSIS: ' + self.name)
        logger.photon_system_log('=' * 101)
        logger.info("Preparing data and PHOTONAI objects for analysis...")

        # loop over outer cross validation
        if self.nr_of_processes > 1:
            hyperpipe_client = Client(threads_per_worker=1, n_workers=self.nr_of_processes, processes=False)

        try:
            # check data
            self.data.input_data_sanity_checks(data, targets, **kwargs)
            # create photon pipeline
            self._prepare_pipeline()
            # initialize the progress monitors
            self._prepare_result_logging(start)
            # apply preprocessing
            self.preprocess_data()

            if not self.is_final_fit:

                # Outer Folds
                outer_folds = FoldInfo.generate_folds(self.cross_validation.outer_cv,
                                                      self.data.X, self.data.y, self.data.kwargs,
                                                      self.cross_validation.use_test_set,
                                                      self.cross_validation.test_size)

                self.cross_validation.outer_folds = {f.fold_id: f for f in outer_folds}
                delayed_jobs = []

                # Run Dummy Estimator
                dummy_estimator = self._prepare_dummy_estimator()

                if self.cache_folder is not None:
                    logger.info("Removing cache files...")
                    CacheManager.clear_cache_files(self.cache_folder, force_all=True)

                # loop over outer cross validation
                for i, outer_f in enumerate(outer_folds):

                    # 1. generate OuterFolds Object
                    outer_fold = MDBOuterFold(fold_nr=outer_f.fold_nr)
                    outer_fold_computer = OuterFoldManager(self._pipe,
                                                           self.optimization,
                                                           outer_f.fold_id,
                                                           self.cross_validation,
                                                           cache_folder=self.cache_folder,
                                                           cache_updater=self.recursive_cache_folder_propagation,
                                                           dummy_estimator=dummy_estimator,
                                                           result_obj=outer_fold)
                    # 2. monitor outputs
                    self.results.outer_folds.append(outer_fold)

                    if self.nr_of_processes > 1:
                        result = dask.delayed(Hyperpipe.fit_outer_folds)(outer_fold_computer,
                                                                         self.data.X,
                                                                         self.data.y,
                                                                         self.data.kwargs)
                        delayed_jobs.append(result)
                    else:
                        try:
                            # 3. fit
                            outer_fold_computer.fit(self.data.X, self.data.y, **self.data.kwargs)
                            # 4. save outer fold results
                            self.results_handler.save()
                        finally:
                            # 5. clear cache
                            CacheManager.clear_cache_files(self.cache_folder)

                if self.nr_of_processes > 1:
                    dask.compute(*delayed_jobs)
                    self.results_handler.save()

                # evaluate hyperparameter optimization results for best config
                self._finalize_optimization()

                # clear complete cache ? use self.cache_folder to delete all subfolders within the parent cache folder
                # directory
                CacheManager.clear_cache_files(self.cache_folder, force_all=True)

            ###############################################################################################
            else:
                self.preprocess_data()
                self._pipe.fit(self.data.X, self.data.y, **kwargs)
        except Exception as e:
            logger.error(e)
            logger.error(traceback.format_exc())
            traceback.print_exc()
            raise e
        finally:
            if self.nr_of_processes > 1:
                hyperpipe_client.close()
        return self

    def predict(self, data: np.ndarray, **kwargs) -> np.ndarray:
        """
        Use the optimum pipe to predict the input data.

        Parameters:
            data:
                The array-like prediction data with shape=[M, D],
                where M is the number of samples and D is the number
                of features. D must correspond to the number
                of trained dimensions of the fit method.

            **kwargs:
                Keyword arguments, passed to optimum_pipe.predict.

        Returns:
            Predicted targets calculated on input data with trained model.

        """
        # Todo: if local_search = true then use optimized pipe here?
        if self._pipe:
            return self.optimum_pipe.predict(data, **kwargs)

    def predict_proba(self, data: np.ndarray, **kwargs) -> np.ndarray:
        """
        Use the optimum pipe to predict the probabilities from the input data.

        Parameters:
            data:
                The array-like prediction data with shape=[M, D],
                where M is the number of samples and D is the number
                of features. D must correspond to the number
                of trained dimensions of the fit method.

            **kwargs:
                Keyword arguments, passed to optimum_pipe.predict_proba.

        Returns:
            Probabilities calculated from input data on fitted model.


        """
        if self._pipe:
            return self.optimum_pipe.predict_proba(data, **kwargs)

    def transform(self, data: np.ndarray, **kwargs) -> np.ndarray:
        """
        Use the optimum pipe to transform the data.

        Parameters:
            data:
                The array-like input data with shape=[M, D],
                where M is the number of samples and D is the number
                of features. D must correspond to the number
                of trained dimensions of the fit method.

            **kwargs:
                Keyword arguments, passed to optimum_pipe.transform.

        Returns:
            Transformed data.

        """
        if self._pipe:
            X, _, _ = self.optimum_pipe.transform(data, y=None, **kwargs)
            return X

    def score(self, data: np.ndarray, y: np.ndarray, **kwargs) -> np.ndarray:
        """
        Use the optimum pipe to score the model.

        Parameters:
            data:
                The array-like data with shape=[M, D],
                where M is the number of samples and D is the number
                of features. D must correspond to the number
                of trained dimensions of the fit method.

            y:
                The array-like true targets.

            **kwargs:
                Keyword arguments, passed to optimum_pipe.predict.

        Returns:
            Score computed with the best-config metric on the input data using the trained model.

        """
        if self._pipe:
            predictions = self.optimum_pipe.predict(data, **kwargs)
            scorer = Scorer.create(self.optimization.best_config_metric)
            return scorer(y, predictions)

    def _calculate_permutation_importances(self, **kwargs):
        """
        Extracted from get_permutation_feature_importances to improve unit testing.
        """

        importance_list = {'mean': list(), 'std': list()}

        def train_and_get_fimps(pipeline, train_idx, test_idx, data_X, data_y, data_kwargs, fold_str):

            train_X, train_y, train_kwargs = PhotonDataHelper.split_data(data_X, data_y, data_kwargs,
                                                                         indices=train_idx)

            test_X, test_y, test_kwargs = PhotonDataHelper.split_data(data_X, data_y, data_kwargs,
                                                                      indices=test_idx)

            # fit fold's best model (again) -> to obtain that model's feature importances
            logger.photon_system_log("Permutation Importances: Fitting model for " + fold_str)
            pipeline.fit(train_X, train_y, **train_kwargs)

            # get feature importances
            logger.photon_system_log("Permutation Importances: Calculating performances for " + fold_str)
            perm_imps = permutation_importance(pipeline, test_X, test_y, **kwargs)

            # store into list
            importance_list['mean'].append(perm_imps["importances_mean"])
            importance_list['std'].append(perm_imps["importances_std"])

            return perm_imps

        for outer_fold in self.results.outer_folds:

            if outer_fold.best_config is None:
                raise ValueError("Could not find a best config for outer fold " + str(outer_fold.fold_nr))

            pipe_copy = self.optimum_pipe.copy_me()

            # set pipe to config
            pipe_copy.set_params(**outer_fold.best_config.config_dict)

            if not self.results.hyperpipe_info.eval_final_performance:
                no_outer_cv_indices = False
                if outer_fold.best_config.best_config_score is None:
                    no_outer_cv_indices = True
                elif outer_fold.best_config.best_config_score.training is None or not outer_fold.best_config.best_config_score.training.indices:
                    no_outer_cv_indices = True

                if no_outer_cv_indices:
                    data_to_split, y_to_split, kwargs_to_split = self.data.X, self.data.y, self.data.kwargs
                else:

                    logger.photon_system_log("Permutation Importances: Using inner_cv folds.")

                    # get outer fold data
                    idx = outer_fold.best_config.best_config_score.training.indices
                    data_to_split, y_to_split, kwargs_to_split = PhotonDataHelper.split_data(self.data.X,
                                                                                             self.data.y,
                                                                                             self.data.kwargs,
                                                                                             indices=idx)

                for inner_fold in outer_fold.best_config.inner_folds:
                    train_and_get_fimps(pipe_copy,
                                        inner_fold.training.indices, inner_fold.validation.indices,
                                        data_to_split, y_to_split, kwargs_to_split,
                                        "inner fold " + str(inner_fold.fold_nr))

            else:
                train_and_get_fimps(pipe_copy,
                                    outer_fold.best_config.best_config_score.training.indices,
                                    outer_fold.best_config.best_config_score.validation.indices,
                                    self.data.X, self.data.y, self.data.kwargs, "outer fold " + str(outer_fold.fold_nr))

        return importance_list

    def get_permutation_feature_importances(self, **kwargs):
        """
        Fits a model for the best config of each outer fold (using the training data of that fold).
        Then calls sklearn.inspection.permutation_importance with the test data and the given kwargs (e.g. n_repeats).
        Returns mean of "importances_mean" and of "importances_std" of all outer folds.

        Parameters:
            **kwargs:
                Keyword arguments, passed to sklearn.inspection.permutation_importance.

        Returns:
            Dictionary with average of "mean" and "std" for all outer folds, respectively.

        """

        logger.photon_system_log("")
        logger.photon_system_log("Computing permutation importances. This may take a while.")
        logger.stars()
        if self.optimum_pipe is None:
            raise ValueError("Cannot calculate permutation importances when optimum_pipe is None (probably the "
                             "training and optimization procedure failed)")
        importance_list = self._calculate_permutation_importances(**kwargs)
        mean_importances = np.mean(np.array(importance_list["mean"]), axis=0)
        std_importances = np.mean(np.array(importance_list["std"]), axis=0)
        logger.stars()

        return {'mean': mean_importances, 'std': std_importances}


    def inverse_transform_pipeline(self, hyperparameters: dict,
                                   data: np.ndarray,
                                   targets: np.ndarray,
                                   data_to_inverse: np.ndarray) -> np.ndarray:
        """
        Inverse transform data for a pipeline with specific hyperparameter configuration.

        1. Copy Sklearn Pipeline,
        2. Set Parameters
        3. Fit Pipeline to data and targets
        4. Inverse transform data with that pipeline

        Parameters:
            hyperparameters:
                The concrete configuration settings for the pipeline elements.

            data:
                The training data to which the pipeline is fitted.

            targets:
                The truth values for training.

            data_to_inverse:
                The data that should be inversed after training.

        Returns:
            Inverse-transformed data as array.

        """
        copied_pipe = self._pipe.copy_me()
        copied_pipe.set_params(**hyperparameters)
        copied_pipe.fit(data, targets)
        return copied_pipe.inverse_transform(data_to_inverse)

    # ===================================================================
    # Copy, Save and Load
    # ===================================================================

    def copy_me(self):
        """
        Helper function to copy an entire Hyperpipe

        Returns:
            Hyperpipe

        """
        signature = inspect.getfullargspec(OutputSettings.__init__)[0]
        settings = OutputSettings()
        for attr in signature:
            if hasattr(self.output_settings, attr):
                setattr(settings, attr, getattr(self.output_settings, attr))
        self.output_settings.initialize_log_file()

        # create new Hyperpipe instance
        pipe_copy = Hyperpipe(name=self.name,
                              inner_cv=deepcopy(self.cross_validation.inner_cv),
                              outer_cv=deepcopy(self.cross_validation.outer_cv),
                              best_config_metric=self.optimization.best_config_metric,
                              metrics=self.optimization.metrics,
                              optimizer=self.optimization.optimizer_input_str,
                              optimizer_params=self.optimization.optimizer_params,
                              project_folder=self.project_folder,
                              output_settings=settings)

        signature = inspect.getfullargspec(self.__init__)[0]
        for attr in signature:
            if hasattr(self, attr) and attr != 'output_settings':
                setattr(pipe_copy, attr, getattr(self, attr))

        if hasattr(self, 'preprocessing') and self.preprocessing:
            preprocessing = Preprocessing()
            for element in self.preprocessing.elements:
                preprocessing += element.copy_me()
            pipe_copy += preprocessing
        if hasattr(self, 'elements'):
            for element in self.elements:
                pipe_copy += element.copy_me()
        return pipe_copy

    def save_optimum_pipe(self, filename=None, password=None):
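        """Persist the fitted optimum pipe to disk; defaults to "photon_<name>_best_model.p" when no filename is given."""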
        if filename is None:
            filename = "photon_" + self.name + "_best_model.p"
        PhotonModelPersistor.save_optimum_pipe(self, filename, password)

    @staticmethod
    def load_optimum_pipe(file: str, password: str = None) -> PhotonPipeline:
        """
        Load optimum pipe from file.
        As a staticmethod, it can be called without instantiating a Hyperpipe.
        Backend: PhotonModelPersistor.load_optimum_pipe.

        Parameters:
            file:
                File path specifying .photon file to load
                trained pipeline from zipped file.

            password:
                Password for reading the file.

        Returns:
            Returns pipeline with all trained PipelineElements.

        """
        return PhotonModelPersistor.load_optimum_pipe(file, password)

    @staticmethod
    def reload_hyperpipe(results_folder, X, y, **data_kwargs):
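        """Reload a finished analysis: restore the results object, the trained optimum pipe, and the data from a results folder."""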

        res_handler = ResultsHandler()
        res_handler.load_from_file(os.path.join(results_folder, "photon_result_file.json"))
        loaded_optimum_pipe = Hyperpipe.load_optimum_pipe(os.path.join(results_folder, "photon_best_model.photon"))

        new_hyperpipe = JsonTransformer().from_json_file(os.path.join(results_folder, "hyperpipe_config.json"))
        new_hyperpipe.results = res_handler.results
        new_hyperpipe.optimum_pipe = loaded_optimum_pipe
        new_hyperpipe.data = Hyperpipe.Data(X, y, data_kwargs)

        return new_hyperpipe

    def __repr__(self, **kwargs):
        """Overwrite BaseEstimator's function to avoid errors when using Jupyter Notebooks."""
        return "Hyperpipe(name='{}')".format(self.name)

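For instance, a completed analysis can be restored from its results folder with reload_hyperpipe. A minimal sketch, assuming a previous run wrote its output with the default file names to ./my_analysis_results (hypothetical path):

from sklearn.datasets import load_breast_cancer
from photonai.base import Hyperpipe

X, y = load_breast_cancer(return_X_y=True)

# Restores the results object, the trained optimum pipe, and the data in one call.
reloaded_pipe = Hyperpipe.reload_hyperpipe("./my_analysis_results", X, y)
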
__init__(self, name, inner_cv=None, outer_cv=None, optimizer='grid_search', optimizer_params=None, metrics=None, best_config_metric=None, eval_final_performance=None, use_test_set=True, test_size=0.2, project_folder='', calculate_metrics_per_fold=True, calculate_metrics_across_folds=False, random_seed=None, verbosity=0, learning_curves=False, learning_curves_cut=None, output_settings=None, performance_constraints=None, permutation_id=None, cache_folder=None, nr_of_processes=1, allow_multidim_targets=False) special

Initialize the object.

Parameters:

name (Optional[str], required)
    Name of the hyperpipe instance.

inner_cv (Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits], default: None)
    Cross-validation strategy to test hyperparameter configurations; generates the validation set.

outer_cv (Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits], default: None)
    Cross-validation strategy to use for the hyperparameter search itself; generates the test set.

optimizer (str, default: 'grid_search')
    Hyperparameter optimization algorithm.

    • In case a string literal is given:
        • "grid_search": Optimizer that iteratively tests all possible hyperparameter combinations.
        • "random_grid_search": A variation of grid search that randomly picks hyperparameter combinations from all possible combinations.
        • "sk_opt": Scikit-Optimize, based on Bayesian optimization.
        • "random_search": Randomly chooses hyperparameters from a grid-free domain.
        • "smac": SMAC, based on Bayesian optimization.
        • "nevergrad": Nevergrad, based on evolutionary learning.
    • In case an object is given, it is expected to have the following methods (see the sketch after the __init__ source code below):
        • ask: returns a hyperparameter configuration in the form of a dictionary containing key->value pairs in the sklearn parameter encoding model_name__parameter_name: parameter_value
        • prepare: takes a list of pipeline elements and their particular hyperparameters to prepare the hyperparameter space
        • tell: receives a tested config and the respective performance in order to calculate a smart next configuration to process

metrics (Optional[List[Union[Callable, keras.metrics.Metric, Type[keras.metrics.Metric], str]]], default: None)
    Metrics that should be calculated for the training, validation, and test sets. Use the pre-imported metrics from sklearn and PHOTONAI, or register your own.

    • Metrics for classification:
        • accuracy: sklearn.metrics.accuracy_score
        • matthews_corrcoef: sklearn.metrics.matthews_corrcoef
        • confusion_matrix: sklearn.metrics.confusion_matrix
        • f1_score: sklearn.metrics.f1_score
        • hamming_loss: sklearn.metrics.hamming_loss
        • log_loss: sklearn.metrics.log_loss
        • precision: sklearn.metrics.precision_score
        • recall: sklearn.metrics.recall_score
    • Metrics for regression:
        • mean_squared_error: sklearn.metrics.mean_squared_error
        • mean_absolute_error: sklearn.metrics.mean_absolute_error
        • explained_variance: sklearn.metrics.explained_variance_score
        • r2: sklearn.metrics.r2_score
    • Other metrics:
        • pearson_correlation: photon_core.framework.Metrics.pearson_correlation
        • variance_explained: photon_core.framework.Metrics.variance_explained_score
        • categorical_accuracy: photon_core.framework.Metrics.categorical_accuracy_score

best_config_metric (Union[Callable, keras.metrics.Metric, Type[keras.metrics.Metric], str], default: None)
    The metric that should be maximized or minimized in order to choose the best hyperparameter configuration.

eval_final_performance (bool, default: None)
    DEPRECATED! Use "use_test_set" instead!

use_test_set (bool, default: True)
    Whether the metrics should be calculated on the test set; otherwise the test set is separated but not used.

project_folder (str, default: '')
    The output folder in which all files generated by the PHOTONAI project are saved.

test_size (float, default: 0.2)
    The fraction of the data that is left out as a test set if no outer_cv is given and use_test_set is True.

calculate_metrics_per_fold (bool, default: True)
    If True, the metrics are calculated for each inner fold. If False, calculate_metrics_across_folds must be True.

calculate_metrics_across_folds (bool, default: False)
    If True, the metrics are calculated across all inner folds. If False, calculate_metrics_per_fold must be True.

random_seed (int, default: None)
    Random seed.

verbosity (int, default: 0)
    The level of verbosity: 0 is least talkative and logs only warnings and errors, 1 adds info, and 2 adds debug output.

learning_curves (bool, default: False)
    Enables the learning curve procedure, evaluating the learning process over different input sizes. Depends on learning_curves_cut.

learning_curves_cut (FloatRange, default: None)
    The tested relative cuts for data size.

performance_constraints (list, default: None)
    Objects that indicate whether a configuration should be tested further, e.g. stopping early if the inner folds of a config do not outperform the dummy estimator.

permutation_id (str, default: None)
    String identifier for permutation tests.

cache_folder (str, default: None)
    Folder path used for caching, e.g. during multiprocessing.

nr_of_processes (int, default: 1)
    Determines the number of outer folds that are computed in parallel.

allow_multidim_targets (bool, default: False)
    Allows multidimensional targets.
Source code in photonai/base/hyperpipe.py
def __init__(self, name: Optional[str],
             inner_cv: Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits] = None,
             outer_cv: Union[BaseCrossValidator, BaseShuffleSplit, _RepeatedSplits, None] = None,
             optimizer: str = 'grid_search',
             optimizer_params: dict = None,
             metrics: Optional[List[Union[Scorer.Metric_Type, str]]] = None,
             best_config_metric: Optional[Union[Scorer.Metric_Type, str]] = None,
             eval_final_performance: bool = None,
             use_test_set: bool = True,
             test_size: float = 0.2,
             project_folder: str = '',
             calculate_metrics_per_fold: bool = True,
             calculate_metrics_across_folds: bool = False,
             random_seed: int = None,
             verbosity: int = 0,
             learning_curves: bool = False,
             learning_curves_cut: FloatRange = None,
             output_settings: OutputSettings = None,
             performance_constraints: list = None,
             permutation_id: str = None,
             cache_folder: str = None,
             nr_of_processes: int = 1,
             allow_multidim_targets: bool = False):
    """
    Initialize the object.

    Parameters:
        name:
            Name of hyperpipe instance.

        inner_cv:
            Cross validation strategy to test hyperparameter configurations, generates the validation set.

        outer_cv:
            Cross validation strategy to use for the hyperparameter search itself, generates the test set.

        optimizer:
            Hyperparameter optimization algorithm.

            - In case a string literal is given:
                - "grid_search": Optimizer that iteratively tests all possible hyperparameter combinations.
                - "random_grid_search": A variation of the grid search optimization that randomly picks
                    hyperparameter combinations from all possible hyperparameter combinations.
                - "sk_opt": Scikit-Optimize based on theories of bayesian optimization.
                - "random_search": randomly chooses hyperparameter from grid-free domain.
                - "smac": SMAC based on theories of bayesian optimization.
                - "nevergrad": Nevergrad based on theories of evolutionary learning.

            - In case an object is given:
                expects the object to have the following methods:
                - `ask`: returns a hyperparameter configuration in the form of a dictionary containing
                    key->value pairs in the sklearn parameter encoding `model_name__parameter_name: parameter_value`
                - `prepare`: takes a list of pipeline elements and their particular hyperparameters to prepare the
                             hyperparameter space
                - `tell`: gets a tested config and the respective performance in order to
                    calculate a smart next configuration to process

        metrics:
            Metrics that should be calculated for the training, validation, and test sets.
            Use the pre-imported metrics from sklearn and photonai, or register your own.

            - Metrics for `classification`:
                - `accuracy`: sklearn.metrics.accuracy_score
                - `matthews_corrcoef`: sklearn.metrics.matthews_corrcoef
                - `confusion_matrix`: sklearn.metrics.confusion_matrix,
                - `f1_score`: sklearn.metrics.f1_score
                - `hamming_loss`: sklearn.metrics.hamming_loss
                - `log_loss`: sklearn.metrics.log_loss
                - `precision`: sklearn.metrics.precision_score
                - `recall`: sklearn.metrics.recall_score
            - Metrics for `regression`:
                - `mean_squared_error`: sklearn.metrics.mean_squared_error
                - `mean_absolute_error`: sklearn.metrics.mean_absolute_error
                - `explained_variance`: sklearn.metrics.explained_variance_score
                - `r2`: sklearn.metrics.r2_score
            - Other metrics
                - `pearson_correlation`: photon_core.framework.Metrics.pearson_correlation
                - `variance_explained`:  photon_core.framework.Metrics.variance_explained_score
                - `categorical_accuracy`: photon_core.framework.Metrics.categorical_accuracy_score

        best_config_metric:
            The metric that should be maximized or minimized in order to choose
            the best hyperparameter configuration.

        eval_final_performance:
            DEPRECATED! Use "use_test_set" instead!

        use_test_set:
            Whether the metrics should be calculated on the test set;
            otherwise the test set is separated but not used.

        project_folder:
            The output folder in which all files generated by the
            PHOTONAI project are saved.

        test_size:
            The fraction of the data that is left out as a test set if no outer_cv
            is given and use_test_set is True.

        calculate_metrics_per_fold:
            If True, the metrics are calculated for each inner_fold.
            If False, calculate_metrics_across_folds must be True.

        calculate_metrics_across_folds:
            If True, the metrics are calculated across all inner_fold.
            If False, calculate_metrics_per_fold must be True.

        random_seed:
            Random Seed.

        verbosity:
            The level of verbosity: 0 is least talkative and
            logs only warnings and errors, 1 adds info, and 2 adds debug output.

        learning_curves:
            Enables learning curve procedure. Evaluate learning process over
            different sizes of input. Depends on learning_curves_cut.

        learning_curves_cut:
            The tested relative cuts for data size.

        performance_constraints:
            Objects that indicate whether a configuration should
            be tested further, e.g. stopping early if the inner folds
            of a config do not outperform the dummy estimator.

        permutation_id:
            String identifier for permutation tests.

        cache_folder:
            Folder path used for caching, e.g. during multiprocessing.

        nr_of_processes:
            Determines the number of outer folds that are computed in parallel.

        allow_multidim_targets:
            Allows multidimensional targets.

    """

    self.name = re.sub(r'\W+', '', name)

    if eval_final_performance is not None:
        depr_warning = "Hyperpipe parameter eval_final_performance is deprecated. It's called use_test_set now."
        # honor the deprecated parameter and warn, instead of raising after the assignment
        use_test_set = eval_final_performance
        logger.warning(depr_warning)

    # ====================== Cross Validation ===========================
    # check if both calculate_metrics_per_fold and calculate_metrics_across_folds are False
    if not calculate_metrics_across_folds and not calculate_metrics_per_fold:
        raise NotImplementedError("Apparently, you've set calculate_metrics_across_folds=False and "
                                  "calculate_metrics_per_fold=False. In this case PHOTONAI does not calculate "
                                  "any metrics which doesn't make any sense. Set at least one to True.")
    if inner_cv is None:
        msg = "PHOTONAI requires an inner_cv split. Please enable inner cross-validation. " \
              "As exmaple: Hyperpipe(...inner_cv = KFold(n_splits = 3), ...). " \
              "Ensure you import the cross_validation object first."
        logger.error(msg)
        raise AttributeError(msg)

    # use default cut 'FloatRange(0, 1, 'range', 0.2)' if learning_curves = True but learning_curves_cut is None
    if learning_curves and learning_curves_cut is None:
        learning_curves_cut = FloatRange(0, 1, 'range', 0.2)
    elif not learning_curves and learning_curves_cut is not None:
        learning_curves_cut = None

    self.cross_validation = Hyperpipe.CrossValidation(inner_cv=inner_cv,
                                                      outer_cv=outer_cv,
                                                      use_test_set=use_test_set,
                                                      test_size=test_size,
                                                      calculate_metrics_per_fold=calculate_metrics_per_fold,
                                                      calculate_metrics_across_folds=calculate_metrics_across_folds,
                                                      learning_curves=learning_curves,
                                                      learning_curves_cut=learning_curves_cut)

    # ====================== Data ===========================
    self.data = Hyperpipe.Data(allow_multidim_targets=allow_multidim_targets)

    # ====================== Output Folder and Log File Management ===========================
    if output_settings:
        self.output_settings = output_settings
    else:
        self.output_settings = OutputSettings()

    if project_folder == '':
        self.project_folder = os.getcwd()
    else:
        self.project_folder = project_folder

    self.output_settings.set_project_folder(self.project_folder)

    # update output options to add pipe name and timestamp to results folder
    self._verbosity = 0
    self.verbosity = verbosity
    self.output_settings.set_log_file()

    # ====================== Result Logging ===========================
    self.results_handler = None
    self.results = None
    self.best_config = None

    # ====================== Pipeline ===========================
    self.elements = []
    self._pipe = None
    self.optimum_pipe = None
    self.preprocessing = None

    # ====================== Performance Optimization ===========================
    if optimizer_params is None:
        optimizer_params = {}
    self.optimization = Optimization(metrics=metrics,
                                     best_config_metric=best_config_metric,
                                     optimizer_input=optimizer,
                                     optimizer_params=optimizer_params,
                                     performance_constraints=performance_constraints)

    # self.optimization.sanity_check_metrics()

    # ====================== Caching and Parallelization ===========================
    self.nr_of_processes = nr_of_processes
    if cache_folder:
        self.cache_folder = os.path.join(cache_folder, self.name)
    else:
        self.cache_folder = None

    # ====================== Internals ===========================

    self.permutation_id = permutation_id
    self.allow_multidim_targets = allow_multidim_targets
    self.is_final_fit = False

    # ====================== Random Seed ===========================
    self.random_state = random_seed
    if random_seed is not None:
        import random
        random.seed(random_seed)

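Beyond the string literals above, a custom optimizer object can be passed. The following is a minimal sketch of the documented ask/prepare/tell protocol; the class and parameter names are hypothetical, the exact signatures PHOTONAI calls internally may differ, and it assumes each pipeline element exposes discrete candidate lists in its hyperparameters attribute:

import random

class MyRandomOptimizer:
    """Hypothetical optimizer following the documented ask/prepare/tell protocol."""

    def __init__(self, n_configurations=10):
        self.n_configurations = n_configurations
        self.param_space = {}

    def prepare(self, pipeline_elements, maximize_metric=True):
        # Build a flat search space: model_name__parameter_name -> candidate values.
        for element in pipeline_elements:
            for param_name, candidates in element.hyperparameters.items():
                self.param_space[param_name] = list(candidates)

    def ask(self):
        # Yield configurations as dictionaries in the sklearn encoding
        # model_name__parameter_name: parameter_value.
        for _ in range(self.n_configurations):
            yield {name: random.choice(candidates)
                   for name, candidates in self.param_space.items()}

    def tell(self, config, performance):
        # A smarter strategy would use the (config, performance) pairs to choose
        # the next configuration; pure random search ignores the feedback.
        pass
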
add(self, pipe_element)

Add an element to the machine learning pipeline. Returns self.

Parameters:

pipe_element (PipelineElement, required)
    The object to add to the machine learning pipeline, being either a transformer or an estimator.
Source code in photonai/base/hyperpipe.py
def add(self, pipe_element: PipelineElement):
    """
    Add an element to the machine learning pipeline.
    Returns self.

    Parameters:
        pipe_element:
            The object to add to the machine learning pipeline,
            being either a transformer or an estimator.

    """
    return self.__iadd__(pipe_element)

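Both add() and the += operator append an element and can be mixed freely. A short sketch, assuming 'StandardScaler' and 'SVC' are available names in PHOTONAI's element registry:

from photonai.base import Hyperpipe, PipelineElement
from sklearn.model_selection import KFold

pipe = Hyperpipe('addExample', inner_cv=KFold(n_splits=3),
                 metrics=['accuracy'], best_config_metric='accuracy')

# add() delegates to __iadd__, so these two lines are equivalent in effect:
pipe.add(PipelineElement('StandardScaler'))
pipe += PipelineElement('SVC')
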
copy_me(self)

Helper function to copy an entire Hyperpipe

Returns:

Hyperpipe

Source code in photonai/base/hyperpipe.py
def copy_me(self):
    """
    Helper function to copy an entire Hyperpipe

    Returns:
        Hyperpipe

    """
    signature = inspect.getfullargspec(OutputSettings.__init__)[0]
    settings = OutputSettings()
    for attr in signature:
        if hasattr(self.output_settings, attr):
            setattr(settings, attr, getattr(self.output_settings, attr))
    self.output_settings.initialize_log_file()

    # create new Hyperpipe instance
    pipe_copy = Hyperpipe(name=self.name,
                          inner_cv=deepcopy(self.cross_validation.inner_cv),
                          outer_cv=deepcopy(self.cross_validation.outer_cv),
                          best_config_metric=self.optimization.best_config_metric,
                          metrics=self.optimization.metrics,
                          optimizer=self.optimization.optimizer_input_str,
                          optimizer_params=self.optimization.optimizer_params,
                          project_folder=self.project_folder,
                          output_settings=settings)

    signature = inspect.getfullargspec(self.__init__)[0]
    for attr in signature:
        if hasattr(self, attr) and attr != 'output_settings':
            setattr(pipe_copy, attr, getattr(self, attr))

    if hasattr(self, 'preprocessing') and self.preprocessing:
        preprocessing = Preprocessing()
        for element in self.preprocessing.elements:
            preprocessing += element.copy_me()
        pipe_copy += preprocessing
    if hasattr(self, 'elements'):
        for element in self.elements:
            pipe_copy += element.copy_me()
    return pipe_copy

fit(self, data, targets, **kwargs)

Starts the hyperparameter search and/or fits the pipeline to the data and targets.

Manages the nested cross validated hyperparameter search:

  1. Filters the data according to filter strategy (1) and according to the imbalanced_data_strategy (2)
  2. requests new configurations from the hyperparameter search strategy, the optimizer,
  3. initializes the testing of a specific configuration,
  4. communicates the result to the optimizer,
  5. repeats 2-4 until optimizer delivers no more configurations to test
  6. finally searches for the best config in all tested configs,
  7. trains the pipeline with the best config and evaluates the performance on the test set

Parameters:

data (ndarray, required)
    The array-like training and test data with shape=[N, D], where N is the number of samples and D is the number of features.

targets (ndarray, required)
    The array-like ground-truth values with shape=[N], where N is the number of samples.

**kwargs (default: {})
    Keyword arguments, passed to OuterFoldManager.fit.

Returns:

Fitted Hyperpipe.

Source code in photonai/base/hyperpipe.py
def fit(self, data: np.ndarray, targets: np.ndarray, **kwargs):
    """
    Starts the hyperparameter search and/or fits the pipeline to the data and targets.

    Manages the nested cross validated hyperparameter search:

    1. Filters the data according to filter strategy (1) and according to the imbalanced_data_strategy (2)
    2. requests new configurations from the hyperparameter search strategy, the optimizer,
    3. initializes the testing of a specific configuration,
    4. communicates the result to the optimizer,
    5. repeats 2-4 until optimizer delivers no more configurations to test
    6. finally searches for the best config in all tested configs,
    7. trains the pipeline with the best config and evaluates the performance on the test set

    Parameters:
        data:
            The array-like training and test data with shape=[N, D],
            where N is the number of samples and D is the number of features.

        targets:
            The array-like ground-truth values with shape=[N],
            where N is the number of samples.

        **kwargs:
            Keyword arguments, passed to OuterFoldManager.fit.


    Returns:
        Fitted Hyperpipe.

    """
    # switch to result output folder
    start = datetime.datetime.now()
    self.output_settings.update_settings(self.name, start.strftime("%Y-%m-%d_%H-%M-%S"))

    logger.photon_system_log('=' * 101)
    logger.photon_system_log('PHOTONAI ANALYSIS: ' + self.name)
    logger.photon_system_log('=' * 101)
    logger.info("Preparing data and PHOTONAI objects for analysis...")

    # loop over outer cross validation
    if self.nr_of_processes > 1:
        hyperpipe_client = Client(threads_per_worker=1, n_workers=self.nr_of_processes, processes=False)

    try:
        # check data
        self.data.input_data_sanity_checks(data, targets, **kwargs)
        # create photon pipeline
        self._prepare_pipeline()
        # initialize the progress monitors
        self._prepare_result_logging(start)
        # apply preprocessing
        self.preprocess_data()

        if not self.is_final_fit:

            # Outer Folds
            outer_folds = FoldInfo.generate_folds(self.cross_validation.outer_cv,
                                                  self.data.X, self.data.y, self.data.kwargs,
                                                  self.cross_validation.use_test_set,
                                                  self.cross_validation.test_size)

            self.cross_validation.outer_folds = {f.fold_id: f for f in outer_folds}
            delayed_jobs = []

            # Run Dummy Estimator
            dummy_estimator = self._prepare_dummy_estimator()

            if self.cache_folder is not None:
                logger.info("Removing cache files...")
                CacheManager.clear_cache_files(self.cache_folder, force_all=True)

            # loop over outer cross validation
            for i, outer_f in enumerate(outer_folds):

                # 1. generate OuterFolds Object
                outer_fold = MDBOuterFold(fold_nr=outer_f.fold_nr)
                outer_fold_computer = OuterFoldManager(self._pipe,
                                                       self.optimization,
                                                       outer_f.fold_id,
                                                       self.cross_validation,
                                                       cache_folder=self.cache_folder,
                                                       cache_updater=self.recursive_cache_folder_propagation,
                                                       dummy_estimator=dummy_estimator,
                                                       result_obj=outer_fold)
                # 2. monitor outputs
                self.results.outer_folds.append(outer_fold)

                if self.nr_of_processes > 1:
                    result = dask.delayed(Hyperpipe.fit_outer_folds)(outer_fold_computer,
                                                                     self.data.X,
                                                                     self.data.y,
                                                                     self.data.kwargs)
                    delayed_jobs.append(result)
                else:
                    try:
                        # 3. fit
                        outer_fold_computer.fit(self.data.X, self.data.y, **self.data.kwargs)
                        # 4. save outer fold results
                        self.results_handler.save()
                    finally:
                        # 5. clear cache
                        CacheManager.clear_cache_files(self.cache_folder)

            if self.nr_of_processes > 1:
                dask.compute(*delayed_jobs)
                self.results_handler.save()

            # evaluate hyperparameter optimization results for best config
            self._finalize_optimization()

            # clear complete cache ? use self.cache_folder to delete all subfolders within the parent cache folder
            # directory
            CacheManager.clear_cache_files(self.cache_folder, force_all=True)

        ###############################################################################################
        else:
            self.preprocess_data()
            self._pipe.fit(self.data.X, self.data.y, **kwargs)
    except Exception as e:
        logger.error(e)
        logger.error(traceback.format_exc())
        traceback.print_exc()
        raise e
    finally:
        if self.nr_of_processes > 1:
            hyperpipe_client.close()
    return self
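
A typical call; this sketch assumes the pipe from the add() example above and runs the nested procedure described in the docstring:

from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)
pipe.fit(X, y)

# After the search, the best configuration and the fitted optimum pipe are available:
print(pipe.best_config)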

get_permutation_feature_importances(self, **kwargs)

Fits a model for the best config of each outer fold (using the training data of that fold). Then calls sklearn.inspection.permutation_importance with the test data and the given kwargs (e.g. n_repeats). Returns the mean of "importances_mean" and of "importances_std" across all outer folds.

Parameters:

**kwargs (default: {})
    Keyword arguments, passed to sklearn.inspection.permutation_importance.

Returns:

Dictionary with the average "mean" and "std" across all outer folds, respectively.

Source code in photonai/base/hyperpipe.py
def get_permutation_feature_importances(self, **kwargs):
    """
    Fits a model for the best config of each outer fold (using the training data of that fold).
    Then calls sklearn.inspection.permutation_importance with the test data and the given kwargs (e.g. n_repeats).
    Returns the mean of "importances_mean" and of "importances_std" across all outer folds.

    Parameters:
        **kwargs:
            Keyword arguments, passed to sklearn.inspection.permutation_importance.

    Returns:
        Dictionary with average of "mean" and "std" for all outer folds, respectively.

    """

    logger.photon_system_log("")
    logger.photon_system_log("Computing permutation importances. This may take a while.")
    logger.stars()
    if self.optimum_pipe is None:
        raise ValueError("Cannot calculate permutation importances when optimum_pipe is None (probably the "
                         "training and optimization procedure failed)")
    importance_list = self._calculate_permutation_importances(**kwargs)
    mean_importances = np.mean(np.array(importance_list["mean"]), axis=0)
    std_importances = np.mean(np.array(importance_list["std"]), axis=0)
    logger.stars()

    return {'mean': mean_importances, 'std': std_importances}
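
A sketch of a call on a fitted hyperpipe; n_repeats is forwarded to sklearn.inspection.permutation_importance:

importances = pipe.get_permutation_feature_importances(n_repeats=5)
print(importances['mean'])  # one averaged importance value per feature
print(importances['std'])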

inverse_transform_pipeline(self, hyperparameters, data, targets, data_to_inverse)

Inverse transform data for a pipeline with specific hyperparameter configuration.

  1. Copy the sklearn pipeline,
  2. set the parameters,
  3. fit the pipeline to data and targets,
  4. inverse-transform the data with that pipeline.

Parameters:

hyperparameters (dict, required)
    The concrete configuration settings for the pipeline elements.

data (ndarray, required)
    The training data to which the pipeline is fitted.

targets (ndarray, required)
    The ground-truth values for training.

data_to_inverse (ndarray, required)
    The data that should be inverse-transformed after training.

Returns:

ndarray
    Inverse-transformed data as an array.

Source code in photonai/base/hyperpipe.py
def inverse_transform_pipeline(self, hyperparameters: dict,
                               data: np.ndarray,
                               targets: np.ndarray,
                               data_to_inverse: np.ndarray) -> np.ndarray:
    """
    Inverse transform data for a pipeline with specific hyperparameter configuration.

    1. Copy the sklearn pipeline,
    2. set the parameters,
    3. fit the pipeline to data and targets,
    4. inverse-transform the data with that pipeline.

    Parameters:
        hyperparameters:
            The concrete configuration settings for the pipeline elements.

        data:
            The training data to which the pipeline is fitted.

        targets:
            The truth values for training.

        data_to_inverse:
            The data that should be inverse-transformed after training.

    Returns:
        Inverse-transformed data as an array.

    """
    copied_pipe = self.pipe.copy_me()
    copied_pipe.set_params(**hyperparameters)
    copied_pipe.fit(data, targets)
    return copied_pipe.inverse_transform(data_to_inverse)
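
A sketch with hypothetical element and parameter names; inverse transformation is only meaningful if the pipeline's transformers implement inverse_transform (e.g. PCA):

config = {'PCA__n_components': 10, 'SVC__C': 1.0}
inversed = pipe.inverse_transform_pipeline(hyperparameters=config,
                                           data=X, targets=y,
                                           data_to_inverse=X[:5])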

load_optimum_pipe(file, password=None) staticmethod

Load optimum pipe from file. As a staticmethod, it can be called without instantiating a Hyperpipe. Backend: PhotonModelPersistor.load_optimum_pipe.

Parameters:

file (str, required)
    File path specifying the .photon file from which to load the trained pipeline (zipped file).

password (str, default: None)
    Password for reading the file.

Returns:

PhotonPipeline
    Pipeline with all trained PipelineElements.

Source code in photonai/base/hyperpipe.py
@staticmethod
def load_optimum_pipe(file: str, password: str = None) -> PhotonPipeline:
    """
    Load optimum pipe from file.
    As a staticmethod, it can be called without instantiating a Hyperpipe.
    Backend: PhotonModelPersistor.load_optimum_pipe.

    Parameters:
        file:
            File path specifying .photon file to load
            trained pipeline from zipped file.

        password:
            Password for reading the file.

    Returns:
        Returns pipeline with all trained PipelineElements.

    """
    return PhotonModelPersistor.load_optimum_pipe(file, password)
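
A save-and-restore sketch on a fitted hyperpipe; the file name is hypothetical:

pipe.save_optimum_pipe('my_best_model.photon')
best_model = Hyperpipe.load_optimum_pipe('my_best_model.photon')
predictions = best_model.predict(X)  # no retraining required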

predict(self, data, **kwargs)

Use the optimum pipe to predict the input data.

Parameters:

data (ndarray, required)
    The array-like prediction data with shape=[M, D], where M is the number of samples and D is the number of features. D must match the number of features seen during fit.

**kwargs (default: {})
    Keyword arguments, passed to optimum_pipe.predict.

Returns:

ndarray
    Predicted targets calculated on input data with the trained model.

Source code in photonai/base/hyperpipe.py
def predict(self, data: np.ndarray, **kwargs) -> np.ndarray:
    """
    Use the optimum pipe to predict the input data.

    Parameters:
        data:
            The array-like prediction data with shape=[M, D],
            where M is the number of samples and D is the number
            of features. D must match the number of features
            seen during fit.

        **kwargs:
            Keyword arguments, passed to optimum_pipe.predict.

    Returns:
        Predicted targets calculated on input data with trained model.

    """
    # Todo: if local_search = true then use optimized pipe here?
    if self._pipe:
        return self.optimum_pipe.predict(data, **kwargs)

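A hold-out sketch; fit on the training split, then predict on the held-back split:

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
pipe.fit(X_train, y_train)
y_pred = pipe.predict(X_test)  # delegates to the trained optimum_pipe
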
predict_proba(self, data, **kwargs)

Use the optimum pipe to predict the probabilities from the input data.

Parameters:

data (ndarray, required)
    The array-like prediction data with shape=[M, D], where M is the number of samples and D is the number of features. D must match the number of features seen during fit.

**kwargs (default: {})
    Keyword arguments, passed to optimum_pipe.predict_proba.

Returns:

ndarray
    Probabilities calculated from input data on the fitted model.

Source code in photonai/base/hyperpipe.py
def predict_proba(self, data: np.ndarray, **kwargs) -> np.ndarray:
    """
    Use the optimum pipe to predict the probabilities from the input data.

    Parameters:
        data:
            The array-like prediction data with shape=[M, D],
            where M is the number of samples and D is the number
            of features. D must match the number of features
            seen during fit.

        **kwargs:
            Keyword arguments, passed to optimum_pipe.predict_proba.

    Returns:
        Probabilities calculated from input data on fitted model.


    """
    if self._pipe:
        return self.optimum_pipe.predict_proba(data, **kwargs)
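
A short sketch; predict_proba is only available if the final estimator supports it (e.g. an SVC constructed with probability=True):

probabilities = pipe.predict_proba(X_test)
print(probabilities.shape)  # (n_samples, n_classes)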