Source code for brainstat.stats.terms

"""Classes for fixed, mixed, and random effects."""
import difflib
import re
from typing import Any, List, Optional, Sequence, Union

import numpy as np
import pandas as pd

from brainstat._typing import ArrayLike
from brainstat._utils import logger


[docs]def check_names( x: Union[int, "FixedEffect", pd.DataFrame, pd.Series] ) -> Optional[List[str]]: """Return True if `x` is FixedEffect, Series or DataFrame.""" if isinstance(x, (pd.DataFrame, pd.Series)): return x.columns.tolist() elif isinstance(x, FixedEffect): return x.names else: return None
[docs]def check_categorical_variables( x: Union[ArrayLike, pd.DataFrame, "FixedEffect"], names: Optional[Union[str, Sequence[str]]] = None, ) -> None: """Checks whether categorical variables were provided as such. Parameters ---------- x : ArrayLike, pandas.DataFrame The input array. names : str, sequence of str or None, optional Names for each column in `x`. Default is None. """ if np.isscalar(x): return if isinstance(names, str): names = [names] if isinstance(x, pd.DataFrame): x_df = x elif isinstance(x, FixedEffect): x_df = x.m else: x_df = pd.DataFrame(x, columns=names) categorical_warning_threshold = np.minimum(5, x_df.shape[0] - 1) for i, column in enumerate(x_df): if not pd.api.types.is_numeric_dtype(x_df[column]): # Variable is categorical. continue unique_numbers = x_df[column].unique() if 1 < unique_numbers.size < categorical_warning_threshold: if names is not None: name = names[i] else: name = f"Column {i}" logger.warning( f"{name} has {unique_numbers.size} unique values but was supplied as a numeric (i.e. continuous) variable. Should it be a categorical variable? If yes, the easiest way to provide categorical variables is to convert numerics to strings." )
[docs]def to_df( x: Union[int, ArrayLike, "FixedEffect"], n: int = 1, names: Optional[Union[str, Sequence[str]]] = None, idx: Optional[int] = None, ) -> pd.DataFrame: """Convert input to DataFrame. Parameters ---------- x : int, array-like FixedEffect Input data. n : int, optional If input is a scalar, broadcast to column of `n` entries. Default is 1. names : str, sequence of str or None, optional Names for each column in `x`. Default is None. idx : int or None, optional Staring index for variable names of the for `x{i}`. Returns ------- df : DataFrame Input `x` wrapped in a DataFrame. """ if x is None: return pd.DataFrame() has_names = True if isinstance(x, pd.DataFrame): df = x elif isinstance(x, FixedEffect): df = x.m elif isinstance(x, pd.Series): df = pd.DataFrame(x) else: x_2d = np.atleast_2d(x) if x_2d.shape[0] == 1: x_2d = x_2d.T df = pd.DataFrame(x_2d) has_names = False if df.empty: return df if df.size == 1 and n > 1: df = pd.concat([df] * n, ignore_index=True) if names is None and (idx is not None or not has_names): if idx is None: idx = 0 names = ["x%d" % i for i in range(idx, idx + df.shape[1])] if names is not None: if len(names) != df.shape[1]: raise ValueError( f"Number of columns {df.shape[1]} does not " f"coincide with column names {names}" ) df.columns = names return pd.get_dummies(df, dtype=np.int8)
[docs]def get_index(df: pd.DataFrame) -> Optional[int]: """Get index for column names of the form x{i}. If there are none, return 0. Parameters ---------- df : DataFrame Input dataframe. Returns ------- index : int Index for the next x column. If `df` is empty, return None. """ if df.empty: return None r = [re.match(r"^x(\d+)$", c) for c in df.columns] r2 = [int(r1.groups()[0]) for r1 in r if r1 is not None] return 0 if len(r2) == 0 else max(r2) + 1
[docs]def check_duplicate_names( df1: pd.DataFrame, df2: Optional[pd.DataFrame] = None ) -> None: """Check columns with duplicate names. Parameters ---------- df1 : DataFrame Input dataframe. df2 : DataFrame, optional If provided, check that dataframes do not contain columns with same names. Default is None. Raises ------ ValueError If there are columns with duplicate names. """ if df2 is None: names, counts = np.unique(df1.columns, return_counts=True) names = names[counts > 1] else: names = np.intersect1d(df1.columns, df2.columns) if names.size > 0: raise ValueError(f"Variables must have different names: {names}")
[docs]def remove_identical_columns(df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame: """Remove columns with duplicate names across dataframes. Parameters ---------- df1 : DataFrame Input dataframe from which to drop columns. df2 : DataFrame Reference dataFrame Returns ------- DataFrame df1 with columns appearing in df2 removed. Raises ------ ValueError If there are columns with duplicate names but no duplicate values. """ names = np.intersect1d(df1.columns, df2.columns) for col in names: if np.array_equal(df1[col], df2[col]): df1 = df1.drop(col, axis=1) elif (df1[col].size == 1 or df2[col].size == 1) and np.all( df1[col].to_numpy() == df2[col].to_numpy() ): # Assume its an intercept term. df1 = df1.drop(col, axis=1) else: raise ValueError( f"Column {col} must be identical for duplicate removal. Either alter the column name or remove the duplicate data." ) return df1
[docs]def remove_duplicate_columns(df: pd.DataFrame, tol: float = 1e-8) -> List[str]: """Remove duplicate columns. Parameters ---------- df : DataFrame Input dataframe. tol : float, optional Tolerance to assess duplicate columns. Default is 1e-8. Returns ------- columns: list of str Columns to keep after removing duplicates. """ df = df / df.abs().sum(0) df *= 1 / tol # keep = df.round().T.drop_duplicates(keep="last").T.columns # Slow!! idx = np.unique(df.round().values, axis=1, return_index=True)[-1] keep = df.columns[sorted(idx)] return keep
[docs]class FixedEffect(object): """Build a term object for a linear model. Parameters ---------- x : array-like or DataFrame, optional If None, the term is empty. Default is None. names : str or list of str, optional Names for each column in `x`. If None, it defauts to {'x0', 'x1', ...}. Default is None. add_intercept : bool, optional If true, adds an intercept term. Defaults to True. Attributes ---------- x : DataFrame Design matrix. names : list of str Names of columns in the design matrix. See Also -------- MixedEffect: MixedEffect term Examples -------- >>> t = FixedEffect() >>> t.is_empty True >>> t1 = FixedEffect(np.arange(5), names='t1') >>> t2 = FixedEffect(np.random.randn(5, 1), names=['t2']) >>> t3 = t1 + t2 >>> t3.shape (5, 3) """ tolerance = 1e-8
[docs] def __init__( self, x: Optional[Union[ArrayLike, pd.DataFrame]] = None, names: Optional[Union[str, Sequence[str]]] = None, add_intercept: bool = True, _check_categorical: bool = True, ) -> None: if x is None: self.m = pd.DataFrame() return if _check_categorical: check_categorical_variables(x, names) if isinstance(x, FixedEffect): self.m = x.m return if np.isscalar(x) and names is None: names = ["intercept"] if isinstance(names, str): names = [names] self.m = to_df(x, names=names).reset_index(drop=True) if add_intercept and "intercept" not in self.names: self.m.insert(0, "intercept", 1) if _check_categorical: check_duplicate_names(self.m)
def _broadcast( self, t: Union[ArrayLike, "FixedEffect"], idx: Optional[int] = None ) -> pd.DataFrame: df = to_df(t, idx=idx) if self.shape[0] > 1 and df.shape[0] == 1: df = to_df(df, n=self.shape[0]) elif self.shape[0] == 1 and df.shape[0] > 1: self.m = to_df(self.m, n=df.shape[0]) elif not df.empty and self.shape[0] != df.shape[0]: raise ValueError(f"Cannot broadcast shape {df.shape} to " f"{self.shape}.") return df def _add( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"], side: str = "left" ) -> "FixedEffect": if isinstance(t, MixedEffect): return NotImplemented if self.empty: return FixedEffect(t, add_intercept=False, _check_categorical=False) idx = None if check_names(t) is None: idx = get_index(self.m) df = self._broadcast(t, idx=idx) if df.empty: return self df = remove_identical_columns(df, self.m) # check_duplicate_names(df, df2=self.m) terms = [self.m, df] names = [self.names, list(df.columns)] if side == "right": terms = terms[::-1] names = names[::-1] df = pd.concat(terms, axis=1) df.columns = names[0] + names[1] cols = remove_duplicate_columns(df, tol=self.tolerance) return FixedEffect(df[cols], add_intercept=False, _check_categorical=False) def __add__( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"] ) -> "FixedEffect": return self._add(t) def __radd__( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"] ) -> "FixedEffect": return self._add(t, side="right") def __sub__(self, t: Union[ArrayLike, "FixedEffect"]) -> "FixedEffect": if self.empty: return self df = self._broadcast(t) if df.empty: return self df /= df.abs().sum(0) df.index = self.m.index m = self.m / self.m.abs().sum(0) merged = m.T.merge(df.T, how="outer", indicator=True) mask = (merged._merge.values == "left_only")[: self.m.shape[1]] return FixedEffect( self.m[self.m.columns[mask]], add_intercept=False, _check_categorical=False ) def _mul( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"], side: str = "left" ) -> "FixedEffect": if isinstance(t, MixedEffect): return NotImplemented if self.is_empty: return self if np.isscalar(t): if t == 1: return self m = self.m * t if side == "right": names = [f"{t}*{k}" for k in self.names] else: names = [f"{k}*{t}" for k in self.names] return FixedEffect( m, names=names, add_intercept=False, _check_categorical=False ) df = self._broadcast(t) if df.empty: return FixedEffect() prod = [] names = [] for c in df.columns: prod.append(df[[c]].values * self.m) if side == "left": names.extend([f"{k}*{c}" for k in self.names]) else: names.extend([f"{c}*{k}" for k in self.names]) df = pd.concat(prod, axis=1) df.columns = names cols = remove_duplicate_columns(df, tol=self.tolerance) return FixedEffect(df[cols], add_intercept=False, _check_categorical=False) def __mul__( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"] ) -> "FixedEffect": return self._mul(t) def __rmul__( self, t: Union[ArrayLike, "FixedEffect", "MixedEffect"] ) -> "FixedEffect": return self._mul(t, side="right") def __pow__(self, p: int) -> "FixedEffect": if p > 1: return self * self ** (p - 1) return self def __repr__(self) -> str: return self.m.__repr__() def _repr_html_(self) -> str: return self.m._repr_html_() @property def is_scalar(self) -> bool: return self.size == 1 @property def matrix(self) -> pd.DataFrame: return self.m @property def names(self) -> List[str]: return object.__getattribute__(self, "m").columns.tolist() @property def is_empty(self) -> bool: return self.m.empty def __getattr__(self, name: str) -> Any: if name in object.__getattribute__(self, "names"): return object.__getattribute__(self, "m")[name].values if name in {"shape", "size", "empty"}: return getattr(self.m, name) return object.__getattribute__(self, name)
[docs]class MixedEffect: """Build a random term object for a linear model. Parameters ---------- ran : array-like or DataFrame, optional For the random effects. If None, the random term is empty. Default is None. fix : array-like or DataFrame, optional If None, the fixed effects. name_ran : str, list, optional Name(s) for the random term(s). If None, it defauts to 'xi'. Default is None. name_fix : str, optional Name for the `fix` term. If None, it defauts to 'xi'. Default is None. ranisvar : bool, optional If True, `ran` is already a term for the variance. Default is False. Attributes ---------- mean : FixedEffect FixedEffect for the mean. variance : FixedEffect FixedEffect for the variance. See Also -------- FixedEffect: FixedEffect object Examples -------- >>> r = MixedEffect() >>> r.is_empty True >>> r2 = MixedEffect(np.arange(5), name_ran='r1') >>> r2.mean.is_empty True >>> r2.variance.shape (25, 2) """
[docs] def __init__( self, ran: Optional[Union[ArrayLike, pd.DataFrame]] = None, fix: Optional[Union[ArrayLike, pd.DataFrame]] = None, name_ran: Optional[Union[str, Sequence[str]]] = None, name_fix: Optional[Union[str, Sequence[str]]] = None, ranisvar: bool = False, add_intercept: bool = True, add_identity: bool = True, _check_categorical: bool = True, ) -> None: if isinstance(ran, MixedEffect): self.mean = ran.mean # type: ignore self.variance = ran.variance # type: ignore return if ran is None: self.variance = FixedEffect() else: if _check_categorical: check_categorical_variables(ran, name_ran) ran = to_df(ran) if not ranisvar: if ran.size == 1: name_ran = "I" v = ran.values.flat[0] if v != 1: name_ran += f"{v}**2" else: if name_ran is None: name = check_names(ran) if name: name_ran = name[0] for i in range(1, len(name)): sm = difflib.SequenceMatcher(None, name_ran, name[i]) match = sm.find_longest_match( 0, len(name_ran), 0, len(name[i]) ) name_ran = name_ran[match.a : match.a + match.size] ran = ran @ ran.T ran = ran.values.ravel() self.variance = FixedEffect( ran, names=name_ran, add_intercept=False, _check_categorical=False ) self.mean = FixedEffect( fix, names=name_fix, add_intercept=add_intercept, _check_categorical=_check_categorical, ) if add_identity: I = MixedEffect( 1, name_ran="I", add_identity=False, _check_categorical=False ) tmp_mixed = self + I self.variance = tmp_mixed.variance self.set_identity_last()
[docs] def set_identity_last(self) -> None: """Sets the identity matrix column last. Raises ------ ValueError Raised if "I" occurs more than once in the names. """ if self.variance.is_empty: return if self.variance.m.size == 1: # Class is the identity. return n = int(round(np.sqrt(self.variance.m.shape[0]))) I = np.reshape(np.identity(n), (-1, 1)) index = np.argwhere(np.all(self.variance.m.to_numpy() == I, axis=0)) if index.size == 0: return elif index.size == 1: names = self.variance.names names.append(names.pop(index[0][0])) self.variance.m = self.variance.m[names] else: raise ValueError("Found the identity matrix twice in the dataframe.")
def broadcast_to(self, r1: "MixedEffect", r2: "MixedEffect") -> FixedEffect: if r1.variance.shape[0] == 1: v = np.eye(max(r2.shape[0], int(np.sqrt(r2.shape[2])))) return FixedEffect( v.ravel(), names="I", add_intercept=False, _check_categorical=False ) return r1.variance def _add( self, r: Union[FixedEffect, "MixedEffect"], side: str = "left" ) -> "MixedEffect": if not isinstance(r, MixedEffect): r = MixedEffect( fix=r, add_intercept=False, add_identity=False, _check_categorical=False ) r.variance = self.broadcast_to(r, self) self.variance = self.broadcast_to(self, r) if side == "left": ran = self.variance + r.variance fix = self.mean + r.mean else: ran = r.variance + self.variance fix = r.mean + self.mean s = MixedEffect( ran=ran, fix=fix, ranisvar=True, add_intercept=False, add_identity=False, _check_categorical=False, ) s.set_identity_last() return s def __add__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": # type: ignore[misc] return self._add(r) def __radd__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": # type: ignore[misc] return self._add(r, side="right") def _sub( self, r: Union[FixedEffect, "MixedEffect"], side: str = "left" ) -> "MixedEffect": if not isinstance(r, MixedEffect): r = MixedEffect(fix=r, add_intercept=False, add_identity=False) r.variance = self.broadcast_to(r, self) self.variance = self.broadcast_to(self, r) if side == "left": ran = self.variance - r.variance fix = self.mean - r.mean else: ran = r.variance - self.variance fix = r.mean - self.mean return MixedEffect( ran=ran, fix=fix, ranisvar=True, add_intercept=False, add_identity=False, _check_categorical=False, ) def __sub__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": return self._sub(r) def __rsub__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": return self._sub(r, side="right") def _mul( self, r: Union[FixedEffect, "MixedEffect"], side: str = "left" ) -> "MixedEffect": if not isinstance(r, MixedEffect): r = MixedEffect(fix=r, add_intercept=False, add_identity=False) r.variance = self.broadcast_to(r, self) self.variance = self.broadcast_to(self, r) if side == "left": ran = self.variance * r.variance fix = self.mean * r.mean else: ran = r.variance * self.variance fix = r.mean * self.mean s = MixedEffect( ran=ran, fix=fix, ranisvar=True, add_intercept=False, add_identity=False, _check_categorical=False, ) if self.mean.matrix.values.size > 0: x = self.mean.matrix.values.T / self.mean.matrix.abs().values.max() t = FixedEffect() for i in range(x.shape[0]): for j in range(i + 1): if i == j: t = t + FixedEffect( np.outer(x[i], x[j]).T.ravel(), names=self.mean.names[i], add_intercept=False, _check_categorical=False, ) else: xs = x[i] + x[j] xs_name = f"({self.mean.names[i]}+{self.mean.names[j]})" xd = x[i] - x[j] xd_name = f"({self.mean.names[i]}-{self.mean.names[j]})" v = np.outer(xs, xs) / 4 t = t + FixedEffect( v.ravel(), names=xs_name, add_intercept=False, _check_categorical=False, ) v = np.outer(xd, xd) / 4 t = t + FixedEffect( v.ravel(), names=xd_name, add_intercept=False, _check_categorical=False, ) s.variance = s.variance + t * r.variance if r.mean.matrix.values.size > 0: x = r.mean.matrix.values.T / r.mean.matrix.abs().values.max() t = FixedEffect() for i in range(x.shape[0]): for j in range(i + 1): if i == j: t = t + FixedEffect( np.outer(x[i], x[j]).ravel(), names=r.mean.names[i], _check_categorical=False, ) else: xs = x[i] + x[j] xs_name = f"({r.mean.names[i]}+{r.mean.names[j]})" xd = x[i] - x[j] xd_name = f"({r.mean.names[i]}-{r.mean.names[j]})" v = np.outer(xs, xs) / 4 t = t + FixedEffect( v.ravel(), names=xs_name, add_intercept=False, _check_categorical=False, ) v = np.outer(xd, xd) / 4 t = t + FixedEffect( v.ravel(), names=xd_name, add_intercept=False, _check_categorical=False, ) s.variance = s.variance + self.variance * t s.set_identity_last() return s def __mul__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": # type: ignore[misc] return self._mul(r) def __rmul__(self, r: Union[FixedEffect, "MixedEffect"]) -> "MixedEffect": # type: ignore[misc] return self._mul(r, side="right") def __pow__(self, p: int) -> "MixedEffect": if p > 1: return self * self ** (p - 1) return self @property def empty(self) -> bool: return self.mean.empty and self.variance.empty @property def shape(self) -> np.ndarray: return self.mean.shape + self.variance.shape def _repr_html_(self) -> str: return ( "Mean:\n" + self.mean._repr_html_() + "\n\nVariance:\n" + self.variance._repr_html_() )