1import numpy as np
2from rpy2.robjects.conversion import localconverter
3from rpy2.robjects.packages import importr
4from rpy2.robjects import pandas2ri, numpy2ri
5import rpy2.robjects as ro
6import polars as pl
7from polars import selectors as cs
8from functools import wraps
9
# Handle to R's ``base`` package; ``ensure_py_output`` uses it to read the
# row names of R data frames.
lib_base = importr("base")

# Names exported by a star-import of this module.
# NOTE(review): ``ensure_r_input`` is listed here but is not defined in the
# portion of the file visible during this review -- confirm it exists.
__all__ = [
    "polars2R",
    "R2polars",
    "R2numpy",
    "numpy2R",
    "to_dict",
    "ensure_r_input",
    "ensure_py_output",
    "sanitize_polars_columns",
    "con2R",
    "convert_argkwarg_model",
    "convert_argkwarg_dataframe",
    "convert_argkwarg_list",
    "convert_argkwarg_numpy",
    "convert_argkwarg_none",
    "convert_argkwarg_dict",
]
29
30
[docs]
def to_dict(listVector):
    """Recursively convert an R ListVector into a Python dict with all Python types.

    Ignores R 'call' and 'terms'. Useful for seeing an ``lm()`` or ``lmer()``
    model object or the output of ``summary()`` as a Python dict.

    Args:
        listVector (ro.vectors.ListVector): named R list to convert

    Returns:
        dict: element name -> converted value. Float/int/bool vectors become
        numpy arrays, data frames and tibbles become polars DataFrames, nested
        lists become nested dicts and string vectors become lists. Elements of
        any other type are left out of the result.

    Raises:
        TypeError: if ``listVector`` is not an R ListVector
    """

    if not isinstance(listVector, ro.vectors.ListVector):
        raise TypeError("Input must be an R ListVector")

    temp = dict(zip(listVector.names, listVector))

    out = dict()
    for key, orig_value in temp.items():
        # Ignore formula and call
        if key in ("call", "terms"):
            continue

        # Numerics
        if isinstance(
            orig_value,
            (ro.vectors.FloatVector, ro.vectors.IntVector, ro.vectors.BoolVector),
        ):
            new_value = R2numpy(orig_value)

        # Data frames and tibbles. This check has to run before the generic
        # ListVector check: rpy2's DataFrame is a ListVector subclass, so the
        # ListVector branch would otherwise capture every data frame and this
        # branch would never execute.
        elif isinstance(orig_value, ro.vectors.DataFrame) or (
            hasattr(orig_value, "rclass") and "tbl_df" in list(orig_value.rclass)
        ):
            try:
                new_value = R2polars(orig_value)
            except Exception:
                # Best effort, like nested lists: keep the raw R object
                new_value = orig_value

        # Nested ListVectors
        elif isinstance(orig_value, ro.vectors.ListVector):
            try:
                new_value = to_dict(orig_value)
            except Exception:
                # Best effort: keep the raw R object when recursion fails
                new_value = orig_value

        # StrVectors
        elif isinstance(orig_value, ro.vectors.StrVector):
            new_value = list(orig_value)

        # Any other type is silently skipped
        else:
            continue

        out[key] = new_value
    return out
75
76
[docs]
def convert_argkwarg_dataframe(arg):
    """Turn a polars DataFrame argument into its R counterpart.

    Anything that is not a ``pl.DataFrame`` is handed back untouched.
    """
    return polars2R(arg) if isinstance(arg, pl.DataFrame) else arg
82
83
[docs]
def convert_argkwarg_list(arg):
    """Convert args/kwargs that are Python lists to proper R type(s)

    The R vector type is picked from the list's contents, in this order:

    - any ``str`` element -> ``StrVector``
    - any ``float`` element -> ``FloatVector``
    - only ``bool`` elements -> ``BoolVector``
    - any ``int`` element -> ``IntVector``

    Non-list inputs, empty lists and lists holding none of these types are
    returned unchanged.
    """
    if not isinstance(arg, list):
        return arg

    if any(isinstance(elem, str) for elem in arg):
        return ro.StrVector(arg)
    if any(isinstance(elem, float) for elem in arg):
        return ro.FloatVector(arg)
    # bool is a subclass of int, so pure-boolean lists must be recognised
    # before the int check or they would be sent to R as integers.
    # The truthiness guard keeps empty lists out (all() of nothing is True).
    if arg and all(isinstance(elem, bool) for elem in arg):
        return ro.BoolVector(arg)
    if any(isinstance(elem, int) for elem in arg):
        return ro.IntVector(arg)
    return arg
96
97
def convert_argkwarg_numpy(arg):
    """Turn a numpy array argument into its R counterpart.

    Anything that is not an ``np.ndarray`` is handed back untouched.
    """
    if not isinstance(arg, np.ndarray):
        return arg
    return numpy2R(arg)
