Source code for pymer4.models.Lm2

"""
Pymer4 Lm2 Class
================

Main class for two-stage regression models
"""

from copy import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from patsy import dmatrices
from joblib import Parallel, delayed
from .Lm import Lm
from ..stats import rsquared, rsquared_adj
from ..utils import _sig_stars, _permute_sign, _ols_group, _corr_group, _perm_find


class Lm2(object):

    """
    Model class to perform two-stage OLS regression. Practically, this class fits a
    separate Lm() model to each cluster/group in the data and performs inference on the
    coefficients of each model (i.e. a 1-sample t-test per coefficient). The results
    from this second-level regression are reported. This is an alternative to using
    Lmer, as it implicitly allows intercepts and slopes to vary by group, but with no
    prior/smoothing/regularization on the random effects. See https://bit.ly/2SwHhQU
    and Gelman (2005). This approach may be less preferable than Lmer if the number of
    observations per group is small but the number of groups is large, in which case
    the first-level estimates are much noisier and are not smoothed/regularized as in
    Lmer. It may be preferable when a "maximal" random-effects (rfx) Lmer model is not
    estimable. Formula specification works just like in R, based on columns of a
    dataframe. Formulae are parsed by patsy, which makes it easy to utilize specific
    columns as factors. This is **different** from Lmer. See the patsy documentation
    for more information on the different use cases.

    Args:
        formula (str): Complete lm-style model formula
        data (pd.DataFrame): input data
        family (string): what distribution family (i.e. link function) to use for the
            generalized model; default is gaussian (linear model)
        group (list/string): the grouping variable to use to run the first-level
            regression; if a list is provided will run multiple levels, feeding the
            coefficients from the previous level into the subsequent level

    Attributes:
        fitted (bool): whether model has been fit
        formula (str): model formula
        data (pandas.core.frame.DataFrame): model copy of input data
        grps (dict): groups and number of observations per group recognized by lmer
        AIC (float): model Akaike information criterion
        logLike (float): model log-likelihood
        family (string): model family
        warnings (list): warnings output from Python
        fixef (pd.DataFrame): cluster-level parameters
        coefs (pd.DataFrame): model summary table of population parameters
        residuals (numpy.ndarray): model residuals
        fits (numpy.ndarray): model fits/predictions
        se_type (string): how standard errors are computed
        sig_type (string): how inference is performed
    """

    def __init__(self, formula, data, group, family="gaussian"):

        self.family = family
        # implemented_fams = ['gaussian','binomial']
        if self.family != "gaussian":
            raise NotImplementedError(
                "Currently only linear (family='gaussian') models supported!"
            )
        if isinstance(group, str):
            self.group = group
        else:
            raise TypeError("group must be a string or list")
        self.fitted = False
        self.formula = formula.replace(" ", "")
        self.data = copy(data)
        self.AIC = None
        self.logLike = None
        self.warnings = []
        self.residuals = None
        self.fixef = None
        self.coefs = None
        self.model_obj = None
        self.ci_type = None
        self.se_type = None
        self.sig_type = None
        self.ranked_data = False
        self.iscorrs = False

    def __repr__(self):
        out = "{}(fitted={}, formula={}, family={}, group={})".format(
            self.__class__.__module__,
            self.fitted,
            self.formula,
            self.family,
            self.group,
        )
        return out
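    # Usage sketch (hypothetical data; 'DV', 'IV1', 'IV2', and 'subject' are
    # illustrative column names, not part of this module). Lm2 fits one OLS model
    # per level of `group`, then tests the first-level coefficients:
    #
    #   import pandas as pd
    #   from pymer4.models import Lm2
    #
    #   df = pd.read_csv(...)  # long-format data, one row per observation
    #   model = Lm2("DV ~ IV1 + IV2", data=df, group="subject")
    #   model.fit()  # prints an R-style summary table by default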
    def fit(
        self,
        robust=False,
        conf_int="standard",
        permute=False,
        perm_on="t-stat",
        rank=False,
        verbose=False,
        n_boot=500,
        n_jobs=1,
        n_lags=1,
        to_corrs=False,
        ztrans_corrs=True,
        cluster=None,
        **kwargs,
    ):
        """
        Fit a variety of second-level OLS models; all first-level models are standard
        OLS. By default fits a model that makes parametric assumptions (under a
        t-distribution), replicating the output of software like R. 95% confidence
        intervals (CIs) are also estimated parametrically by default. However,
        empirical bootstrapping can also be used to compute CIs, which resamples with
        replacement from the first-level regression estimates and uses those CIs to
        perform inference, unless permutation tests are requested. Permutation testing
        performs a one-sample sign-flipped permutation test on the estimates directly
        (perm_on='coef') or on the t-statistic (perm_on='t-stat'). Permutation here is
        a bit different from Lm, which always permutes based on the t-stat.
        Heteroscedasticity-robust standard errors can also be computed, but these are
        applied at the second level, *not* at the first level. See the Lm()
        documentation for more information about robust standard errors.

        Args:
            robust (bool/str): whether to use heteroscedasticity-robust s.e. and
                optionally which estimator type to use ('hc0','hc3','hac','cluster').
                If robust = True, the default robust estimator is 'hc0'; default False
            conf_int (str): whether confidence intervals should be computed through
                bootstrap ('boot') or assuming a t-distribution ('standard');
                default 'standard'
            permute (int): if non-zero, computes parameter significance tests by
                permuting t-statistics rather than parametrically; works with robust
                estimators
            perm_on (str): permute based on a null distribution of the 'coef' of
                first-level estimates or the 't-stat' of first-level estimates;
                default 't-stat'
            rank (bool): convert all predictors and the dependent variable to ranks
                before estimating the model; default False
            to_corrs (bool/string): for each first-level model, estimate semi-partial
                ('semi') or partial ('partial') correlations instead of betas and
                perform inference over these correlation coefficients. *Note* this is
                different from Lm(); default False
            ztrans_corrs (bool): whether to Fisher z-transform (arctanh) first-level
                correlations before running the second-level model; ignored if
                to_corrs is False; default True
            summarize (bool): whether to print a model summary after fitting;
                default True
            verbose (bool): whether to print which model, standard error, confidence
                interval, and inference type are being fitted
            n_boot (int): how many bootstrap resamples to use for confidence intervals
                (ignored unless conf_int='boot')
            n_jobs (int): number of cores for parallelizing bootstrapping or
                permutations; default 1
            n_lags (int): number of lags for robust estimator type 'hac' (ignored
                unless robust='hac'); default 1
            cluster (str): column name identifying clusters/groups for robust
                estimator type 'cluster' (ignored unless robust='cluster')

        Returns:
            DataFrame: R-style summary() table
        """

        # Allow summary or summarize for compatibility
        if "summary" in kwargs and "summarize" in kwargs:
            raise ValueError(
                "You specified both summary and summarize; please prefer summarize"
            )
        summarize = kwargs.pop("summarize", True)
        summarize = kwargs.pop("summary", summarize)

        if robust:
            if isinstance(robust, bool):
                robust = "hc0"
            self.se_type = "robust" + " (" + robust + ")"
            if cluster:
                if cluster not in self.data.columns:
                    raise ValueError(
                        "cluster identifier must be an existing column in data"
                    )
                else:
                    cluster = self.data[cluster]
        else:
            self.se_type = "non-robust"

        self.ci_type = (
            conf_int + " (" + str(n_boot) + ")" if conf_int == "boot" else conf_int
        )
        if isinstance(to_corrs, str):
            if to_corrs not in ["semi", "partial"]:
                raise ValueError("to_corrs must be 'semi' or 'partial'")

        if (conf_int == "boot") and (permute is None):
            self.sig_type = "bootstrapped"
        else:
            if permute:
                if perm_on not in ["coef", "t-stat"]:
                    raise ValueError("perm_on must be 't-stat' or 'coef'")
                self.sig_type = "permutation" + " (" + str(permute) + ")"
                if permute is True:
                    raise TypeError(
                        "permute should be False or the number of permutations to perform"
                    )
            else:
                self.sig_type = "parametric"

        # Parallelize regression computation for 1st-level models
        par_for = Parallel(n_jobs=n_jobs, backend="multiprocessing")
        if rank:
            self.ranked_data = True
        else:
            self.ranked_data = False

        if to_corrs:
            # Loop over each group and get semi/partial correlation estimates.
            # Reminder: each group yields one fewer correlation than OLS betas,
            # since a correlation is not computed for the intercept
            # TODO: Update _corr_group like _ols_group below
            betas = par_for(
                delayed(_corr_group)(
                    self.data,
                    self.formula,
                    self.group,
                    self.data[self.group].unique()[i],
                    self.ranked_data,
                    to_corrs,
                )
                for i in range(self.data[self.group].nunique())
            )
            if ztrans_corrs:
                betas = np.arctanh(betas)
            else:
                betas = np.array(betas)
        else:
            # Loop over each group and fit a separate regression
            out = par_for(
                delayed(_ols_group)(
                    self.data,
                    self.formula,
                    self.group,
                    self.data[self.group].unique()[i],
                    self.ranked_data,
                )
                for i in range(self.data[self.group].nunique())
            )
            betas = np.array([e["betas"] for e in out])
            fits = np.concatenate([e["pred"] for e in out], axis=0)
            residuals = np.concatenate([e["res"] for e in out], axis=0)
            self.residuals = residuals
            self.data["residuals"] = residuals
            self.fits = fits
            self.data["fits"] = fits

        # Get the model matrix formula from patsy to make it more reliable to set
        # the results dataframe index like Lmer
        _, x = dmatrices(self.formula, self.data, 1, return_type="dataframe")
        self.design_matrix = x

        # Perform an intercept-only regression for each beta
        results = []
        perm_ps = []
        for i in range(betas.shape[1]):
            df = pd.DataFrame({"X": np.ones_like(betas[:, i]), "Y": betas[:, i]})
            lm = Lm("Y ~ 1", data=df)
            lm.fit(
                robust=robust,
                conf_int=conf_int,
                summarize=False,
                n_boot=n_boot,
                n_jobs=n_jobs,
                n_lags=n_lags,
            )
            results.append(lm.coefs)
            if permute:
                # Sign-flip permutation test for each beta instead, to replace p-values
                if perm_on == "coef":
                    return_stat = "mean"
                else:
                    return_stat = "t-stat"
                seeds = np.random.randint(np.iinfo(np.int32).max, size=permute)
                par_for = Parallel(n_jobs=n_jobs, backend="multiprocessing")
                perm_est = par_for(
                    delayed(_permute_sign)(
                        data=betas[:, i], seed=seeds[j], return_stat=return_stat
                    )
                    for j in range(permute)
                )
                perm_est = np.array(perm_est)
                if perm_on == "coef":
                    perm_ps.append(_perm_find(perm_est, betas[:, i].mean()))
                else:
                    perm_ps.append(_perm_find(perm_est, lm.coefs["T-stat"].values))

        results = pd.concat(results, axis=0)
        ivs = self.formula.split("~")[-1].strip().split("+")
        ivs = [e.strip() for e in ivs]
        if to_corrs:
            intercept_pd = dict()
            for c in results.columns:
                intercept_pd[c] = np.nan
            intercept_pd = pd.DataFrame(intercept_pd, index=[0])
            results = pd.concat([intercept_pd, results], ignore_index=True)
        results.index = x.columns
        self.coefs = results
        if to_corrs:
            self.fixef = pd.DataFrame(betas, columns=ivs)
        else:
            self.fixef = pd.DataFrame(betas, columns=x.columns)
        self.fixef.index = self.data[self.group].unique()
        self.fixef.index.name = self.group

        if permute:
            # Get significance stars
            sig = [_sig_stars(elem) for elem in perm_ps]
            # Replace dof and p-values with permutation results
            if conf_int != "boot":
                self.coefs = self.coefs.drop(columns=["DF", "P-val"])
            if to_corrs:
                self.coefs["Num_perm"] = [np.nan] + [permute] * (
                    self.coefs.shape[0] - 1
                )
                self.coefs["Sig"] = [np.nan] + sig
                self.coefs["Perm-P-val"] = [np.nan] + perm_ps
            else:
                self.coefs["Num_perm"] = [permute] * self.coefs.shape[0]
                self.coefs["Sig"] = sig
                self.coefs["Perm-P-val"] = perm_ps
            self.coefs = self.coefs[
                [
                    "Estimate",
                    "2.5_ci",
                    "97.5_ci",
                    "SE",
                    "Num_perm",
                    "T-stat",
                    "Perm-P-val",
                    "Sig",
                ]
            ]

        self.fitted = True
        self.iscorrs = to_corrs

        # Fit statistics
        if "Intercept" in self.design_matrix.columns:
            center_tss = True
        else:
            center_tss = False

        # NOTE: These are rudimentary calculations because there are a variety of ways
        # to compute R^2 metrics for LMM models. Most, however, use intermediate
        # calculations from running a "true" MLM like Lmer rather than a two-stage
        # regression like Lm2. Here we compute the "naive" R^2 over the whole dataset,
        # and the same metrics on a per-group basis to give the user flexibility. We
        # don't compute anything from the second-level fits or residuals since those
        # are just univariate mean tests.
        if not self.iscorrs:
            # Method 1: "naive" over the whole dataset
            self.rsquared = rsquared(fits, residuals, center_tss)
            self.rsquared_adj = rsquared_adj(
                self.rsquared, len(residuals), len(residuals) - x.shape[1], center_tss
            )
            # Method 2: calculated separately per group. Potentially useful for
            # inspecting 1st-level model fits
            separate_results = [(e["pred"], e["res"]) for e in out]
            self.rsquared_per_group = np.array(
                [rsquared(e[0], e[1], center_tss) for e in separate_results]
            )
            self.rsquared_adj_per_group = np.array(
                [
                    rsquared_adj(
                        self.rsquared_per_group[i],
                        len(separate_results[i][0]),
                        len(separate_results[i][0]) - x.shape[1],
                        center_tss,
                    )
                    for i in range(len(self.rsquared_per_group))
                ]
            )

        if summarize:
            return self.summary()
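    # Fitting-option sketch (a hedged example; assumes the hypothetical `model`
    # from the usage notes above). Bootstrapped CIs, sign-flip permutation tests,
    # and robust second-level standard errors are all requested via fit() kwargs:
    #
    #   model.fit(conf_int="boot", n_boot=1000)   # bootstrap CIs over 1st-level betas
    #   model.fit(permute=500, perm_on="coef")    # sign-flip permutation on coefficients
    #   model.fit(robust="hc3")                   # heteroscedasticity-robust 2nd-level s.e.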
    def summary(self):
        """
        Summarize the output of a fitted model.

        Returns:
            pd.DataFrame: R/statsmodels-style summary
        """

        if not self.fitted:
            raise RuntimeError("Model must be fitted to generate summary!")

        print("Formula: {}\n".format(self.formula))
        print("Family: {}\n".format(self.family))
        print(
            "Std-errors: {}\tCIs: {} 95%\tInference: {} \n".format(
                self.se_type, self.ci_type, self.sig_type
            )
        )
        print(
            "Number of observations: %s\t Groups: %s\n"
            % (self.data.shape[0], {str(self.group): self.data[self.group].nunique()})
        )
        print("Fixed effects:\n")
        if self.iscorrs:
            if self.iscorrs == "semi":
                corr = "semi-partial"
            else:
                corr = self.iscorrs
            print("Note: {} correlations reported".format(corr))
        return self.coefs.round(3)
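    # Summary sketch (assumes a fitted model): summary() prints header information
    # and returns the rounded coefficient table, so it can be captured directly:
    #
    #   coefs = model.summary()  # pd.DataFrame indexed by design-matrix columns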
    def plot_summary(
        self,
        figsize=(12, 6),
        error_bars="ci",
        ranef=True,
        axlim=None,
        ranef_alpha=0.5,
        coef_fmt="o",
        orient="v",
        **kwargs,
    ):
        """
        Create a forest plot overlaying estimated coefficients with first-level
        effects. By default displays the 95% confidence intervals computed during
        fitting.

        Args:
            error_bars (str): one of 'ci' or 'se' to change which error bars are
                plotted; default 'ci'
            ranef (bool): overlay BLUP estimates on figure; default True
            axlim (tuple): lower and upper limit of plot; default min and max of BLUPs
            ranef_alpha (float): opacity of random effect points; default .5
            coef_fmt (str): matplotlib marker style for population coefficients
            orient (str): plot coefficients vertically ('v') or horizontally ('h');
                default 'v'

        Returns:
            plt.axis: matplotlib axis handle
        """

        if not self.fitted:
            raise RuntimeError("Model must be fit before plotting!")
        if orient not in ["h", "v"]:
            raise ValueError("orientation must be 'h' or 'v'")

        m_ranef = self.fixef
        m_fixef = self.coefs.drop("Intercept", axis=0)

        if error_bars == "ci":
            col_lb = (m_fixef["Estimate"] - m_fixef["2.5_ci"]).values
            col_ub = (m_fixef["97.5_ci"] - m_fixef["Estimate"]).values
        elif error_bars == "se":
            col_lb, col_ub = m_fixef["SE"], m_fixef["SE"]

        # For seaborn
        m = pd.melt(m_ranef)

        _, ax = plt.subplots(1, 1, figsize=figsize)

        if ranef:
            alpha_plot = ranef_alpha
        else:
            alpha_plot = 0

        if orient == "v":
            x_strip = "value"
            x_err = m_fixef["Estimate"]
            y_strip = "variable"
            y_err = range(m_fixef.shape[0])
            xerr = [col_lb, col_ub]
            yerr = None
            ax.vlines(
                x=0, ymin=-1, ymax=self.coefs.shape[0], linestyles="--", color="grey"
            )
            if not axlim:
                xlim = (m["value"].min() - 1, m["value"].max() + 1)
            else:
                xlim = axlim
            ylim = None
        else:
            y_strip = "value"
            y_err = m_fixef["Estimate"]
            x_strip = "variable"
            x_err = range(m_fixef.shape[0])
            yerr = [col_lb, col_ub]
            xerr = None
            ax.hlines(
                y=0, xmin=-1, xmax=self.coefs.shape[0], linestyles="--", color="grey"
            )
            if not axlim:
                ylim = (m["value"].min() - 1, m["value"].max() + 1)
            else:
                ylim = axlim
            xlim = None

        sns.stripplot(
            x=x_strip, y=y_strip, data=m, ax=ax, size=6, alpha=alpha_plot, color="grey"
        )
        ax.errorbar(
            x=x_err,
            y=y_err,
            xerr=xerr,
            yerr=yerr,
            fmt=coef_fmt,
            capsize=0,
            elinewidth=4,
            color="black",
            ms=12,
            zorder=9999999999,
        )
        ax.set(ylabel="", xlabel="Estimate", xlim=xlim, ylim=ylim)
        sns.despine(top=True, right=True, left=True)
        return ax
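# Plotting sketch (assumes a fitted `model` from the hypothetical usage notes
# above; the output filename is illustrative). plot_summary() returns a standard
# matplotlib axis, so the figure can be customized or saved afterwards:
#
#   ax = model.plot_summary(error_bars="ci", ranef=True)
#   ax.figure.savefig("lm2_forestplot.png")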