# type: ignore
""" Standard Linear regression models. """
import warnings
from cmath import sqrt
from pathlib import Path
from pprint import pformat
from typing import Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
from scipy.stats import skew, kurtosis
from brainspace.mesh.mesh_elements import get_cells, get_points
from brainspace.vtk_interface.wrappers.data_object import BSPolyData
from nibabel.nifti1 import Nifti1Image
from brainstat._typing import ArrayLike
from brainstat.datasets import fetch_parcellation, fetch_template_surface
from brainstat.datasets.base import fetch_template_surface, fetch_yeo_networks_metadata
from brainstat.mesh.utils import _mask_edges, mesh_edges
from brainstat.stats.terms import FixedEffect, MixedEffect
from brainstat.stats.utils import apply_mask, undo_mask
[docs]class SLM:
"""Core Class for running BrainStat linear models"""
# Import class methods
from ._linear_model import _linear_model
from ._multiple_comparisons import _fdr, _random_field_theory
from ._t_test import _t_test
[docs] def __init__(
self,
model: Union[FixedEffect, MixedEffect],
contrast: ArrayLike,
surf: Optional[Union[str, dict, BSPolyData]] = None,
mask: Optional[ArrayLike] = None,
*,
correction: Optional[Union[str, Sequence[str]]] = None,
thetalim: float = 0.01,
drlim: float = 0.1,
two_tailed: bool = True,
cluster_threshold: float = 0.001,
data_dir: Optional[Union[str, Path]] = None
) -> None:
"""Constructor for the SLM class.
Parameters
----------
model : brainstat.stats.terms.FixedEffect, brainstat.stats.terms.MixedEffect
The linear model to be fitted of dimensions (observations, predictors).
Note that, for volumetric input, BrainStat follows Fortran (MATLAB)
convention for ordering voxels, i.e. the first dimension changes
first.
contrast : array-like
Vector of contrasts in the observations.
surf : str, dict, BSPolyData, Nifti1Image, optional
A surface provided as either a dictionary with keys 'tri' for its
faces (n-by-3 array) and 'coord' for its coordinates (3-by-n array),
or as a BrainSpace BSPolyData object, a string containing a template
name accepted by fetch_template_surface, or a Nifti1Image wherein 0
denotes excluded voxels and any other value denotes included voxels,
by default None.
mask : array-like, optional
A mask containing True for vertices to include in the analysis, by
default None.
correction : str, Sequence, optional
String or sequence of strings. If it contains "rft" a random field
theory multiple comparisons correction will be run. If it contains
"fdr" a false discovery rate multiple comparisons correction will be
run. Both may be provided. By default None.
thetalim : float, optional
Lower limit on variance coefficients in standard deviations, by
default 0.01.
drlim : float, optional
Step of ratio of variance coefficients in standard deviations, by
default 0.1.
two_tailed : bool, optional
Determines whether to return two-tailed or one-tailed p-values. Note
that multivariate analyses can only be two-tailed, by default True.
cluster_threshold : float, optional
P-value threshold or statistic threshold for defining clusters in
random field theory, by default 0.001.
data_dir : str, pathlib.Path, optional
Path to the location to store BrainStat data files, defaults to
$HOME_DIR/brainstat_data.
"""
# Input arguments.
self.model = model
self.contrast = np.array(contrast)
if isinstance(surf, str):
self.surf_name = surf
self.surf = fetch_template_surface(self.surf_name)
else:
self.surf_name = None
self.surf = surf
self.mask = mask
self.correction = [correction] if isinstance(correction, str) else correction
self.niter = 1
self.thetalim = thetalim
self.drlim = drlim
self.two_tailed = two_tailed
self.cluster_threshold = cluster_threshold
self.data_dir = data_dir
# Error check
if self.surf is None:
if self.correction is not None and "rft" in self.correction:
raise ValueError("Random Field Theory corrections require a surface.")
# We have to initialize fit parameters for our unit tests here.
# TODO: remove this requirement.
self._reset_fit_parameters()
[docs] def fit(self, Y: np.ndarray) -> None:
"""Fits the SLM model
Parameters
----------
Y : numpy.array
Input data (observation, vertex, variate)
Raises
------
ValueError
An error will be thrown when multivariate data is provided and a
one-tailed test is requested.
"""
if Y.ndim < 2 or Y.ndim > 3:
raise ValueError("Input data must be two or three dimensional.")
elif Y.ndim == 3:
if (not self.two_tailed) and Y.shape[2] > 1:
raise NotImplementedError(
"One-tailed tests are not implemented for multivariate data."
)
if Y.shape[2] > 3 and "rft" in self.correction:
raise NotImplementedError(
"Random Field Theory corrections are not implemented for more than three variates."
)
student_t_test = Y.shape[2] == 1
else:
student_t_test = True
self._reset_fit_parameters()
if self.mask is not None:
Y_masked = apply_mask(Y, self.mask, axis=1)
else:
Y_masked = Y.copy()
self._linear_model(Y_masked)
self._t_test()
if self.mask is not None:
self._unmask()
if self.correction is not None:
self.multiple_comparison_corrections(student_t_test)
[docs] def qc(self, Y=None, feat=None, v=None, histo=True, qq=True):
"""Quality check of the data (author: @saratheriver)
Parameters
----------
Y : numpy.array
Input data (observation, vertex, variate)
feat : numpy.array, optional
Dimension of variate to qc. Default is 0 - assuming 2D matrix.
v : numpy.array, optional
specify vertex or parcel number. Default to all.
histo : bool, optional
Outputs histogram of the residuals. Default is True.
qq : bool, optional
Outputs qq plot of the residuals. Default is True.
Returns
-------
sk : ndarray
Skewness of residuals distribution
ku : ndarray
Kurtosis of residuals distribution
"""
if Y is None:
raise ValueError("Input data must be provided.")
elif Y.ndim < 2 or Y.ndim > 3:
raise ValueError("Input data must be two or three dimensional.")
if feat is not None and Y.ndim == 3:
Y = np.squeeze(Y[:, :, feat])
if v is None:
v = list(range(Y.shape[1]))
# Histogram of the residuals
if histo:
fig, ax = plt.subplots(1, figsize=(8, 6))
plt.hist(Y[:, v] - np.dot(self.X, self.coef[:, v]), edgecolor="k")
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
plt.title("Histogram of the residuals")
# qqplot of the residuals
if qq:
fig, ax = plt.subplots(1, figsize=(8, 6))
sm.qqplot(
Y[:, v] - np.dot(self.X, self.coef[:, v]),
line="q",
ax=ax,
markerfacecolor="k",
markeredgecolor="w",
markersize=8.88,
)
ax.spines["top"].set_visible(False)
ax.spines["right"].set_visible(False)
ax.get_lines()[1].set_color("black")
ax.get_lines()[1].set_linewidth("2.8")
plt.title("QQ plot of sample data versus standard normal")
# Characterize distribution based on two statistical moments at each vertex
sk = np.empty((Y.shape[1], 1))
ku = np.empty_like(sk)
for ii in range(Y.shape[1]):
sk[ii] = skew(Y[:, ii] - np.dot(self.X, self.coef[:, ii]))
ku[ii] = kurtosis(Y[:, ii] - np.dot(self.X, self.coef[:, ii]))
np.nan_to_num(sk, nan=-np.inf)
np.nan_to_num(ku, nan=-np.inf)
return sk, ku
[docs] def multiple_comparison_corrections(self, student_t_test: bool) -> None:
"""Performs multiple comparisons corrections. If a (one-sided) student-t
test was run, then make it two-tailed if requested."""
P1, Q1 = self._run_multiple_comparisons()
if self.two_tailed and student_t_test:
self.t = -self.t
P2, Q2 = self._run_multiple_comparisons()
self.t = -self.t
self.P = _merge_rft(P1, P2)
self.Q = _merge_fdr(Q1, Q2)
else:
self.P = P1
# Make variable type consistent with two one tailed outputs.
for field in ["peak", "clus", "clusid"]:
if self.P is not None and field in self.P:
if field == "clusid":
self.P[field] = [self.P[field]]
else:
for field2 in self.P[field].keys():
self.P[field][field2] = [self.P[field][field2]]
self.Q = Q1
self._surfstat_to_brainstat_rft()
def _run_multiple_comparisons(self) -> Tuple[Optional[dict], Optional[np.ndarray]]:
"""Runs the multiple comparisons tests and returns their outputs.
Returns
-------
dict, None
Results of random_field_theory. None if not requested.
np.array, None
Results of fdr. None if not requested.
"""
P = None
Q = None
if "rft" in self.correction:
P = {}
P["pval"], P["peak"], P["clus"], P["clusid"] = self._random_field_theory()
if "fdr" in self.correction:
Q = self._fdr()
return P, Q
def _reset_fit_parameters(self) -> None:
"""Sets empty parameters before fitting. Prevents issues arising from
using the same object to fit twice.
"""
self.X = None
self.t = None
self.df = None
self.SSE = None
self.coef = None
self.V = None
self.k = None
self.r = None
self.dr = None
self.resl = None
self.c = None
self.ef = None
self.sd = None
self.dfs = None
self.P = None
self.Q = None
self.du = None
def _surfstat_to_brainstat_rft(self) -> None:
"""Convert SurfStat RFT output to BrainStat RFT output.
In short, we convert the peak/clus arrays to pandas dataframes and add Yeo networks.
"""
if self.P is not None:
if "peak" in self.P:
for i in range(len(self.P["peak"]["t"])):
if self.P["peak"]["vertid"][i] is not None:
self.P["peak"]["vertid"][i] = self.P["peak"]["vertid"][
i
].astype(int)
if self.P["peak"]["clusid"][i] is not None:
self.P["peak"]["clusid"][i] = self.P["peak"]["clusid"][
i
].astype(int)
if self.surf_name in (
"fsaverage",
"fsaverage5",
"fslr32k",
"civet41k",
"civet164k",
):
yeo7 = fetch_parcellation(
self.surf_name, "yeo", 7, data_dir=self.data_dir
)
yeo_names, _ = fetch_yeo_networks_metadata(7)
yeo_names.insert(0, "Undefined")
yeo7_index = yeo7[self.P["peak"]["vertid"][i]]
if "yeo7" not in self.P["peak"]:
self.P["peak"]["yeo7"] = []
self.P["peak"]["yeo7"].append(np.take(yeo_names, yeo7_index))
if "clus" in self.P:
for i in range(len(self.P["clus"]["clusid"])):
if self.P["clus"]["clusid"][i] is not None:
self.P["clus"]["clusid"][i] = self.P["clus"]["clusid"][
i
].astype(int)
none_squeeze = lambda x: np.squeeze(x) if x is not None else None
for field in ["peak", "clus"]:
P_tmp = []
for i in range(len(self.P[field]["P"])):
tail_dict = {
key: none_squeeze(value[i])
for key, value in self.P[field].items()
}
if tail_dict["P"] is not None:
if tail_dict["P"].size == 1:
P_tmp.append(pd.DataFrame.from_dict([tail_dict]))
else:
P_tmp.append(pd.DataFrame.from_dict(tail_dict))
P_tmp[i].sort_values(by="P", ascending=True)
else:
P_tmp.append(pd.DataFrame(columns=tail_dict.keys()))
self.P[field] = P_tmp
def _unmask(self) -> None:
"""Changes all masked parameters to their input dimensions."""
simple_unmask_parameters = ["t", "coef", "SSE", "r", "ef", "sd", "dfs"]
for key in simple_unmask_parameters:
attr = getattr(self, key)
if attr is not None:
setattr(self, key, undo_mask(attr, self.mask, axis=1))
# slm.resl unmask
if self.resl is not None:
edges = mesh_edges(self.surf)
_, idx = _mask_edges(edges, self.mask)
self.resl = undo_mask(self.resl, idx, axis=0)
def __str__(self) -> str:
"""Returns a string representation of the model."""
return pformat(
{key: value for key, value in self.__dict__.items() if not callable(value)}
)
""" Property specifications. """
@property
def surf(self) -> Union[BSPolyData, dict, Nifti1Image, None]:
return self._surf
@surf.setter
def surf(self, value):
self._surf = value
if self.surf is not None:
if isinstance(self.surf, BSPolyData):
self.tri = np.array(get_cells(self.surf)) + 1
self.coord = np.array(get_points(self.surf)).T
elif isinstance(self.surf, Nifti1Image):
self.lat = self.surf.get_fdata() != 0
else:
if "tri" in value:
self.tri = value["tri"]
self.coord = value["coord"]
elif "lat" in value:
self.lat = value["lat"]
self.coord = value["coord"]
@surf.deleter
def surf(self):
del self._surf
@property
def tri(self) -> np.ndarray:
if hasattr(self, "_tri"):
return self._tri
else:
return None
@tri.setter
def tri(self, value):
if value is not None and np.any(value < 0):
raise ValueError("Triangle indices must be non-negative.")
self._tri = value
@tri.deleter
def tri(self):
del self._tri
@property
def lat(self) -> np.ndarray:
if hasattr(self, "_lat"):
return self._lat
else:
return None
@lat.setter
def lat(self, value):
self._lat = value
@lat.deleter
def lat(self):
del self._lat
def _merge_rft(P1: dict, P2: dict) -> dict:
"""Merge two one-tailed outputs of the random_field_theory function.
Parameters
----------
P1 : dict
Output dict of random_field_theory
P2 : dict
Output dict of random_field_theory
Returns
-------
dict
Two-tailed version of the inputs.
"""
if P1 is None and P2 is None:
return None
P = {}
for key1 in P1 or P2:
P[key1] = {}
if key1 == "clusid":
P[key1] = [P1[key1], P2[key1]]
continue
for key2 in P1[key1]:
if key1 == "pval":
P[key1][key2] = _onetailed_to_twotailed(P1[key1][key2], P2[key1][key2])
else:
P[key1][key2] = [P1[key1][key2], P2[key1][key2]]
return P
def _merge_fdr(Q1: Optional[ArrayLike], Q2: Optional[ArrayLike]) -> np.ndarray:
"""Merge two one-tailed outputs of the fdr function.
Parameters
----------
Q1 : array-like, None
Q-values
Q2 : array-like, None
Q-values
Returns
-------
np.ndarray
Two-tailed FDR p-values
"""
if Q1 is None and Q2 is None:
return None
return _onetailed_to_twotailed(Q1, Q2)
def _onetailed_to_twotailed(p1: ArrayLike, p2: ArrayLike) -> np.ndarray:
"""Converts two one-tailed tests to a two-tailed test"""
if p1 is None and p2 is None:
return None
elif p1 is None:
p1 = p2
elif p2 is None:
p2 = p1
return np.minimum(np.minimum(p1, p2) * 2, 1)
[docs]def f_test(slm1: SLM, slm2: SLM) -> SLM:
"""F-statistics for comparing two uni- or multi-variate fixed effects models.
Parameters
----------
slm1 : brainstat.stats.SLM.SLM
Standard linear model returned by the t_test function; see Notes for
details.
slm2 : brainstat.stats.SLM.SLM
Standard linear model returned by the t_test function; see Notes for
details.
Returns
-------
brainstat.stats.SLM.SLM
Standard linear model with f-test results included.
"""
if slm1.r is not None or slm2.r is not None:
warnings.warn("Mixed effects models not programmed yet.")
slm = SLM(FixedEffect(1), FixedEffect(1))
if slm1.df > slm2.df:
X1 = slm1.X
X2 = slm2.X
df1 = slm1.df
df2 = slm2.df
SSE1 = slm1.SSE
SSE2 = slm2.SSE
for key in slm2.__dict__:
setattr(slm, key, getattr(slm2, key))
else:
X1 = slm2.X
X2 = slm1.X
df1 = slm2.df
df2 = slm1.df
SSE1 = slm2.SSE
SSE2 = slm1.SSE
for key in slm1.__dict__:
setattr(slm, key, getattr(slm1, key))
r = X1 - np.dot(np.dot(X2, np.linalg.pinv(X2)), X1)
d = np.sum(r.flatten() ** 2) / np.sum(X1.flatten() ** 2)
if d > np.spacing(1):
print("Models are not nested.")
return
slm.df = np.array([[df1 - df2, df2]])
h = SSE1 - SSE2
# if slm['coef'] is 3D and third dimension is 1, then squeeze it to 2D
if np.ndim(slm.coef) == 3 and np.shape(slm.coef)[2] == 1:
x1, x2, x3 = np.shape(slm.coef)
slm.coef = slm.coef.reshape(x1, x2)
if np.ndim(slm.coef) == 2:
slm.k = np.array(1)
slm.t = np.dot(h / (SSE2 + (SSE2 <= 0)) * (SSE2 > 0), df2 / (df1 - df2))
elif np.ndim(slm.coef) > 2:
k2, v = np.shape(SSE2)
k = np.around((np.sqrt(1 + 8 * k2) - 1) / 2)
slm.k = np.array(k)
if k > 3:
print("Roy's max root for k>3 not programmed yet.")
return
l = min(k, df1 - df2)
slm.t = np.zeros((int(l), int(v)))
if k == 2:
det = SSE2[0, :] * SSE2[2, :] - SSE2[1, :] ** 2
a11 = SSE2[2, :] * h[0, :] - SSE2[1, :] * h[1, :]
a21 = SSE2[0, :] * h[1, :] - SSE2[1, :] * h[0, :]
a12 = SSE2[2, :] * h[1, :] - SSE2[1, :] * h[2, :]
a22 = SSE2[0, :] * h[2, :] - SSE2[1, :] * h[1, :]
a0 = a11 * a22 - a12 * a21
a1 = (a11 + a22) / 2
s1 = np.array([sqrt(x) for x in (a1**2 - a0)]).real
d = (df2 / (df1 - df2)) / (det + (det <= 0)) * (det > 0)
slm.t[0, :] = (a1 + s1) * d
if l == 2:
slm.t[1, :] = (a1 - s1) * d
if k == 3:
det = (
SSE2[0, :] * (SSE2[2, :] * SSE2[5, :] - SSE2[4, :] ** 2)
- SSE2[5, :] * SSE2[1, :] ** 2
+ SSE2[3, :] * (SSE2[1, :] * SSE2[4, :] * 2 - SSE2[2, :] * SSE2[3, :])
)
m1 = SSE2[2, :] * SSE2[5, :] - SSE2[4, :] ** 2
m3 = SSE2[0, :] * SSE2[5, :] - SSE2[3, :] ** 2
m6 = SSE2[0, :] * SSE2[2, :] - SSE2[1, :] ** 2
m2 = SSE2[3, :] * SSE2[4, :] - SSE2[1, :] * SSE2[5, :]
m4 = SSE2[1, :] * SSE2[4, :] - SSE2[2, :] * SSE2[3, :]
m5 = SSE2[1, :] * SSE2[3, :] - SSE2[0, :] * SSE2[4, :]
a11 = m1 * h[0, :] + m2 * h[1, :] + m4 * h[3, :]
a12 = m1 * h[1, :] + m2 * h[2, :] + m4 * h[4, :]
a13 = m1 * h[3, :] + m2 * h[4, :] + m4 * h[5, :]
a21 = m2 * h[0, :] + m3 * h[1, :] + m5 * h[3, :]
a22 = m2 * h[1, :] + m3 * h[2, :] + m5 * h[4, :]
a23 = m2 * h[3, :] + m3 * h[4, :] + m5 * h[5, :]
a31 = m4 * h[0, :] + m5 * h[1, :] + m6 * h[3, :]
a32 = m4 * h[1, :] + m5 * h[2, :] + m6 * h[4, :]
a33 = m4 * h[3, :] + m5 * h[4, :] + m6 * h[5, :]
a0 = (
-a11 * (a22 * a33 - a23 * a32)
+ a12 * (a21 * a33 - a23 * a31)
- a13 * (a21 * a32 - a22 * a31)
)
a1 = a22 * a33 - a23 * a32 + a11 * a33 - a13 * a31 + a11 * a22 - a12 * a21
a2 = -(a11 + a22 + a33)
q = a1 / 3 - a2**2 / 9
r = (a1 * a2 - 3 * a0) / 6 - a2**3 / 27
s1 = (r + [sqrt(x) for x in (q**3 + r**2)]) ** (1 / 3)
z = np.zeros((3, v))
z[0, :] = 2 * s1.real - a2 / 3
z[1, :] = -s1.real - a2 / 3 + np.sqrt(3) * s1.imag
z[2, :] = -s1.real - a2 / 3 - np.sqrt(3) * s1.imag
if not np.count_nonzero(z) == 0:
z.sort(axis=0)
z = z[::-1]
d = df2 / (df1 - df2) / (det + (det <= 0)) * (det > 0)
for j in range(0, l):
slm.t[j, :] = z[j, :] * d
return slm