102
103
[docs]
def convert_argkwarg_none(arg):
    """Map a Python ``None`` argument onto R's ``NULL``.

    Every other value is handed back untouched.
    """
    # None is sent to R as NULL (not as an NA value)
    return ro.NULL if arg is None else arg
110
111
[docs]
def convert_argkwarg_dict(arg):
    """Convert the values of a Python dict argument to proper R type(s).

    Each value is converted according to its own type (polars DataFrame, list,
    numpy array, ``None`` or nested dict); values of any other type are kept
    as they are. Nested dicts are converted recursively and wrapped in an R
    ``ListVector``. The top-level result is a new Python dict. Inputs that are
    not dicts are handed back untouched.
    """
    if not isinstance(arg, dict):
        return arg

    def _convert_value(value):
        # Dispatch one dict value to the converter matching its type
        if isinstance(value, pl.DataFrame):
            return convert_argkwarg_dataframe(value)
        if isinstance(value, list):
            return convert_argkwarg_list(value)
        if isinstance(value, np.ndarray):
            return convert_argkwarg_numpy(value)
        if value is None:
            return convert_argkwarg_none(value)
        if isinstance(value, dict):
            # Recursive
            return ro.ListVector(convert_argkwarg_dict(value))
        return value

    return {key: _convert_value(value) for key, value in arg.items()}
133
134
[docs]
def convert_argkwarg_model(arg):
    """Swap a pymer4 model argument for the object stored in its ``r_model`` attribute.

    Anything that is not a ``model`` instance is handed back untouched.
    """
    # Imported at call time rather than at module import
    from ..models.base import model

    return arg.r_model if isinstance(arg, model) else arg
142
143
[docs]
def polars2R(df):
    """Convert a polars DataFrame to an R data frame via a local rpy2 converter.

    The frame is first turned into a pandas DataFrame, which is then converted
    with rpy2's pandas converter. Inputs that are not polars DataFrames are
    handed back untouched.
    """
    if not isinstance(df, pl.DataFrame):
        return df

    as_pandas_rules = ro.default_converter + pandas2ri.converter
    with localconverter(as_pandas_rules):
        return ro.conversion.get_conversion().py2rpy(df.to_pandas())
151
152
[docs]
def R2polars(rdf):
    """Convert an R data frame to a polars DataFrame via a local rpy2 converter.

    The R object is converted to pandas first, R's character NA is replaced by
    ``np.nan`` element-wise, and the result is turned into a polars DataFrame
    whose columns are cleaned with ``sanitize_polars_columns``. Inputs that
    are already polars DataFrames are handed back untouched.
    """
    if isinstance(rdf, pl.DataFrame):
        return rdf

    def _na_to_nan(elem):
        # Identity check against rpy2's NA_Character singleton
        return np.nan if elem is ro.NA_Character else elem

    with localconverter(ro.default_converter + pandas2ri.converter):
        pandas_df = ro.conversion.get_conversion().rpy2py(rdf)

    cleaned = pandas_df.map(_na_to_nan)
    return sanitize_polars_columns(pl.from_pandas(cleaned))
165
166
[docs]
def numpy2R(arr):
    """Convert a numpy array to an R array via a local rpy2 converter.

    Inputs that are not ``np.ndarray`` instances are handed back untouched.
    """
    if not isinstance(arr, np.ndarray):
        return arr

    numpy_rules = ro.default_converter + numpy2ri.converter
    with localconverter(numpy_rules):
        return ro.conversion.get_conversion().py2rpy(arr)
174
175
[docs]
def R2numpy(rarr):
    """Convert an R array to a numpy array.

    Inputs that already are ``np.ndarray`` instances are returned as the very
    same object; everything else goes through ``np.asarray``.
    """
    return rarr if isinstance(rarr, np.ndarray) else np.asarray(rarr)
181
182
def _Rdot_to_Pyunder(df):
    """Replace all column names including '.' with '_' and strip whitespace

    Each column name is cleaned in three steps: surrounding whitespace is
    removed, every '.' becomes '_', and leading underscores are dropped (so an
    R name such as ``.resid`` becomes ``resid``).

    Args:
        df: object with a settable ``columns`` attribute holding string names

    Returns:
        The same ``df`` object, with its ``columns`` reassigned in place
    """
    # strip() with no argument removes whitespace; strip("") removes nothing
    df.columns = [
        name.strip().replace(".", "_").lstrip("_") for name in df.columns
    ]
    return df
