Source code for brainstat.stats.SLM

""" Standard Linear regression models. """
import warnings
import numpy as np
from cmath import sqrt
from .terms import Term
from .utils import apply_mask, undo_mask
from brainstat.mesh.utils import mesh_edges, _mask_edges


[docs]class SLM: """Core Class for running BrainStat linear models""" # Import class methods from ._t_test import t_test from ._linear_model import linear_model from ._multiple_comparisons import fdr, random_field_theory
[docs] def __init__( self, model, contrast, surf=None, mask=None, *, correction=None, niter=1, thetalim=0.01, drlim=0.1, two_tailed=True, cluster_threshold=0.001, ): """Constructor for the SLM class. Parameters ---------- model : brainstat.stats.terms.Term The linear model to be fitted of dimensions (observations, predictors). contrast : array-like, brainstat.stats.terms.Term Vector of contrasts in the observations. surf : dict, BSPolyData, optional A surface provided as either a dictionary with keys 'tri' for its faces (n-by-3 array) and 'coord' for its coordinates (3-by-n array), or as a BrainSpace BSPolyData object by default None. mask : array-like, optional A mask containing True for vertices to include in the analysis, by default None. correction : str, list, optional String or list of strings. If it contains "rft" a random field theory multiple comparisons correction will be run. If it contains "fdr" a false discovery rate multiple comparisons correction will be run. Both may be provided. By default None. niter : int, optional Number of iterations of the Fisher scoring algorithm for fitting mixed effects models, by default 1. thetalim : float, optional Lower limit on variance coefficients in standard deviations, by default 0.01. drlim : float, optional Step of ratio of variance coefficients in standard deviations, by default 0.1. two_tailed : bool, optional Determines whether to return two-tailed or one-tailed p-values. Note that multivariate analyses can only be two-tailed, by default True. cluster_threshold : float, optional P-value threshold or statistic threshold for defining clusters in random field theory, by default 0.001. """ # Input arguments. self.model = model self.contrast = contrast self.surf = surf self.mask = mask self.correction = correction if isinstance(self.correction, str): self.correction = [self.correction] self.niter = niter self.thetalim = thetalim self.drlim = drlim self.two_tailed = two_tailed self.cluster_threshold = cluster_threshold # Error check if self.surf is None: if self.correction is not None and "rft" in self.correction: raise ValueError("Random Field Theory corrections require a surface.") # We have to initialize fit parameters for our unit tests here. # TODO: remove this requirement. self._reset_fit_parameters()
[docs] def fit(self, Y): """Fits the SLM model Parameters ---------- Y : numpy.array Input data (observation, vertex, variate) Raises ------ ValueError An error will be thrown when multivariate data is provided and a one-tailed test is requested. """ if Y.ndim > 2: if (not self.two_tailed) and Y.shape[2] > 1: raise ValueError( "One-tailed tests are not implemented for multivariate data." ) self._reset_fit_parameters() if self.mask is not None: Y = apply_mask(Y, self.mask, axis=1) self.linear_model(Y) self.t_test() if self.mask is not None: self._unmask() if self.correction is not None: self.multiple_comparison_corrections()
[docs] def multiple_comparison_corrections(self): """Performs multiple comparisons corrections.""" P1, Q1 = self._run_multiple_comparisons() if self.two_tailed: self.t = -self.t P2, Q2 = self._run_multiple_comparisons() self.t = -self.t self.P = _merge_rft(P1, P2) self.Q = _merge_fdr(Q1, Q2) else: self.P = P1 self.Q = Q1
def _run_multiple_comparisons(self): """Runs the multiple comparisons tests and returns their outputs. Returns ------- dict, None Results of random_field_theory. None if not requested. np.array, None Results of fdr. None if not requested. """ P = None Q = None if "rft" in self.correction: P = {} P["pval"], P["peak"], P["clus"], P["clusid"] = self.random_field_theory() if "fdr" in self.correction: Q = self.fdr() return P, Q def _reset_fit_parameters(self): """Sets empty parameters before fitting. Prevents issues arising from using the same object to fit twice. """ self.X = None self.t = None self.df = None self.SSE = None self.coef = None self.V = None self.k = None self.r = None self.dr = None self.resl = None self.tri = None self.lat = None self.c = None self.ef = None self.sd = None self.dfs = None self.P = None self.Q = None self.du = None def _unmask(self): """Changes all masked parameters to their input dimensions.""" simple_unmask_parameters = ["t", "coef", "SSE", "r", "ef", "sd", "dfs"] for key in simple_unmask_parameters: attr = getattr(self, key) if attr is not None: setattr(self, key, undo_mask(attr, self.mask, axis=1)) # slm.resl unmask if self.resl is not None: edges = mesh_edges(self.surf) _, idx = _mask_edges(edges, self.mask) self.resl = undo_mask(self.resl, idx, axis=0)
def _merge_rft(P1, P2): """Merge two one-tailed outputs of the random_field_theory function. Parameters ---------- P1 : dict Output dict of random_field_theory P2 : dict Output dict of random_field_theory Returns ------- dict Two-tailed version of the inputs. """ if P1 is None and P2 is None: return None P = {} for key1 in P1: P[key1] = {} if key1 == "clusid": P[key1] = [P1[key1], P2[key1]] continue for key2 in P1[key1]: if key2 == "P" and key1 == "pval": P[key1][key2] = _onetailed_to_twotailed(P1[key1][key2], P2[key1][key2]) else: P[key1][key2] = [P1[key1][key2], P2[key1][key2]] return P def _merge_fdr(Q1, Q2): """Merge two one-tailed outputs of the fdr function. Parameters ---------- Q1 : array-like, None Q-values Q2 : array-like, None Q-values Returns ------- array-like Two-tailed FDR p-values """ if Q1 is None and Q2 is None: return None return _onetailed_to_twotailed(Q1, Q2) def _onetailed_to_twotailed(p1, p2): """ Converts two one-tailed tests to a two-tailed test """ return np.minimum(np.minimum(p1, p2) * 2, 1)
[docs]def f_test(slm1, slm2): """F-statistics for comparing two uni- or multi-variate fixed effects models. Parameters ---------- slm1 : brainstat.stats.SLM.SLM Standard linear model returned by the t_test function; see Notes for details. slm2 : brainstat.stats.SLM.SLM Standard linear model returned by the t_test function; see Notes for details. Returns ------- brainstat.stats.SLM.SLM Standard linear model with f-test results included. """ if slm1.r is not None or slm2.r is not None: warnings.warn("Mixed effects models not programmed yet.") slm = SLM(Term(1), Term(1)) if slm1.df > slm2.df: X1 = slm1.X X2 = slm2.X df1 = slm1.df df2 = slm2.df SSE1 = slm1.SSE SSE2 = slm2.SSE for key in slm2.__dict__: setattr(slm, key, getattr(slm2, key)) else: X1 = slm2.X X2 = slm1.X df1 = slm2.df df2 = slm1.df SSE1 = slm2.SSE SSE2 = slm1.SSE for key in slm1.__dict__: setattr(slm, key, getattr(slm1, key)) r = X1 - np.dot(np.dot(X2, np.linalg.pinv(X2)), X1) d = np.sum(r.flatten() ** 2) / np.sum(X1.flatten() ** 2) if d > np.spacing(1): print("Models are not nested.") return slm.df = np.array([[df1 - df2, df2]]) h = SSE1 - SSE2 # if slm['coef'] is 3D and third dimension is 1, then squeeze it to 2D if np.ndim(slm.coef) == 3 and np.shape(slm.coef)[2] == 1: x1, x2, x3 = np.shape(slm.coef) slm.coef = slm.coef.reshape(x1, x2) if np.ndim(slm.coef) == 2: slm.k = np.array(1) slm.t = np.dot(h / (SSE2 + (SSE2 <= 0)) * (SSE2 > 0), df2 / (df1 - df2)) elif np.ndim(slm.coef) > 2: k2, v = np.shape(SSE2) k = np.around((np.sqrt(1 + 8 * k2) - 1) / 2) slm.k = np.array(k) if k > 3: print("Roy's max root for k>3 not programmed yet.") return l = min(k, df1 - df2) slm.t = np.zeros((int(l), int(v))) if k == 2: det = SSE2[0, :] * SSE2[2, :] - SSE2[1, :] ** 2 a11 = SSE2[2, :] * h[0, :] - SSE2[1, :] * h[1, :] a21 = SSE2[0, :] * h[1, :] - SSE2[1, :] * h[0, :] a12 = SSE2[2, :] * h[1, :] - SSE2[1, :] * h[2, :] a22 = SSE2[0, :] * h[2, :] - SSE2[1, :] * h[1, :] a0 = a11 * a22 - a12 * a21 a1 = (a11 + a22) / 2 s1 = np.array([sqrt(x) for x in (a1 ** 2 - a0)]).real d = (df2 / (df1 - df2)) / (det + (det <= 0)) * (det > 0) slm.t[0, :] = (a1 + s1) * d if l == 2: slm.t[1, :] = (a1 - s1) * d if k == 3: det = ( SSE2[0, :] * (SSE2[2, :] * SSE2[5, :] - SSE2[4, :] ** 2) - SSE2[5, :] * SSE2[1, :] ** 2 + SSE2[3, :] * (SSE2[1, :] * SSE2[4, :] * 2 - SSE2[2, :] * SSE2[3, :]) ) m1 = SSE2[2, :] * SSE2[5, :] - SSE2[4, :] ** 2 m3 = SSE2[0, :] * SSE2[5, :] - SSE2[3, :] ** 2 m6 = SSE2[0, :] * SSE2[2, :] - SSE2[1, :] ** 2 m2 = SSE2[3, :] * SSE2[4, :] - SSE2[1, :] * SSE2[5, :] m4 = SSE2[1, :] * SSE2[4, :] - SSE2[2, :] * SSE2[3, :] m5 = SSE2[1, :] * SSE2[3, :] - SSE2[0, :] * SSE2[4, :] a11 = m1 * h[0, :] + m2 * h[1, :] + m4 * h[3, :] a12 = m1 * h[1, :] + m2 * h[2, :] + m4 * h[4, :] a13 = m1 * h[3, :] + m2 * h[4, :] + m4 * h[5, :] a21 = m2 * h[0, :] + m3 * h[1, :] + m5 * h[3, :] a22 = m2 * h[1, :] + m3 * h[2, :] + m5 * h[4, :] a23 = m2 * h[3, :] + m3 * h[4, :] + m5 * h[5, :] a31 = m4 * h[0, :] + m5 * h[1, :] + m6 * h[3, :] a32 = m4 * h[1, :] + m5 * h[2, :] + m6 * h[4, :] a33 = m4 * h[3, :] + m5 * h[4, :] + m6 * h[5, :] a0 = ( -a11 * (a22 * a33 - a23 * a32) + a12 * (a21 * a33 - a23 * a31) - a13 * (a21 * a32 - a22 * a31) ) a1 = a22 * a33 - a23 * a32 + a11 * a33 - a13 * a31 + a11 * a22 - a12 * a21 a2 = -(a11 + a22 + a33) q = a1 / 3 - a2 ** 2 / 9 r = (a1 * a2 - 3 * a0) / 6 - a2 ** 3 / 27 s1 = (r + [sqrt(x) for x in (q ** 3 + r ** 2)]) ** (1 / 3) z = np.zeros((3, v)) z[0, :] = 2 * s1.real - a2 / 3 z[1, :] = -s1.real - a2 / 3 + np.sqrt(3) * s1.imag z[2, :] = -s1.real - a2 / 3 - np.sqrt(3) * s1.imag if not np.count_nonzero(z) == 0: z.sort(axis=0) z = z[::-1] d = df2 / (df1 - df2) / (det + (det <= 0)) * (det > 0) for j in range(0, l): slm.t[j, :] = z[j, :] * d return slm