
Aggregation Function

What is the AggrFunc module?

The AggrFunc module facilitates the application of aggregation functions to feature groups within a longitudinal dataset, enabling the use of temporal information before applying traditional machine learning algorithms.

What are features_group and non_longitudinal_features?

Two key attributes, features_group and non_longitudinal_features, enable algorithms to interpret the temporal structure of longitudinal data.

  • features_group: A list of lists where each sublist contains indices of a longitudinal attribute's waves, ordered from oldest to most recent. This captures temporal dependencies.
  • non_longitudinal_features: A list of indices for static, non-temporal features excluded from the temporal matrix.

Proper setup of these attributes is critical for leveraging temporal patterns effectively.
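As a minimal illustration, assume a hypothetical dataset with one static column and two longitudinal attributes, each measured over two waves. The column names below are made up for this sketch and do not come from a real dataset.

```python
# Hypothetical column layout; names are illustrative only.
columns = ["age", "income_w1", "income_w2", "smoke_w1", "smoke_w2"]

# One sublist per longitudinal attribute, waves ordered oldest to most recent.
features_group = [[1, 2], [3, 4]]

# Indices of static features that sit outside the temporal matrix.
non_longitudinal_features = [0]

# Map indices back to names to check the setup by eye.
group_names = [[columns[i] for i in group] for group in features_group]
static_names = [columns[i] for i in non_longitudinal_features]
print(group_names)
print(static_names)
```

Printing the names recovered from the indices is a cheap way to confirm that each sublist really holds the waves of a single attribute in chronological order.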

See more in the Temporal Dependency guide.

AggrFunc

Bases: DataPreparationMixin

AggrFunc stands for Aggregation Function; it aggregates feature groups in longitudinal datasets.

The AggrFunc facilitates the application of aggregation functions to feature groups within a longitudinal dataset, enabling the use of temporal information before applying traditional machine learning algorithms like those in Scikit-Learn or other similar machine learning libraries.

The aggregation function is applied iteratively across waves for each feature group, producing a single aggregated feature per group (e.g., mean_income from income_wave1, income_wave2, income_wave3 using the mean function). Supported aggregation functions include mean, median, mode, and custom callable functions that take a pandas Series as input and return a single value. Parallel processing is also supported via the Ray library for enhanced efficiency on large datasets.
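The idea can be sketched with plain pandas, independently of the library. The snippet below assumes a hypothetical three-wave income attribute and shows both a built-in aggregation (mean) and a custom callable that receives a pandas Series and returns a single value. The output column names are chosen for illustration and may not match the names the library generates.

```python
import pandas as pd

# Hypothetical data: three waves of one longitudinal attribute plus a static column.
df = pd.DataFrame(
    {
        "income_wave1": [10.0, 20.0],
        "income_wave2": [20.0, 40.0],
        "income_wave3": [30.0, 60.0],
        "age": [50, 60],
    }
)
waves = ["income_wave1", "income_wave2", "income_wave3"]

# Built-in aggregation: one value per row, computed across the waves.
mean_income = df[waves].mean(axis=1)

# Custom callable: each row is passed as a pandas Series and must return a scalar.
q1_income = df[waves].apply(lambda s: s.quantile(0.25), axis=1)

# Replace the wave columns with the single aggregated feature.
aggregated = df.drop(columns=waves).assign(mean_income=mean_income)
print(aggregated)
```

After this step the dataset has no temporal structure left, which is why it can be passed to a standard, non-longitudinal estimator.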

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| features_group | List[List[int]] | A temporal matrix representing the temporal dependency of a longitudinal dataset. Each sublist contains indices of a longitudinal attribute's waves. See the "Temporal Dependency" page in the documentation for details. | None |
| non_longitudinal_features | List[Union[int, str]] | A list of indices or names of non-longitudinal features. | None |
| feature_list_names | List[str] | A list of feature names in the dataset. | None |
| aggregation_func | Union[str, Callable] | The aggregation function to apply. Options are "mean", "median", "mode", or a custom callable function. See the AGG_FUNCS object in aggregation_function.py for the supported names. | 'mean' |
| parallel | bool | Whether to use parallel processing for aggregation. | False |
| num_cpus | int | Number of CPUs for parallel processing; -1 uses all available CPUs. | -1 |
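The constructor resolves aggregation_func by looking up string names in AGG_FUNCS and passing callables through unchanged. The sketch below mimics that dispatch with a hypothetical mapping, AGG_FUNCS_SKETCH, whose contents are an assumption; the explicit ValueError and TypeError are additions for illustration, not behavior confirmed by the library.

```python
from typing import Callable, Union

import pandas as pd

# Hypothetical stand-in for the library's AGG_FUNCS mapping; the real contents may differ.
AGG_FUNCS_SKETCH = {
    "mean": lambda s: s.mean(),
    "median": lambda s: s.median(),
    "mode": lambda s: s.mode().iloc[0],
}


def resolve_aggregation_func(aggregation_func: Union[str, Callable]) -> Callable:
    """Return a callable for a supported name, or pass a custom callable through."""
    if isinstance(aggregation_func, str):
        if aggregation_func not in AGG_FUNCS_SKETCH:
            raise ValueError(f"Unsupported aggregation function: {aggregation_func!r}")
        return AGG_FUNCS_SKETCH[aggregation_func]
    if callable(aggregation_func):
        return aggregation_func
    raise TypeError("aggregation_func must be a string or a callable")


waves = pd.Series([10.0, 20.0, 60.0])
median_value = resolve_aggregation_func("median")(waves)
range_value = resolve_aggregation_func(lambda s: s.max() - s.min())(waves)
print(median_value, range_value)
```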

Attributes:

| Name | Type | Description |
|---|---|---|
| dataset | DataFrame | The longitudinal dataset to transform. |
| aggregation_func | Union[str, Callable] | The aggregation function applied to feature groups. |
| parallel | bool | Whether parallel processing is enabled. |
| num_cpus | int | Number of CPUs used for parallel processing. |

Examples:

The examples below demonstrate the AggrFunc class with a stroke dataset. Note that "./stroke_longitudinal.csv" is a placeholder; replace it with the actual path to your dataset.

Basic Usage

from scikit_longitudinal.data_preparation import LongitudinalDataset
from scikit_longitudinal.data_preparation.aggregation_function import AggrFunc

# Load dataset
dataset = LongitudinalDataset('./stroke_longitudinal.csv')
dataset.load_data()
dataset.load_target(target_column="stroke_w2")
dataset.setup_features_group("elsa")
dataset.load_train_test_split(test_size=0.2, random_state=42)

# Initialize AggrFunc
agg_func = AggrFunc(
    aggregation_func="mean",
    features_group=dataset.feature_groups(),
    non_longitudinal_features=dataset.non_longitudinal_features(),
    feature_list_names=dataset.data.columns.tolist()
)

# Apply transformation
agg_func.prepare_data(dataset.X_train)
transformed_dataset, _, _, _ = agg_func._transform()

Advanced: custom aggregation function

from scikit_longitudinal.data_preparation import LongitudinalDataset
from scikit_longitudinal.data_preparation.aggregation_function import AggrFunc

# Load dataset
dataset = LongitudinalDataset("./stroke_longitudinal.csv")
dataset.load_data()
dataset.load_target(target_column="stroke_w2")
dataset.setup_features_group("elsa")
dataset.load_train_test_split(test_size=0.2, random_state=42)

# Define custom function
custom_func = lambda x: x.quantile(0.25)  # First quartile

# Initialize AggrFunc
agg_func = AggrFunc(
    aggregation_func=custom_func,
    features_group=dataset.feature_groups(),
    non_longitudinal_features=dataset.non_longitudinal_features(),
    feature_list_names=dataset.data.columns.tolist(),
)

# Apply transformation
agg_func.prepare_data(dataset.X_train)
transformed_dataset, _, _, _ = agg_func._transform()

Advanced: parallel processing

# ... load the dataset as in the previous examples ...

# Initialize AggrFunc with parallel processing
agg_func = AggrFunc(
    aggregation_func="mean",
    features_group=dataset.feature_groups(),
    non_longitudinal_features=dataset.non_longitudinal_features(),
    feature_list_names=dataset.data.columns.tolist(),
    parallel=True, # Enable parallel processing
    num_cpus=4 # Specify number of CPUs (optional, -1 for all available)
)

# ... then prepare the data and transform as in the previous examples ...
Source code in scikit_longitudinal/data_preparation/aggregation_function.py
class AggrFunc(DataPreparationMixin):
    """AggrFunc stands for Aggregation Functions, aggregation on feature groups in longitudinal datasets.

    The `AggrFunc` facilitates the application of aggregation functions to feature groups within a longitudinal
    dataset, enabling the use of `temporal information` before applying traditional machine learning algorithms like
    those in Scikit-Learn or other similar machine learning libraries.


    The aggregation function is applied iteratively across waves for each feature group, producing a single aggregated
    feature per group (e.g., `mean_income` from `income_wave1`, `income_wave2`, `income_wave3` using the `mean`
    function). Supported aggregation functions include `mean`, `median`, `mode`, and custom callable functions that
    take a pandas Series as input and return a single value. Parallel processing is also supported via the Ray library
    for enhanced efficiency on large datasets.

    Args:
        features_group (List[List[int]], optional): A temporal matrix representing the temporal dependency of a
            longitudinal dataset. Each sublist contains indices of a longitudinal attribute's waves. Defaults to None.
            See the "Temporal Dependency" page in the documentation for details.
        non_longitudinal_features (List[Union[int, str]], optional): A list of indices or names of non-longitudinal
            features. Defaults to None.
        feature_list_names (List[str], optional): A list of feature names in the dataset. Defaults to None.
        aggregation_func (Union[str, Callable], optional): The aggregation function to apply. Options are "mean",
            "median", "mode", or a custom callable function. Defaults to "mean". See further in
            the `aggregation_function.py` file at the object `AGG_FUNCS` for those supported.
        parallel (bool, optional): Whether to use parallel processing for aggregation. Defaults to False.
        num_cpus (int, optional): Number of CPUs for parallel processing. Defaults to -1 (uses all available CPUs).

    Attributes:
        dataset (pd.DataFrame): The longitudinal dataset to transform.
        aggregation_func (Union[str, Callable]): The aggregation function applied to feature groups.
        parallel (bool): Whether parallel processing is enabled.
        num_cpus (int): Number of CPUs used for parallel processing.

    Examples:
        The examples below demonstrate the `AggrFunc` class with a stroke dataset. Note that
        "./stroke_longitudinal.csv" is a placeholder; replace it with the actual path to your dataset.

        !!! example "Basic Usage"
            ```python
            from scikit_longitudinal.data_preparation import LongitudinalDataset
            from scikit_longitudinal.data_preparation.aggregation_function import AggrFunc

            # Load dataset
            dataset = LongitudinalDataset('./stroke_longitudinal.csv')
            dataset.load_data()
            dataset.load_target(target_column="stroke_w2")
            dataset.setup_features_group("elsa")
            dataset.load_train_test_split(test_size=0.2, random_state=42)

            # Initialize AggrFunc
            agg_func = AggrFunc(
                aggregation_func="mean",
                features_group=dataset.feature_groups(),
                non_longitudinal_features=dataset.non_longitudinal_features(),
                feature_list_names=dataset.data.columns.tolist()
            )

            # Apply transformation
            agg_func.prepare_data(dataset.X_train)
            transformed_dataset, _, _, _ = agg_func._transform()
            ```

        !!! example "Advanced: custom aggregation function"
            ```python
            from scikit_longitudinal.data_preparation import LongitudinalDataset
            from scikit_longitudinal.data_preparation.aggregation_function import AggrFunc

            # Load dataset
            dataset = LongitudinalDataset("./stroke_longitudinal.csv")
            dataset.load_data()
            dataset.load_target(target_column="stroke_w2")
            dataset.setup_features_group("elsa")
            dataset.load_train_test_split(test_size=0.2, random_state=42)

            # Define custom function
            custom_func = lambda x: x.quantile(0.25)  # First quartile

            # Initialize AggrFunc
            agg_func = AggrFunc(
                aggregation_func=custom_func,
                features_group=dataset.feature_groups(),
                non_longitudinal_features=dataset.non_longitudinal_features(),
                feature_list_names=dataset.data.columns.tolist(),
            )

            # Apply transformation
            agg_func.prepare_data(dataset.X_train)
            transformed_dataset, _, _, _ = agg_func._transform()
            ```

        !!! example "Advanced: parallel processing"
            ```python
            # ... load the dataset as in the previous examples ...

            # Initialize AggrFunc with parallel processing
            agg_func = AggrFunc(
                aggregation_func="mean",
                features_group=dataset.feature_groups(),
                non_longitudinal_features=dataset.non_longitudinal_features(),
                feature_list_names=dataset.data.columns.tolist(),
                parallel=True, # Enable parallel processing
                num_cpus=4 # Specify number of CPUs (optional, -1 for all available)
            )

            # ... then prepare the data and transform as in the previous examples ...
            ```
    """

    @validate_aggregation_func
    def __init__(
        self,
        features_group: List[List[int]] = None,
        non_longitudinal_features: List[Union[int, str]] = None,
        feature_list_names: List[str] = None,
        aggregation_func: Union[str, Callable] = "mean",
        parallel: bool = False,
        num_cpus: int = -1,
    ):
        self.dataset = pd.DataFrame([])
        self.target = np.ndarray([])
        self.features_group = features_group
        self.non_longitudinal_features = non_longitudinal_features
        self.feature_list_names = feature_list_names
        self.aggregation_func = aggregation_func
        self.parallel = parallel
        self.num_cpus = num_cpus
        if isinstance(aggregation_func, str):
            self.agg_func = AGG_FUNCS[aggregation_func]
        else:
            self.agg_func = aggregation_func

    def get_params(self, deep: bool = True):  # pylint: disable=W0613
        """Get the parameters of the AggrFunc instance.

        This method retrieves the configuration parameters of the `AggrFunc` instance, useful for inspection or
        hyperparameter tuning.

        Args:
            deep (bool, optional): Unused parameter but kept for consistency with the scikit-learn API.

        Returns:
            dict: The parameters of the AggrFunc instance.
        """
        return {
            "aggregation_func": self.aggregation_func,
            "parallel": self.parallel,
            "num_cpus": self.num_cpus,
        }

    @override
    def _prepare_data(self, X: np.ndarray, y: np.ndarray = None) -> "AggrFunc":
        """Prepare the data for transformation.

        This method, overridden from `DataPreparationMixin`, converts input numpy arrays into a pandas DataFrame and
        stores the target data for compatibility, though the target is not used in the transformation.

        Args:
            X (np.ndarray): The input data.
            y (np.ndarray, optional): The target data. Defaults to None.

        Returns:
            AggrFunc: The instance with prepared data.
        """
        self.dataset = pd.DataFrame(X, columns=self.feature_list_names)
        self.target = y

        return self

    @validate_feature_group_indices
    def _transform(self):
        """Apply the aggregation function to feature groups in the dataset.

        This method applies the specified aggregation function to each feature group, replacing it with a single
        aggregated feature.

        !!! tip "Parallel Processing"
            If parallel processing is enabled, it uses the Ray library.

        !!! note "Categorical Data Handling"
            For "mean" or "median" functions
            with categorical data, it switches to "mode" and issues a warning automatically.

        Returns:
            tuple:
                - [x] pd.DataFrame: The transformed dataset.
                - [x] List[List[int]]: Feature groups in the transformed dataset (None, as they are aggregated).
                - [x] List[Union[int, str]]: Non-longitudinal features in the transformed dataset (None).
                - [x] List[str]: Names of features in the transformed dataset.
        """
        if self.features_group is not None:
            self.features_group = clean_padding(self.features_group)

        transformed_data = self.dataset.copy()
        feature_groups = [
            transformed_data.columns[i].tolist() for i in self.features_group
        ]

        if self.parallel:
            ray = get_ray_for_parallel(self.parallel, self.num_cpus)
            non_grouped_data = transformed_data.iloc[:, self.non_longitudinal_features]
            aggregate_remote = ray.remote(_aggregate)
            tasks = [
                aggregate_remote.remote(
                    feature_group,
                    transformed_data,
                    self.agg_func,
                    self.aggregation_func,
                )
                for feature_group in feature_groups
            ]
            results = ray.get(tasks)
            transformed_data = pd.concat(
                [non_grouped_data, pd.concat(results, axis=1)], axis=1
            )
        else:
            for feature_group in feature_groups:
                if (
                    self.aggregation_func in ["mean", "median"]
                    and (
                        transformed_data[feature_group].dtypes.apply(
                            lambda x: x == "object"
                        )
                    ).all()
                ):
                    warnings.warn(
                        f"Aggregation function is {self.aggregation_func} but feature group {feature_group} is "
                        "categorical. Using mode instead."
                    )

                    def agg_mode_func(x):
                        return stats.mode(x)[0]

                    agg_feature_df = get_agg_feature(
                        transformed_data, feature_group, agg_mode_func, "mode"
                    )
                else:
                    agg_feature_df = get_agg_feature(
                        transformed_data,
                        feature_group,
                        self.agg_func,
                        self.aggregation_func,
                    )

                transformed_data = pd.concat([transformed_data, agg_feature_df], axis=1)
                transformed_data.drop(feature_group, axis=1, inplace=True)

        self.dataset = transformed_data
        self.features_group = None  # pylint: disable=W0212
        self.non_longitudinal_features = None  # pylint: disable=W0212
        self.feature_list_names = self.dataset.columns.tolist()
        return (
            self.dataset,
            self.features_group,
            self.non_longitudinal_features,
            self.feature_list_names,
        )

get_params(deep=True)

Get the parameters of the AggrFunc instance.

This method retrieves the configuration parameters of the AggrFunc instance, useful for inspection or hyperparameter tuning.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| deep | bool | Unused parameter but kept for consistency with the scikit-learn API. | True |

Returns:

| Type | Description |
|---|---|
| dict | The parameters of the AggrFunc instance. |
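To make the returned dictionary concrete, here is a minimal stand-in class (hypothetical, named ParamsSketch, not part of the library) that mirrors the constructor defaults and the three keys returned by get_params. Note that features_group, non_longitudinal_features, and feature_list_names are not part of the returned dictionary.

```python
class ParamsSketch:
    """Hypothetical stand-in mirroring the three parameters exposed by get_params."""

    def __init__(self, aggregation_func="mean", parallel=False, num_cpus=-1):
        self.aggregation_func = aggregation_func
        self.parallel = parallel
        self.num_cpus = num_cpus

    def get_params(self, deep=True):
        # `deep` is accepted for scikit-learn API compatibility but is not used.
        return {
            "aggregation_func": self.aggregation_func,
            "parallel": self.parallel,
            "num_cpus": self.num_cpus,
        }


params = ParamsSketch().get_params()
print(params)

# Build a variant configuration by overriding a single parameter.
variant = ParamsSketch(**{**params, "aggregation_func": "median"})
print(variant.get_params())
```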


_prepare_data(X, y=None)

Prepare the data for transformation.

This method, overridden from DataPreparationMixin, converts input numpy arrays into a pandas DataFrame and stores the target data for compatibility, though the target is not used in the transformation.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| X | ndarray | The input data. | required |
| y | ndarray | The target data. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| AggrFunc | AggrFunc | The instance with prepared data. |
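The conversion itself is a single pandas call. The sketch below reproduces it outside the class with hypothetical feature names, and also shows that pandas rejects a name list whose length does not match the number of columns in X, which is worth checking before calling prepare_data.

```python
import numpy as np
import pandas as pd

# Hypothetical input: two rows, three features.
X = np.array([[1200.0, 1350.0, 54.0], [900.0, 950.0, 61.0]])
feature_list_names = ["income_w1", "income_w2", "age"]

# Same conversion as in _prepare_data: wrap the array and attach the feature names.
dataset = pd.DataFrame(X, columns=feature_list_names)
print(dataset.columns.tolist())

# A name list of the wrong length is rejected by pandas.
try:
    pd.DataFrame(X, columns=feature_list_names[:2])
    mismatch_error = None
except ValueError as exc:
    mismatch_error = exc
print(type(mismatch_error).__name__)
```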


_transform()

Apply the aggregation function to feature groups in the dataset.

This method applies the specified aggregation function to each feature group, replacing it with a single aggregated feature.

Parallel Processing

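If parallel processing is enabled, the aggregation uses the Ray library: each feature group is submitted as a separate remote task, and the aggregated columns are concatenated after the non-longitudinal features.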
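The fan-out and gather pattern of the parallel branch can be sketched without Ray. The example below swaps in the standard library's ThreadPoolExecutor purely as a stand-in, so it illustrates the structure (one task per feature group, then concatenation) and not Ray's API or performance. Column names and the output naming scheme are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

df = pd.DataFrame(
    {
        "age": [54, 61],
        "income_w1": [10.0, 30.0],
        "income_w2": [20.0, 50.0],
        "bmi_w1": [22.0, 28.0],
        "bmi_w2": [24.0, 30.0],
    }
)
feature_groups = [["income_w1", "income_w2"], ["bmi_w1", "bmi_w2"]]
non_longitudinal = ["age"]


def aggregate_group(group):
    """Aggregate one feature group into a single named column (hypothetical naming)."""
    name = "mean_" + group[0].rsplit("_", 1)[0]
    return df[group].mean(axis=1).rename(name)


# One task per feature group; map() returns results in submission order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(aggregate_group, feature_groups))

# Static columns first, then the aggregated columns, as in the parallel branch.
transformed = pd.concat([df[non_longitudinal], pd.concat(results, axis=1)], axis=1)
print(transformed)
```

One consequence of this layout is that the column order of the parallel result is fixed: non-longitudinal features come first, followed by one aggregated column per feature group.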

Categorical Data Handling

If the aggregation function is "mean" or "median" and every column in a feature group has the object dtype (categorical data), the method switches to "mode" for that group and issues a warning.
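A minimal sketch of this fallback, assuming a hypothetical helper named aggregate_with_fallback that only handles "mean" and "median". It uses pandas' Series.mode as a swapped-in mode implementation (the source shown on this page calls stats.mode), so tie-breaking may differ from the library.

```python
import warnings

import pandas as pd


def aggregate_with_fallback(df, group, func_name):
    """Aggregate `group` row-wise with "mean" or "median", falling back to mode for object columns."""
    all_object = (df[group].dtypes == "object").all()
    if func_name in ("mean", "median") and all_object:
        warnings.warn(
            f"Aggregation function is {func_name} but feature group {group} is "
            "categorical. Using mode instead."
        )
        # Series.mode returns sorted modes; take the first one for each row.
        return df[group].apply(lambda row: row.mode().iloc[0], axis=1)
    return getattr(df[group], func_name)(axis=1)


# Hypothetical categorical attribute over three waves, stored with object dtype.
smoke = pd.DataFrame(
    {
        "smoke_w1": ["yes", "no", "no"],
        "smoke_w2": ["yes", "no", "yes"],
        "smoke_w3": ["yes", "yes", "no"],
    },
    dtype=object,
)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = aggregate_with_fallback(smoke, list(smoke.columns), "mean")

print(result.tolist())
```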

Returns:

Name Type Description
tuple
  • pd.DataFrame: The transformed dataset.
  • List[List[int]]: Feature groups in the transformed dataset (None, as they are aggregated).
  • List[Union[int, str]]: Non-longitudinal features in the transformed dataset (None).
  • List[str]: Names of features in the transformed dataset.