193
194
[docs]
def ensure_py_output(func):
    """
    Decorator that converts R outputs to Python equivalents. Currently this includes:

    - R FloatVector -> numpy array
    - R StrVector -> list
    - R dataframe/tibble -> polars dataframe
    - R ListVector of Dataframes -> list of polars dataframes

    For a ListVector of data frames, each frame's R row names are stored in a
    leading ``level`` column, and a list holding a single frame is unwrapped
    to that frame. Any other result, including an empty ListVector, is
    returned unchanged.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)

        # Check if result is an R FloatVector
        if isinstance(result, ro.vectors.FloatVector):
            result = R2numpy(result)

        # Check if result is an R StrVector
        elif isinstance(result, ro.vectors.StrVector):
            result = list(result)

        # Check if result is an R dataframe
        elif isinstance(result, ro.vectors.DataFrame):
            result = R2polars(result)

        # Check if result is a tibble
        elif hasattr(result, "rclass") and "tbl_df" in list(result.rclass):
            result = R2polars(result)

        # Check if result is an R ListVector
        # typically a list of DataFrames
        elif isinstance(result, ro.vectors.ListVector):
            # The first element decides whether this is a list of data frames;
            # the length guard keeps an empty list from raising on result[0]
            if len(result) > 0 and isinstance(result[0], ro.vectors.DataFrame):
                out = []
                for df in result:
                    # Read the row names from the R object before converting
                    row_names = list(lib_base.row_names(df))
                    df = (
                        R2polars(df)
                        .with_columns(level=np.array(row_names))
                        .select("level", cs.exclude("level"))
                    )
                    out.append(df)
                result = out
                if len(result) == 1:
                    result = result[0]

        return result

    return wrapper
245
246
def _drop_rownames(result):
    """
    Return ``result`` without the `rownames` column that some R funcs add.

    ``strict=False`` is forwarded to ``drop``; presumably this makes a frame
    without that column pass through without an error (confirm against the
    polars documentation).
    """
    unwanted = "rownames"
    return result.drop(unwanted, strict=False)
253
254
[docs]
def sanitize_polars_columns(result):
    """
    Clean up polars columns using auxiliary functions.

    Column names are normalised with ``_Rdot_to_Pyunder`` first, then the
    `rownames` column is removed with ``_drop_rownames``.
    """
    return _drop_rownames(_Rdot_to_Pyunder(result))
264
265
295
296
[docs]
def con2R(arr):
    """
    Convert human-readable contrasts into a form that R requires. Works like the `make.contrasts() <https://www.rdocumentation.org/packages/gmodels/versions/2.18.1/topics/make.contrasts>`_ function from the `gmodels <https://cran.r-project.org/web/packages/gmodels/index.html>`_ package, in that it will auto-solve for the remaining orthogonal k-1 contrasts if fewer than k-1 contrasts are specified.

    Arguments:
        arr (np.ndarray): 1d or 2d numpy array (or array-like such as a list or tuple) with each row reflecting a unique contrast and each column a factor level

    Returns:
        A 2d numpy array useable with the contrasts argument of R models

    Raises:
        ValueError: if the input has more than 2 dimensions, if the number of
            contrasts is not smaller than the number of factor levels, or if
            the contrast matrix is singular
    """

    # Accept any array-like (list, tuple, ndarray), not just lists
    arr = np.asarray(arr)
    if arr.ndim > 2:
        raise ValueError(
            f"input array should be 1d or 2d but a {arr.ndim}d array was passed"
        )
    # A single 1d contrast becomes a one-row matrix
    arr = np.atleast_2d(arr)

    nrow, ncol = arr.shape

    # At most k-1 contrasts are possible
    if nrow >= ncol:
        raise ValueError(
            f"Too many contrasts requested ({nrow}). Must be less than the number of factor levels ({ncol})."
        )

    # Pseudo-invert request contrasts
    value = np.linalg.pinv(arr)
    v_nrow, v_ncol = value.shape

    # Upper triangle of R is the same as result from qr() in R
    Q, R = np.linalg.qr(np.column_stack([np.ones((v_nrow, 1)), value]), mode="complete")
    if np.linalg.matrix_rank(R) != v_ncol + 1:
        raise ValueError(
            "Singular contrast matrix. Some of the requested contrasts are perfectly co-linear."
        )

    # Start from the columns of Q after the first one, then overwrite the
    # leading columns with the requested (pseudo-inverted) contrasts; the
    # columns left over from Q are the auto-solved ones
    cm = Q[:, 1:ncol]
    cm[:, :v_ncol] = value

    return cm
339
340
def R2con(arr):
    """
    Convert R-flavored contrast matrix to interpretable contrasts as would be specified by user. `Reference <https://goo.gl/E4Mms2>`_

    Args:
        arr (np.ndarray): 2d contrast matrix output from R's contrasts() function.

    Returns:
        np.ndarray: 2d array organized as contrasts X factor levels
    """

    n_levels = arr.shape[0]
    # Prepend a column of ones (the intercept) and invert the resulting matrix
    with_intercept = np.column_stack((np.ones((n_levels, 1)), arr))
    return np.linalg.inv(with_intercept)