Skip to content

statspai.utils

utils

Utility functions for StatsPAI.

Provides Stata-style data manipulation tools: - Variable labels (label_var, get_label) - Pairwise correlation with stars (pwcorr) - Winsorization (winsor)

label_var

label_var(df: DataFrame, var: str, label: str) -> None

Attach a human-readable label to a variable.

Equivalent to Stata's label variable wage "Monthly wage (CNY)".

Parameters:

Name Type Description Default
df DataFrame

DataFrame to label (modified in place).

required
var str

Column name.

required
label str

Human-readable label.

required

Examples:

>>> sp.label_var(df, 'wage', 'Monthly wage (CNY)')
>>> sp.label_var(df, 'edu', 'Years of education')

label_vars

label_vars(df: DataFrame, labels: Dict[str, str]) -> None

Attach labels to multiple variables at once.

Parameters:

Name Type Description Default
df DataFrame
required
labels dict

{column_name: label_string} mapping.

required

Examples:

>>> sp.label_vars(df, {
...     'wage': 'Monthly wage (CNY)',
...     'edu': 'Years of education',
...     'exp': 'Work experience (years)',
... })

get_label

get_label(df: DataFrame, var: str) -> str

Get the label for a variable, falling back to the column name.

Parameters:

Name Type Description Default
df DataFrame
required
var str
required

Returns:

Type Description
str

The label if set, otherwise the column name itself.

get_labels

get_labels(df: DataFrame) -> Dict[str, str]

Get all variable labels as a dictionary.

Returns:

Type Description
dict

{column_name: label} for all labeled columns. Unlabeled columns map to their own name.

describe

describe(df: DataFrame, columns: Optional[list] = None) -> DataFrame

Stata-style describe — variable names, types, labels, and non-missing counts in one table.

Parameters:

Name Type Description Default
df DataFrame
required
columns list

Subset of columns. Default: all.

None

Returns:

Type Description
DataFrame

Columns: variable, type, n, n_missing, label.

Examples:

>>> sp.describe(df)
   variable    type     n  n_missing              label
0      wage  float64  1000         0  Monthly wage (CNY)
1       edu    int64  1000         0  Years of education
2    female    int64   998         2            female

pwcorr

pwcorr(data: DataFrame, vars: Optional[List[str]] = None, stars: bool = True, method: str = 'pearson', decimals: int = 3, output: str = 'text') -> Union[str, DataFrame]

Pairwise correlation matrix with significance stars.

Equivalent to Stata's pwcorr var1 var2 var3, star(0.05).

Parameters:

Name Type Description Default
data DataFrame
required
vars list of str

Variables to correlate. Default: all numeric columns.

None
stars bool

Show significance stars (* p<0.1, ** p<0.05, *** p<0.01).

True
method str

'pearson', 'spearman', or 'kendall'.

'pearson'
decimals int

Decimal places.

3
output str

'text', 'dataframe', 'latex', 'html'.

'text'

Returns:

Type Description
str or DataFrame

Formatted correlation matrix.

Examples:

>>> print(sp.pwcorr(df, vars=['wage', 'education', 'experience']))
                wage    education  experience
wage           1.000
education     0.523***   1.000
experience    0.412***   0.156**     1.000

winsor

winsor(data: DataFrame, vars: Optional[List[str]] = None, cuts: Tuple[float, float] = (1, 99), replace: bool = False, suffix: str = '_w') -> DataFrame

Winsorize variables at specified percentiles.

Equivalent to Stata's winsor2 var1 var2, cuts(1 99).

Parameters:

Name Type Description Default
data DataFrame
required
vars list of str

Variables to winsorize. Default: all numeric columns.

None
cuts tuple of (float, float)

Lower and upper percentile cutoffs.

(1, 99)
replace bool

If True, overwrite original columns. If False, create new columns with suffix.

False
suffix str

Suffix for new winsorized columns (when replace=False).

'_w'

Returns:

Type Description
DataFrame

DataFrame with winsorized variables.

Examples:

