Documentation for PipelineElement
PHOTONAI wrapper class for any transformer or estimator in the pipeline.
So called PHOTONAI PipelineElements can be added to the Hyperpipe, each of them being a data-processing method or a learning algorithm. By choosing, combining data-processing methods and algorithms, and arranging them with the PHOTONAI classes, simple and complex pipeline architectures can be designed rapidly.
The PHOTONAI PipelineElement implements several helpful features:
- Saves the hyperparameters that should be tested and creates a grid of all hyperparameter configurations.
- Enables fast and rapid instantiation of pipeline elements per string identifier, e.g 'svc' creates an sklearn.svm.SVC object.
- Attaches a "disable" switch to every element in the pipeline in order to test a complete disable.
Source code in photonai/base/
class PipelineElement(BaseEstimator):
PHOTONAI wrapper class for any transformer or estimator in the pipeline.
So called PHOTONAI PipelineElements can be added to the Hyperpipe,
each of them being a data-processing method or a learning algorithm.
By choosing, combining data-processing methods and algorithms,
and arranging them with the PHOTONAI classes, simple and complex
pipeline architectures can be designed rapidly.
The PHOTONAI PipelineElement implements several helpful features:
- Saves the hyperparameters that should be tested
and creates a grid of all hyperparameter configurations.
- Enables fast and rapid instantiation of pipeline
elements per string identifier, e.g 'svc' creates
an sklearn.svm.SVC object.
- Attaches a "disable" switch to every element
in the pipeline in order to test a complete disable.
def __init__(self, name: str, hyperparameters: dict = None, test_disabled: bool = False,
disabled: bool = False, base_element: BaseEstimator = None, batch_size: int = 0, **kwargs) -> None:
Takes a string literal and transforms it into an object
of the associated class (see PhotonCore.JSON).
A string literal encoding the class to be instantiated.
Which values/value range should be tested for the
In form of Dict: parameter_name -> HyperparameterElement.
If the hyperparameter search should evaluate a
complete disabling of the element.
If true, the element is currently disabled and
does nothing except return the data it received.
The underlying BaseEstimator. If not given the
instantiation per string identifier takes place.
Size of the division on which is calculated separately.
Any parameters that should be passed to the object
to be instantiated, default parameters.
if hyperparameters is None:
hyperparameters = {}
if base_element is None:
# Registering Pipeline Elements
if len(PhotonRegistry.ELEMENT_DICTIONARY) == 0:
registry = PhotonRegistry
if name not in PhotonRegistry.ELEMENT_DICTIONARY:
# try to reload
PhotonRegistry.ELEMENT_DICTIONARY = PhotonRegistry().get_package_info()
if name in PhotonRegistry.ELEMENT_DICTIONARY:
desired_class_info = PhotonRegistry.ELEMENT_DICTIONARY[name]
desired_class_home = desired_class_info[0]
desired_class_name = desired_class_info[1]
imported_module = importlib.import_module(desired_class_home)
desired_class = getattr(imported_module, desired_class_name)
self.base_element = desired_class(**kwargs)
except AttributeError as ae:
logger.error('ValueError: Could not find according class:'
+ str(PhotonRegistry.ELEMENT_DICTIONARY[name]))
raise ValueError('Could not find according class:', PhotonRegistry.ELEMENT_DICTIONARY[name])
# if even after reload the element does not appear, it is not supported
logger.error('Element not supported right now:' + name)
raise NameError('Element not supported right now:', name)
self.base_element = base_element
self.is_transformer = hasattr(self.base_element, "transform")
self.reduce_dimension = False # boolean - set on transform method
self.is_estimator = hasattr(self.base_element, "predict")
self._name = name
self.initial_name = str(name)
self.kwargs = kwargs
self.current_config = None
self.batch_size = batch_size
self.test_disabled = test_disabled
self.initial_hyperparameters = dict(hyperparameters)
self._sklearn_disabled = + '__disabled'
self._hyperparameters = hyperparameters
if len(hyperparameters) > 0:
key_0 = next(iter(hyperparameters))
if not in key_0:
self.hyperparameters = hyperparameters
self.hyperparameters = hyperparameters
# self.initalize_hyperparameters = hyperparameters
# check if hyperparameters are already in sklearn style
# check if hyperparameters are members of the class
if self.is_transformer or self.is_estimator:
self.disabled = disabled
# check if self.base element needs y for fitting and transforming
if hasattr(self.base_element, 'needs_y'):
self.needs_y = self.base_element.needs_y
self.needs_y = False
# or if it maybe needs covariates for fitting and transforming
if hasattr(self.base_element, 'needs_covariates'):
self.needs_covariates = self.base_element.needs_covariates
self.needs_covariates = False
self._random_state = False
def name(self):
return self._name
def name(self, value):
self._name = value
def hyperparameters(self):
return self._hyperparameters
def hyperparameters(self, value: dict):
def _check_hyperparameters(self, BaseEstimator):
# check if hyperparameters are members of the class
not_supported_hyperparameters = list(
set([key.split("__")[-1] for key in self._hyperparameters.keys() if key.split("__")[-1] != "disabled"]) -
if not_supported_hyperparameters:
error_message = 'ValueError: Set of hyperparameters are not valid, check hyperparameters:' + \
raise ValueError(error_message)
def generate_sklearn_hyperparameters(self, value: dict):
Generates a dictionary according to the sklearn convention of
element_name__parameter_name: parameter_value.
self._hyperparameters = {}
for attribute, value_list in value.items():
self._hyperparameters[ + '__' + attribute] = value_list
if self.test_disabled:
self._hyperparameters[self._sklearn_disabled] = [False, True]
def random_state(self):
return self._random_state
def random_state(self, random_state):
self._random_state = random_state
if hasattr(self, 'elements'):
for el in self.elements:
if hasattr(el, 'random_state'):
el.random_state = self._random_state
if hasattr(self, "base_element") and hasattr(self.base_element, "random_state"):
self.base_element.random_state = random_state
def _estimator_type(self):
# estimator_type obligation for estimators, is ignored if a transformer is given
# prevention of misuse through predict test (predict method available <=> Estimator).
est_type = getattr(self.base_element, '_estimator_type', None)
if est_type in [None, 'transformer']:
if hasattr(self.base_element, 'predict'):
raise NotImplementedError("Element has predict() method but does not specify whether it is a regressor"
" or classifier. Remember to inherit from ClassifierMixin or RegressorMixin.")
return None
if est_type not in ['classifier', 'regressor']:
raise NotImplementedError("Currently, we only support type classifier or regressor."
" Is {}.".format(est_type))
if not hasattr(self.base_element, 'predict'):
raise NotImplementedError("Estimator does not implement predict() method.")
return est_type
# this is only here because everything inherits from PipelineElement.
def __iadd__(self, pipe_element):
Add an element to the intern list of elements.
pipe_element (PipelineElement):
The object to add, being either a transformer or an estimator.
PipelineElement.sanity_check_element_type_for_building_photon_pipes(pipe_element, type(self))
# check if that exact instance has been added before
already_added_objects = len([i for i in self.elements if i is pipe_element])
if already_added_objects > 0:
error_msg = "Cannot add the same instance twice to " + + " - " + str(type(self))
raise ValueError(error_msg)
# check for doubled names:
already_existing_element_with_that_name = len([i for i in self.elements if ==])
if already_existing_element_with_that_name > 0:
error_msg = "Already added a pipeline element with the name " + + " to " +
# check for other items that have been renamed
nr_of_existing_elements_with_that_name = len([i for i in self.elements if])
new_name = + str(nr_of_existing_elements_with_that_name + 1)
while len([i for i in self.elements if == new_name]) > 0:
nr_of_existing_elements_with_that_name += 1
new_name = + str(nr_of_existing_elements_with_that_name + 1)
msg = "Renaming " + + " in " + + " to " + new_name + " in " +
warnings.warn(msg) = new_name
return self
def copy_me(self):
if in PhotonRegistry.ELEMENT_DICTIONARY:
# we need initial name to refer to the class to be instantiated (SVC) even though the name might be SVC2
copy = PipelineElement(self.initial_name, {}, test_disabled=self.test_disabled,
disabled=self.disabled, batch_size=self.batch_size, **self.kwargs)
copy.initial_hyperparameters = self.initial_hyperparameters
# in the setter of the name, we use initial hyperparameters to adjust the hyperparameters to the name =
if hasattr(self.base_element, 'copy_me'):
new_base_element = self.base_element.copy_me()
new_base_element = deepcopy(self.base_element)
except Exception as e:
error_msg = "Cannot copy custom element " + + ". Please specify a copy_me() method " \
"returning a copy of the object"
raise e
# handle custom elements
copy = PipelineElement.create(, new_base_element, hyperparameters=self.hyperparameters,
disabled=self.disabled, batch_size=self.batch_size,
if self.current_config is not None:
copy._random_state = self._random_state
return copy
def create(cls, name: str, base_element: BaseEstimator, hyperparameters: dict, test_disabled: bool = False,
disabled: bool = False, **kwargs):
Takes an instantiated object and encapsulates it
into the PHOTONAI structure.
Add the disabled function and attaches
information about the hyperparameters that should be tested.
A string literal encoding the class to be instantiated.
The underlying transformer or estimator class.
Which values/value range should be tested for the
In form of Dict: parameter_name -> HyperparameterElement.
If the hyperparameter search should evaluate a
complete disabling of the element.
If true, the element is currently disabled and
does nothing except return the data it received.
Any parameters that should be passed to the object
to be instantiated, default parameters.
``` python
class RD(BaseEstimator, TransformerMixin):
def fit(self, X, y, **kwargs):
def fit_transform(self, X, y=None, **fit_params):
return self.transform(X)
def transform(self, X):
return X[:, :3]
trans = PipelineElement.create('MyTransformer', base_element=RD(), hyperparameters={})
if isinstance(base_element, type):
raise ValueError("Base element should be an instance but is a class.")
return PipelineElement(name, hyperparameters, test_disabled, disabled, base_element=base_element, **kwargs)
def feature_importances_(self):
if hasattr(self.base_element, 'feature_importances_'):
return self.base_element.feature_importances_.tolist()
elif hasattr(self.base_element, 'coef_'):
return self.base_element.coef_.tolist()
def generate_config_grid(self):
config_dict = create_global_config_dict([self])
if len(config_dict) > 0:
if self.test_disabled:
config_list = list(ParameterGrid(config_dict))
if self.test_disabled:
for item in config_list:
item[self._sklearn_disabled] = False
config_list.append({self._sklearn_disabled: True})
if len(config_list) < 2:
config_list.append({self._sklearn_disabled: False})
return config_list
return []
def get_params(self, deep: bool = True):
Forwards the get_params request to the wrapped base element.
if hasattr(self.base_element, 'get_params'):
params = self.base_element.get_params(deep)
params["name"] =
return params
return None
def set_params(self, **kwargs):
Forwards the set_params request to the wrapped base element
Takes care of the disabled parameter which is additionally attached by the PHOTON wrapper
# this is an ugly hack to approximate the right settings when copying the element
self.current_config = kwargs
# element disable is a construct used for this container only
if self._sklearn_disabled in kwargs:
self.disabled = kwargs[self._sklearn_disabled]
del kwargs[self._sklearn_disabled]
elif 'disabled' in kwargs:
self.disabled = kwargs['disabled']
del kwargs['disabled']
return self
def fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs):
Calls the fit function of the base element.
The array-like training and test data with shape=[N, D],
where N is the number of samples and D is the number of features.
The truth array-like values with shape=[N],
where N is the number of samples.
Keyword arguments, passed to base_element.predict.
Fitted self.
if not self.disabled:
obj = self.base_element
arg_list = inspect.signature(
if len(arg_list.parameters) > 2:
vals = arg_list.parameters.values()
kwargs_param = list(vals)[-1]
if kwargs_param.kind == kwargs_param.VAR_KEYWORD:, y, **kwargs)
return self, y)
return self
def __batch_predict(self, delegate, X, **kwargs):
if not isinstance(X, list) and not isinstance(X, np.ndarray):
msg = "Cannot do batching on a single entity."
return delegate(X, **kwargs)
# initialize return values
processed_y = None
nr = PhotonDataHelper.find_n(X)
batch_idx = 0
for start, stop in PhotonDataHelper.chunker(nr, self.batch_size):
batch_idx += 1
logger.debug( + " is predicting batch " + str(batch_idx))
# split data in batches
X_batched, y_batched, kwargs_dict_batched = PhotonDataHelper.split_data(X, None, kwargs, start, stop)
# predict
y_pred = delegate(X_batched, **kwargs_dict_batched)
processed_y = PhotonDataHelper.stack_data_vertically(processed_y, y_pred)
return processed_y
def __predict(self, X, **kwargs):
if not self.disabled:
if hasattr(self.base_element, 'predict'):
return self.adjusted_predict_call(self.base_element.predict, X, **kwargs)
logger.error('BaseException. base Element should have function ' +
raise BaseException('base Element should have function predict.')
return X
def predict(self, X: np.ndarray, **kwargs) -> np.ndarray:
Calls the predict function of the underlying base_element.
The array-like training and test data with shape=[N, D],
where N is the number of samples and D is the number of features.
Keyword arguments, passed to base_element.predict.
Predictions values.
if self.batch_size == 0:
return self.__predict(X, **kwargs)
return self.__batch_predict(self.__predict, X, **kwargs)
def predict_proba(self, X, **kwargs):
if self.batch_size == 0:
return self.__predict_proba(X, **kwargs)
return self.__batch_predict(self.__predict_proba, X, **kwargs)
def __predict_proba(self, X: np.ndarray, **kwargs):
Predict probabilities
base element needs predict_proba() function, otherwise throw
base exception.
if not self.disabled:
if hasattr(self.base_element, 'predict_proba'):
# todo: here, I used delegate call (same as below in predict within the transform call)
#return self.base_element.predict_proba(X)
return self.adjusted_predict_call(self.base_element.predict_proba, X, **kwargs)
# todo: in case _final_estimator is a Branch, we do not know beforehand it the base elements will
# have a predict_proba -> if not, just return None (@Ramona, does this make sense?)
# logger.error('BaseException. base Element should have "predict_proba" function.')
# raise BaseException('base Element should have predict_proba function.')
return None
return X
def __transform(self, X, y=None, **kwargs):
if not self.disabled:
if hasattr(self.base_element, 'transform'):
return self.adjusted_delegate_call(self.base_element.transform, X, y, **kwargs)
elif hasattr(self.base_element, 'predict'):
return self.predict(X, **kwargs), y, kwargs
logger.error('BaseException: transform-predict-mess')
raise BaseException('transform-predict-mess')
return X, y, kwargs
def transform(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> (np.ndarray, np.ndarray, dict):
Calls transform on the base element.
In case there is no transform method, calls predict.
This is used if we are using an estimator as a preprocessing step.
The array-like data with shape=[N, D], where N is the
number of samples and D is the number of features.
The truth array-like values with shape=[N], where N is
the number of samples.
Keyword arguments, passed to base_element.transform.
(X, y) in transformed version and original kwargs.
if self.batch_size == 0:
Xt, yt, kwargs = self.__transform(X, y, **kwargs)
Xt, yt, kwargs = self.__batch_transform(X, y, **kwargs)
if all(hasattr(data, "shape") for data in [X, Xt]) and all(len(data.shape) > 1 for data in [X, Xt]):
self.reduce_dimension = (Xt.shape[1] < X.shape[1])
return Xt, yt, kwargs
def inverse_transform(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> (np.ndarray, np.ndarray, dict):
Calls inverse_transform on the base element.
When the dimension is preserved: transformers
without inverse returns original input.
The array-like data with shape=[N, D], where N
is the number of samples and D is the number of features.
The truth array-like values with shape=[N], where N is
the number of samples.
Keyword arguments, passed to base_element.transform.
Thrown when there is a dimensional reduction but no inverse is defined.
(X, y, kwargs) in back-transformed version.
if hasattr(self.base_element, 'inverse_transform'):
# todo: check this
X, y, kwargs = self.adjusted_delegate_call(self.base_element.inverse_transform, X, y, **kwargs)
elif self.is_transformer and self.reduce_dimension:
msg = "{} has no inverse_transform, but element reduce dimesions.".format(
raise NotImplementedError(msg)
return X, y, kwargs
def __batch_transform(self, X, y=None, **kwargs):
if not isinstance(X, list) and not isinstance(X, np.ndarray):
warning = "Cannot do batching on a single entity."
return self.__transform(X, y, **kwargs)
# initialize return values
processed_X = None
processed_y = None
processed_kwargs = dict()
nr = PhotonDataHelper.find_n(X)
batch_idx = 0
for start, stop in PhotonDataHelper.chunker(nr, self.batch_size):
batch_idx += 1
# split data in batches
X_batched, y_batched, kwargs_dict_batched = PhotonDataHelper.split_data(X, y, kwargs, start, stop)
actual_batch_size = PhotonDataHelper.find_n(X_batched)
logger.debug( + " is transforming batch " + str(batch_idx) + " with " + str(actual_batch_size)
+ " items.")
# call transform
X_new, y_new, kwargs_new = self.adjusted_delegate_call(self.base_element.transform, X_batched, y_batched,
# stack results
processed_X, processed_y, processed_kwargs = PhotonDataHelper.join_data(processed_X, X_new, processed_y,
processed_kwargs, kwargs_new)
return processed_X, processed_y, processed_kwargs
def adjusted_delegate_call(self, delegate, X, y, **kwargs):
# Case| transforms X | needs_y | needs_covariates
# -------------------------------------------------------
# 1 yes no no = transform(X) -> returns Xt
# todo: case does not exist any longer
# 2 yes yes no = transform(X, y) -> returns Xt, yt
# 3 yes yes yes = transform(X, y, kwargs) -> returns Xt, yt, kwargst
# 4 yes no yes = transform(X, kwargs) -> returns Xt, kwargst
# 5 no yes or no yes or no = NOT ALLOWED
# todo: we don't need to check for Switch, Stack or Branch since those classes define
# needs_y and needs_covariates in their __init__()
if self.needs_y:
# if we dont have any target vector, we are in "predict"-mode although we are currently transforming
# in this case, we want to skip the transformation and pass X, None and kwargs onwards
# so basically, we skip all training_only elements
# todo: I think, there's no way around this if we want to pass y and kwargs down to children of Switch and Branch
if isinstance(self, (Switch, Branch, Preprocessing)):
X, y, kwargs = delegate(X, y, **kwargs)
if y is not None:
# todo: in case a method needs y, we should also always pass kwargs
# i.e. if we change the number of samples, we also need to apply that change to all kwargs
# todo: talk to Ramona! Maybe we actually DO need this case
if self.needs_covariates:
X, y, kwargs = delegate(X, y, **kwargs)
X, y = delegate(X, y)
elif self.needs_covariates:
X, kwargs = delegate(X, **kwargs)
X = delegate(X)
return X, y, kwargs
def adjusted_predict_call(self, delegate, X, **kwargs):
if self.needs_covariates:
return delegate(X, **kwargs)
return delegate(X)
def score(self, X_test: np.ndarray, y_test: np.ndarray) -> float:
Calls the score function on the base element.
Input test data to score on.
Input true targets to score on.
A goodness of fit measure or a likelihood of unseen data.
return self.base_element.score(X_test, y_test)
def prettify_config_output(self, config_name: str, config_value, return_dict: bool = False):
"""Make hyperparameter combinations human readable """
if config_name == "disabled" and config_value is False:
if return_dict:
return {'disabled': False}
return "disabled = False"
if return_dict:
return {config_name: config_value}
return config_name + '=' + str(config_value)
def sanity_check_element_type_for_building_photon_pipes(pipe_element, type_of_self):
if (not isinstance(pipe_element, PipelineElement) and not isinstance(pipe_element, PhotonNative)) or isinstance(pipe_element, Preprocessing):
raise TypeError(str(type_of_self) + " only accepts PHOTON elements. Cannot add element of type " + str(type(pipe_element)))
__iadd__(self, pipe_element)
Add an element to the intern list of elements.
Name | Type | Description | Default |
pipe_element |
PipelineElement |
The object to add, being either a transformer or an estimator. |
required |
Source code in photonai/base/
def __iadd__(self, pipe_element):
Add an element to the intern list of elements.
pipe_element (PipelineElement):
The object to add, being either a transformer or an estimator.
PipelineElement.sanity_check_element_type_for_building_photon_pipes(pipe_element, type(self))
# check if that exact instance has been added before
already_added_objects = len([i for i in self.elements if i is pipe_element])
if already_added_objects > 0:
error_msg = "Cannot add the same instance twice to " + + " - " + str(type(self))
raise ValueError(error_msg)
# check for doubled names:
already_existing_element_with_that_name = len([i for i in self.elements if ==])
if already_existing_element_with_that_name > 0:
error_msg = "Already added a pipeline element with the name " + + " to " +
# check for other items that have been renamed
nr_of_existing_elements_with_that_name = len([i for i in self.elements if])
new_name = + str(nr_of_existing_elements_with_that_name + 1)
while len([i for i in self.elements if == new_name]) > 0:
nr_of_existing_elements_with_that_name += 1
new_name = + str(nr_of_existing_elements_with_that_name + 1)
msg = "Renaming " + + " in " + + " to " + new_name + " in " +
warnings.warn(msg) = new_name
return self
__init__(self, name, hyperparameters=None, test_disabled=False, disabled=False, base_element=None, batch_size=0, **kwargs)
Takes a string literal and transforms it into an object of the associated class (see PhotonCore.JSON).
Name | Type | Description | Default |
name |
str |
A string literal encoding the class to be instantiated. |
required |
hyperparameters |
dict |
Which values/value range should be tested for the hyperparameter. In form of Dict: parameter_name -> HyperparameterElement. |
None |
test_disabled |
bool |
If the hyperparameter search should evaluate a complete disabling of the element. |
False |
disabled |
bool |
If true, the element is currently disabled and does nothing except return the data it received. |
False |
base_element |
BaseEstimator |
The underlying BaseEstimator. If not given the instantiation per string identifier takes place. |
None |
batch_size |
int |
Size of the division on which is calculated separately. |
0 |
**kwargs |
Any parameters that should be passed to the object to be instantiated, default parameters. |
{} |
Source code in photonai/base/
def __init__(self, name: str, hyperparameters: dict = None, test_disabled: bool = False,
disabled: bool = False, base_element: BaseEstimator = None, batch_size: int = 0, **kwargs) -> None:
Takes a string literal and transforms it into an object
of the associated class (see PhotonCore.JSON).
A string literal encoding the class to be instantiated.
Which values/value range should be tested for the
In form of Dict: parameter_name -> HyperparameterElement.
If the hyperparameter search should evaluate a
complete disabling of the element.
If true, the element is currently disabled and
does nothing except return the data it received.
The underlying BaseEstimator. If not given the
instantiation per string identifier takes place.
Size of the division on which is calculated separately.
Any parameters that should be passed to the object
to be instantiated, default parameters.
if hyperparameters is None:
hyperparameters = {}
if base_element is None:
# Registering Pipeline Elements
if len(PhotonRegistry.ELEMENT_DICTIONARY) == 0:
registry = PhotonRegistry
if name not in PhotonRegistry.ELEMENT_DICTIONARY:
# try to reload
PhotonRegistry.ELEMENT_DICTIONARY = PhotonRegistry().get_package_info()
if name in PhotonRegistry.ELEMENT_DICTIONARY:
desired_class_info = PhotonRegistry.ELEMENT_DICTIONARY[name]
desired_class_home = desired_class_info[0]
desired_class_name = desired_class_info[1]
imported_module = importlib.import_module(desired_class_home)
desired_class = getattr(imported_module, desired_class_name)
self.base_element = desired_class(**kwargs)
except AttributeError as ae:
logger.error('ValueError: Could not find according class:'
+ str(PhotonRegistry.ELEMENT_DICTIONARY[name]))
raise ValueError('Could not find according class:', PhotonRegistry.ELEMENT_DICTIONARY[name])
# if even after reload the element does not appear, it is not supported
logger.error('Element not supported right now:' + name)
raise NameError('Element not supported right now:', name)
self.base_element = base_element
self.is_transformer = hasattr(self.base_element, "transform")
self.reduce_dimension = False # boolean - set on transform method
self.is_estimator = hasattr(self.base_element, "predict")
self._name = name
self.initial_name = str(name)
self.kwargs = kwargs
self.current_config = None
self.batch_size = batch_size
self.test_disabled = test_disabled
self.initial_hyperparameters = dict(hyperparameters)
self._sklearn_disabled = + '__disabled'
self._hyperparameters = hyperparameters
if len(hyperparameters) > 0:
key_0 = next(iter(hyperparameters))
if not in key_0:
self.hyperparameters = hyperparameters
self.hyperparameters = hyperparameters
# self.initalize_hyperparameters = hyperparameters
# check if hyperparameters are already in sklearn style
# check if hyperparameters are members of the class
if self.is_transformer or self.is_estimator:
self.disabled = disabled
# check if self.base element needs y for fitting and transforming
if hasattr(self.base_element, 'needs_y'):
self.needs_y = self.base_element.needs_y
self.needs_y = False
# or if it maybe needs covariates for fitting and transforming
if hasattr(self.base_element, 'needs_covariates'):
self.needs_covariates = self.base_element.needs_covariates
self.needs_covariates = False
self._random_state = False
create(name, base_element, hyperparameters, test_disabled=False, disabled=False, **kwargs)
Takes an instantiated object and encapsulates it into the PHOTONAI structure. Add the disabled function and attaches information about the hyperparameters that should be tested.
Name | Type | Description | Default |
name |
str |
A string literal encoding the class to be instantiated. |
required |
base_element |
BaseEstimator |
The underlying transformer or estimator class. |
required |
hyperparameters |
dict |
Which values/value range should be tested for the hyperparameter. In form of Dict: parameter_name -> HyperparameterElement. |
required |
test_disabled |
bool |
If the hyperparameter search should evaluate a complete disabling of the element. |
False |
disabled |
bool |
If true, the element is currently disabled and does nothing except return the data it received. |
False |
**kwargs |
Any parameters that should be passed to the object to be instantiated, default parameters. |
{} |
1 2 3 4 5 6 7 8 9 10 11 12 |
Source code in photonai/base/
def create(cls, name: str, base_element: BaseEstimator, hyperparameters: dict, test_disabled: bool = False,
disabled: bool = False, **kwargs):
Takes an instantiated object and encapsulates it
into the PHOTONAI structure.
Add the disabled function and attaches
information about the hyperparameters that should be tested.
A string literal encoding the class to be instantiated.
The underlying transformer or estimator class.
Which values/value range should be tested for the
In form of Dict: parameter_name -> HyperparameterElement.
If the hyperparameter search should evaluate a
complete disabling of the element.
If true, the element is currently disabled and
does nothing except return the data it received.
Any parameters that should be passed to the object
to be instantiated, default parameters.
``` python
class RD(BaseEstimator, TransformerMixin):
def fit(self, X, y, **kwargs):
def fit_transform(self, X, y=None, **fit_params):
return self.transform(X)
def transform(self, X):
return X[:, :3]
trans = PipelineElement.create('MyTransformer', base_element=RD(), hyperparameters={})
if isinstance(base_element, type):
raise ValueError("Base element should be an instance but is a class.")
return PipelineElement(name, hyperparameters, test_disabled, disabled, base_element=base_element, **kwargs)
fit(self, X, y=None, **kwargs)
Calls the fit function of the base element.
Name | Type | Description | Default |
X |
ndarray |
The array-like training and test data with shape=[N, D], where N is the number of samples and D is the number of features. |
required |
y |
ndarray |
The truth array-like values with shape=[N], where N is the number of samples. |
None |
**kwargs |
Keyword arguments, passed to base_element.predict. |
{} |
Type | Description |
Fitted self. |
Source code in photonai/base/
def fit(self, X: np.ndarray, y: np.ndarray = None, **kwargs):
Calls the fit function of the base element.
The array-like training and test data with shape=[N, D],
where N is the number of samples and D is the number of features.
The truth array-like values with shape=[N],
where N is the number of samples.
Keyword arguments, passed to base_element.predict.
Fitted self.
if not self.disabled:
obj = self.base_element
arg_list = inspect.signature(
if len(arg_list.parameters) > 2:
vals = arg_list.parameters.values()
kwargs_param = list(vals)[-1]
if kwargs_param.kind == kwargs_param.VAR_KEYWORD:, y, **kwargs)
return self, y)
return self
inverse_transform(self, X, y=None, **kwargs)
Calls inverse_transform on the base element.
When the dimension is preserved: transformers without inverse returns original input.
Name | Type | Description | Default |
X |
ndarray |
The array-like data with shape=[N, D], where N is the number of samples and D is the number of features. |
required |
y |
ndarray |
The truth array-like values with shape=[N], where N is the number of samples. |
None |
**kwargs |
Keyword arguments, passed to base_element.transform. |
{} |
Type | Description |
(<class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'dict'>) |
(X, y, kwargs) in back-transformed version. |
Source code in photonai/base/
def inverse_transform(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> (np.ndarray, np.ndarray, dict):
Calls inverse_transform on the base element.
When the dimension is preserved: transformers
without inverse returns original input.
The array-like data with shape=[N, D], where N
is the number of samples and D is the number of features.
The truth array-like values with shape=[N], where N is
the number of samples.
Keyword arguments, passed to base_element.transform.
Thrown when there is a dimensional reduction but no inverse is defined.
(X, y, kwargs) in back-transformed version.
if hasattr(self.base_element, 'inverse_transform'):
# todo: check this
X, y, kwargs = self.adjusted_delegate_call(self.base_element.inverse_transform, X, y, **kwargs)
elif self.is_transformer and self.reduce_dimension:
msg = "{} has no inverse_transform, but element reduce dimesions.".format(
raise NotImplementedError(msg)
return X, y, kwargs
predict(self, X, **kwargs)
Calls the predict function of the underlying base_element.
Name | Type | Description | Default |
X |
ndarray |
The array-like training and test data with shape=[N, D], where N is the number of samples and D is the number of features. |
required |
**kwargs |
Keyword arguments, passed to base_element.predict. |
{} |
Type | Description |
ndarray |
Predictions values. |
Source code in photonai/base/
def predict(self, X: np.ndarray, **kwargs) -> np.ndarray:
Calls the predict function of the underlying base_element.
The array-like training and test data with shape=[N, D],
where N is the number of samples and D is the number of features.
Keyword arguments, passed to base_element.predict.
Predictions values.
if self.batch_size == 0:
return self.__predict(X, **kwargs)
return self.__batch_predict(self.__predict, X, **kwargs)
score(self, X_test, y_test)
Calls the score function on the base element.
Name | Type | Description | Default |
X_test |
ndarray |
Input test data to score on. |
required |
y_test |
ndarray |
Input true targets to score on. |
required |
Type | Description |
float |
A goodness of fit measure or a likelihood of unseen data. |
Source code in photonai/base/
def score(self, X_test: np.ndarray, y_test: np.ndarray) -> float:
Calls the score function on the base element.
Input test data to score on.
Input true targets to score on.
A goodness of fit measure or a likelihood of unseen data.
return self.base_element.score(X_test, y_test)
transform(self, X, y=None, **kwargs)
Calls transform on the base element.
In case there is no transform method, calls predict. This is used if we are using an estimator as a preprocessing step.
Name | Type | Description | Default |
X |
ndarray |
The array-like data with shape=[N, D], where N is the number of samples and D is the number of features. |
required |
y |
ndarray |
The truth array-like values with shape=[N], where N is the number of samples. |
None |
**kwargs |
Keyword arguments, passed to base_element.transform. |
{} |
Type | Description |
(<class 'numpy.ndarray'>, <class 'numpy.ndarray'>, <class 'dict'>) |
(X, y) in transformed version and original kwargs. |
Source code in photonai/base/
def transform(self, X: np.ndarray, y: np.ndarray = None, **kwargs) -> (np.ndarray, np.ndarray, dict):
Calls transform on the base element.
In case there is no transform method, calls predict.
This is used if we are using an estimator as a preprocessing step.
The array-like data with shape=[N, D], where N is the
number of samples and D is the number of features.
The truth array-like values with shape=[N], where N is
the number of samples.
Keyword arguments, passed to base_element.transform.
(X, y) in transformed version and original kwargs.
if self.batch_size == 0:
Xt, yt, kwargs = self.__transform(X, y, **kwargs)
Xt, yt, kwargs = self.__batch_transform(X, y, **kwargs)
if all(hasattr(data, "shape") for data in [X, Xt]) and all(len(data.shape) > 1 for data in [X, Xt]):
self.reduce_dimension = (Xt.shape[1] < X.shape[1])
return Xt, yt, kwargs