>>> # Winsorize at 1st and 99th percentile
>>> df = sp.winsor(df, vars=['wage', 'income'], cuts=(1, 99))
>>> # Now df has 'wage_w' and 'income_w' columns
>>> # Replace in place
>>> df = sp.winsor(df, vars=['wage'], replace=True)
Notes

Winsorization replaces values below the cuts[0]-th percentile with that percentile value, and values above the cuts[1]-th percentile with that percentile value. Unlike trimming, winsorization does not remove observations.

rowmean

rowmean(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise mean across vars, ignoring NaN (Stata egen rowmean).

rowtotal

rowtotal(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise sum across vars, treating NaN as 0 (Stata egen rowtotal).

rowmax

rowmax(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise max across vars, ignoring NaN.

rowmin

rowmin(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise min across vars, ignoring NaN.

rowsd

rowsd(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise standard deviation across vars (Stata egen rowsd).

rowcount

rowcount(data: DataFrame, vars: Sequence[str]) -> Series

Row-wise count of non-missing values (Stata egen rownonmiss).

rank

rank(data: DataFrame, var: str, by: Optional[Union[str, Sequence[str]]] = None, method: str = 'average', ascending: bool = True) -> Series

Rank a variable, optionally within groups.

Equivalent to Stata's egen newvar = rank(var), by(group) with method mapping to Stata's tie-handling options:

  • 'average' → Stata default
  • 'min'field
  • 'dense'track
  • 'first'unique

Parameters:

Name Type Description Default
data DataFrame
required
var str

Column to rank.

required
by str or list of str

Group variables. If given, ranks are computed within each group.

None
method (average, min, max, dense, first)
'average'
ascending bool
True

Returns:

Type Description
pd.Series aligned to ``data.index``.

outlier_indicator

outlier_indicator(data: DataFrame, vars: Sequence[str], cuts: tuple = (1, 99), combined: bool = False) -> DataFrame

Flag outlier observations by percentile cutoffs.

Returns 0/1 indicator columns for values outside [cuts[0], cuts[1]] percentile bounds. Mirrors the pywinsor2.outlier_indicator API.

Parameters:

Name Type Description Default
data DataFrame
required
vars list of str

Columns to check.

required
cuts tuple of (float, float)

Lower and upper percentile cutoffs.

(1, 99)
combined bool

If True, also add a column _outlier_any marking rows that are outliers on any variable.

False

Returns:

Type Description
DataFrame

Copy of data with added {var}_outlier 0/1 columns.

read_data

read_data(path: str, encoding: Optional[str] = None, **kwargs) -> DataFrame

Read data from any common format, preserving variable labels.

Automatically detects format from file extension and stores Stata/SPSS/SAS variable labels in df.attrs['_labels'].

Parameters:

Name Type Description Default
path str

File path. Supported: .dta, .csv, .xlsx, .xls, .parquet, .sas7bdat, .sav, .feather, .json.

required
encoding str

Character encoding (for CSV).

None
**kwargs

Passed to the underlying pandas reader.

{}

Returns:

Type Description
DataFrame

With variable labels in df.attrs['_labels'] if available.

Examples:

>>> df = sp.read_data('survey.dta')
>>> sp.describe(df)  # shows variable labels from Stata
>>> sp.get_label(df, 'wage')
'Monthly wage in CNY'
>>> df = sp.read_data('data.csv')  # CSV has no labels
>>> sp.label_vars(df, {'wage': 'Monthly wage', 'edu': 'Education'})

scalar_iv_projection

scalar_iv_projection(data: DataFrame, treat: str, instruments: List[str], covariates: Optional[List[str]] = None, new_col: Optional[str] = None, return_column: bool = False) -> Union[DataFrame, Series]

Project a vector of instruments onto a scalar first-stage index.

Fits the first-stage OLS regression::

treat ~ intercept + Z1 + Z2 + ... + X1 + X2 + ...

and returns the in-sample fitted values. This scalar "first-stage index" can then be passed as a single-instrument input to estimators that don't accept vector-valued Z (e.g. StatsPAI's PLIV / IIVM implementations).

Parameters:

Name Type Description Default
data DataFrame
required
treat str

Endogenous treatment variable name.

required
instruments list of str

Excluded-instrument column names. Must have length ≥ 1.

required
covariates list of str

Exogenous controls to include in the first stage.

None
new_col str

Name of the column for the scalar index. Defaults to f'{treat}_iv_hat'.

None
return_column bool

If True, return just the fitted pandas Series. If False, return a copy of data with the new column appended.

False

Returns:

Type Description
DataFrame or Series

By default, data with the fitted-value column appended. With return_column=True, returns the fitted-value Series.

Examples:

>>> # PLIV with two excluded instruments — project first
>>> df_aug = sp.scalar_iv_projection(
...     df, treat='schooling',
...     instruments=['quarter_of_birth', 'distance_to_college'],
...     covariates=['age', 'parent_edu'],
... )
>>> result = sp.dml(df_aug, y='earnings', treat='schooling',
...                 covariates=['age', 'parent_edu'],
...                 model='pliv',
...                 instrument='schooling_iv_hat')
Notes

Projecting instruments onto a scalar index loses power relative to a full multi-instrument 2SLS when the instruments are heterogeneous, but the bias of the causal estimate remains valid under the same exogeneity conditions. A proper multi-instrument DML (vector r(X) and a Cragg-Donald-style first-stage test) is on the roadmap.

dgp_did

dgp_did(n_units: int = 100, n_periods: int = 10, effect: float = 0.5, staggered: bool = False, n_groups: int = 4, heterogeneous: bool = False, seed: int | None = None) -> DataFrame

Generate Difference-in-Differences panel data.

Parameters:

Name Type Description Default
n_units int

Number of cross-sectional units.

100
n_periods int

Number of time periods.

10
effect float

Average treatment effect on the treated.

0.5
staggered bool

If True, treatment adoption is staggered across groups.

False
n_groups int

Number of treatment-timing groups (used when staggered=True).

4
heterogeneous bool

If True, the treatment effect varies by unit.

False
seed int or None

Random seed for reproducibility.

None

Returns:

Type Description
DataFrame

Columns: unit, time, y, treated, first_treat, group. df.attrs['true_effect'] stores the average treatment effect.

Examples:

>>> df = dgp_did(n_units=50, n_periods=8, effect=1.0, seed=0)
>>> df.shape
(400, 6)

dgp_rd

dgp_rd(n: int = 1000, effect: float = 0.3, cutoff: float = 0.0, fuzzy: bool = False, bandwidth_relevant: float = 0.5, seed: int | None = None) -> DataFrame

Generate Regression Discontinuity data (sharp or fuzzy).

Parameters:

Name Type Description Default
n int

Sample size.

1000
effect float

Treatment effect at the cutoff.

0.3
cutoff float

RD cutoff value.

0.0
fuzzy bool

If True, generate a fuzzy RD design.

False
bandwidth_relevant float

Standard deviation of the running variable (controls spread).

0.5
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, x, treatment.

Examples:

>>> df = dgp_rd(n=500, effect=0.5, seed=0)
>>> 'treatment' in df.columns
True

dgp_rd_kink

dgp_rd_kink(n: int = 2000, kink: float = 0.8, cutoff: float = 0.0, seed: int | None = None) -> DataFrame

Generate Regression Kink Design data (Card et al. 2015).

The treatment function has a kink (change in slope) at the cutoff. The true kink effect on the outcome equals kink.

DGP: T = X + kink_T * max(X, 0), Y = 0.4*T + noise True RKD = 0.4 * kink_T / kink_T = kink (simplified)

Parameters:

Name Type Description Default
n int

Sample size.

2000
kink float

True kink in slope at cutoff.

0.8
cutoff float

Kink point location.

0.0
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, x, treatment.

Examples:

>>> df = dgp_rd_kink(n=2000, kink=0.8, seed=42)
>>> df.attrs['true_kink']
0.8

dgp_rd_multi

dgp_rd_multi(n: int = 3000, cutoffs: list | None = None, effects: list | None = None, seed: int | None = None) -> DataFrame

Generate multi-cutoff RD data.

Parameters:

Name Type Description Default
n int

Sample size.

3000
cutoffs list of float

Cutoff values. Default [0.0, 1.0].

None
effects list of float

Treatment effects at each cutoff. Default [2.0, 3.0].

None
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, x, z (covariate).

Examples:

>>> df = dgp_rd_multi(n=3000, seed=42)
>>> df.attrs['true_effects']
{0.0: 2.0, 1.0: 3.0}

dgp_rd_hte

dgp_rd_hte(n: int = 3000, ate: float = 2.0, hte_coef: float = 1.5, cutoff: float = 0.0, seed: int | None = None) -> DataFrame

Generate RD data with heterogeneous treatment effects.

CATE(z) = ate + hte_coef * z, where z ~ N(0,1). So ATE = ate and the slope of heterogeneity = hte_coef.

Parameters:

Name Type Description Default
n int

Sample size.

3000
ate float

Average treatment effect at cutoff.

2.0
hte_coef float

Slope of heterogeneity w.r.t. covariate z.

1.5
cutoff float

RD cutoff.

0.0
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, x, z.

Examples:

>>> df = dgp_rd_hte(n=3000, ate=2.0, hte_coef=1.5, seed=42)
>>> df.attrs['true_ate']
2.0
>>> df.attrs['true_hte_coef']
1.5

dgp_rd_2d

dgp_rd_2d(n: int = 2000, effect: float = 2.0, seed: int | None = None) -> DataFrame

Generate 2D boundary RD data.

Treatment is assigned when x1 >= 0 (vertical boundary). True effect = effect.

Parameters:

Name Type Description Default
n int

Sample size.

2000
effect float

Treatment effect at boundary.

2.0
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, x1, x2, d (treatment).

Examples:

>>> df = dgp_rd_2d(n=2000, effect=2.0, seed=42)
>>> df.attrs['true_effect']
2.0

dgp_rdit

dgp_rdit(n_periods: int = 200, effect: float = 2.0, cutoff_period: int = 100, seasonality: bool = True, seed: int | None = None) -> DataFrame

Generate Regression Discontinuity in Time data (Hausman-Rapson 2018).

Parameters:

Name Type Description Default
n_periods int

Number of time periods.

200
effect float

Treatment effect at the policy change date.

2.0
cutoff_period int

Period of the policy change.

100
seasonality bool

Include monthly seasonality.

True
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, time, date.

Examples:

>>> df = dgp_rdit(n_periods=200, effect=2.0, seed=42)
>>> df.attrs['true_effect']
2.0

dgp_iv

dgp_iv(n: int = 500, effect: float = 0.5, first_stage: float = 0.4, n_instruments: int = 1, seed: int | None = None) -> DataFrame

Generate Instrumental Variables data with endogeneity.

Parameters:

Name Type Description Default
n int

Sample size.

500
effect float

True causal effect of treatment on outcome.

0.5
first_stage float

Strength of the instrument in the first stage.

0.4
n_instruments int

Number of instruments.

1
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, treatment, instrument (or instrument_1, ...), x1, x2.

Examples:

>>> df = dgp_iv(n=300, effect=0.5, seed=0)
>>> df.attrs['true_effect']
0.5

dgp_rct

dgp_rct(n: int = 500, effect: float = 0.3, p_treat: float = 0.5, heterogeneous: bool = False, n_covariates: int = 3, seed: int | None = None) -> DataFrame

Generate Randomised Controlled Trial data.

Parameters:

Name Type Description Default
n int

Sample size.

500
effect float

Average treatment effect.

0.3
p_treat float

Probability of assignment to treatment.

0.5
heterogeneous bool

If True, the treatment effect varies with the first covariate.

False
n_covariates int

Number of pre-treatment covariates.

3
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, treatment, x1, x2, ...

Examples:

>>> df = dgp_rct(n=200, effect=1.0, seed=0)
>>> df['treatment'].mean()  # approximately 0.5
0.525

dgp_panel

dgp_panel(n_units: int = 100, n_periods: int = 20, fe_sd: float = 1.0, te_sd: float = 0.5, ar1_coef: float = 0.5, seed: int | None = None) -> DataFrame

Generate panel data with unit/time fixed effects and AR(1) covariate.

Parameters:

Name Type Description Default
n_units int

Number of units.

100
n_periods int

Number of time periods.

20
fe_sd float

Standard deviation of unit fixed effects.

1.0
te_sd float

Standard deviation of time fixed effects.

0.5
ar1_coef float

AR(1) coefficient for the covariate process.

0.5
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: unit, time, y, x.

Examples:

>>> df = dgp_panel(n_units=20, n_periods=5, seed=0)
>>> df.shape
(100, 4)

dgp_observational

dgp_observational(n: int = 1000, effect: float = 0.5, confounding: float = 0.3, seed: int | None = None) -> DataFrame

Generate observational data with selection on observables.

Parameters:

Name Type Description Default
n int

Sample size.

1000
effect float

True treatment effect.

0.5
confounding float

Strength of confounding (higher = more selection bias).

0.3
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, treatment, x1, x2, propensity_score.

Examples:

>>> df = dgp_observational(n=500, effect=1.0, seed=0)
>>> df.attrs['true_effect']
1.0

dgp_cluster_rct

dgp_cluster_rct(n_clusters: int = 50, cluster_size: int = 20, effect: float = 0.3, icc: float = 0.1, seed: int | None = None) -> DataFrame

Generate cluster-randomised trial data.

Parameters:

Name Type Description Default
n_clusters int

Number of clusters.

50
cluster_size int

Number of individuals per cluster.

20
effect float

Treatment effect.

0.3
icc float

Intra-cluster correlation coefficient (0-1).

0.1
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: y, treatment, cluster_id, unit_id.

Examples:

>>> df = dgp_cluster_rct(n_clusters=10, cluster_size=5, effect=0.5, seed=0)
>>> df.shape
(50, 4)

dgp_bunching

dgp_bunching(n: int = 10000, kink_point: float = 50000.0, elasticity: float = 0.3, seed: int | None = None) -> DataFrame

Generate bunching data around a kink point.

Parameters:

Name Type Description Default
n int

Sample size.

10000
kink_point float

Location of the kink (e.g., tax threshold).

50000.0
elasticity float

Behavioral elasticity governing bunching intensity.

0.3
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: income, counterfactual_income.

Examples:

>>> df = dgp_bunching(n=5000, kink_point=50000, elasticity=0.2, seed=0)
>>> (df['income'] <= 50000).mean() > (df['counterfactual_income'] <= 50000).mean()
True

dgp_synth

dgp_synth(n_units: int = 20, n_periods: int = 30, treated_unit: int = 0, treatment_time: int = 20, effect: float = 0.5, seed: int | None = None) -> DataFrame

Generate synthetic control data with a factor model.

Parameters:

Name Type Description Default
n_units int

Number of units (including the treated unit).

20
n_periods int

Number of time periods.

30
treated_unit int

Index of the treated unit.

0
treatment_time int

Period when treatment begins.

20
effect float

Treatment effect (additive, constant post-treatment).

0.5
seed int or None

Random seed.

None

Returns:

Type Description
DataFrame

Columns: unit, time, y, treated.

Examples:

>>> df = dgp_synth(n_units=10, n_periods=20, effect=1.0, seed=0)
>>> df.loc[(df['unit'] == 0) & (df['time'] >= 20), 'treated'].unique()
array([1.])

dgp_bartik

dgp_bartik(n_regions: int = 50, n_industries: int = 10, effect: float = 1.0, seed: int | None = None) -> dict

Generate shift-share (Bartik) instrument data.

Parameters:

Name Type Description Default
n_regions int

Number of regions.

50
n_industries int

Number of industries.

10
effect float

True effect of the Bartik instrument on the outcome.

1.0
seed int or None

Random seed.

None

Returns:

Type Description
dict

'data': DataFrame with y, bartik, region; 'shares': (n_regions, n_industries) array; 'shocks': (n_industries,) array.

Examples:

>>> result = dgp_bartik(n_regions=20, n_industries=5, seed=0)
>>> result['data'].shape
(20, 3)