笔记

因果推断(14)——合成双重差分

合成双重差分

在一开始的时候,学习了双重差分、合成控制、以及通过面板数据来识别处理效应。这里我们学习如何将这两种方法结合起来,即合成双重差分,这种方法同时利用了两种方法的优点,并且提高了精度。

我们用一个复杂点的例子,即块状处理分配,我们在多个时间段观察多个单位,并且同一时间内只有一部分单位接受处理,另外一部分单位保持未处理。可以将这种分配表示为一个处理指示矩阵 DD ,其中矩阵的列对应单位矩阵的行对应时间段

D=[00000000000001100011]\begin{split} D = \begin{bmatrix} 0 & 0 & 0 & \dots & 0 & 0 \\ 0 & 0 & 0 & \dots & 0 & 0 \\ \vdots & \vdots & \vdots & & \vdots & \vdots\\ 0 & 0 & 0 & \dots & 1 & 1 \\ 0 & 0 & 0 & \dots & 1 & 1 \\ \end{bmatrix} \end{split}

在上面这个矩阵里,我们假设这个矩阵 DRa×bD \in \mathbb{R}^{a×b} , 则对于第 bb 个单位和第 b1b-1 个单位在时间 t=a1t=a-1 处给予处理。在这里,我们只讨论所有被处理单位在同一时间接受处理的情况(如上面的表格,只有一个时间节点),且需要注意的是: 一旦一个单位接受处理,它就不会回到未处理状态。

然后对于这个矩阵,我们可以把它拆块处理:

首先看下面的一个矩阵:

Dpost,tr=[1111]\begin{split} D_{post, tr} = \begin{bmatrix} 1 & 1 \\ 1 & 1 \\ \end{bmatrix} \end{split}

这就代表了一个接受处理且处理后时期的一个子结果矩阵,同理,对于同列的,其上面的行就是接受处理组且处理前时期的子结果矩阵。以此类推,我们就可以转化为一个结果矩阵:

Y=[Ypre,coYpre,trYpost,coYpost,tr]\begin{split} Y = \begin{bmatrix} \pmb{Y}_{pre, co} & \pmb{Y}_{pre, tr} \\ \pmb{Y}_{post, co} & \pmb{Y}_{post, tr} \\ \end{bmatrix} \end{split}

在估计合成控制权重时我们将使用上述矩阵表示

这时候我们还有一种表:

一列表示单位,一列表示时间段,一列是结果变量,还有两列布尔变量分别标记“该单位是否属于处理组”和“该时期是否属于处理后”。即:

id time Y treated after_treatment
1 1970 5 False False
1 1971 6 False False
1 1972 8 False False
1 1973 9 False False
1 1974 11 False False

该表的行数等于单位数 NN 乘以 TT 时期数,这种数据表示方法在双重差分时非常有用熟悉表格数据的都知道,这其实就是一个长表格。 如果想从这种表格形式回到我们之前讨论的矩阵表示,只需按时间和单位进行透视。

回顾

双重差分

还记得之前学习的双向固定效应,我们可以把其写成TWFE形式:

τ^did=argminμ,α,β,τ{i=1Nt=1T(Yit(μ+αi+βt+τDit))2}\hat{\tau}^{did} = \underset{\mu, \alpha, \beta, \tau}{\arg\min} \bigg\{ \sum_{i=1}^N \sum_{t=1}^T \big(Y_{it} - (\mu + \alpha_i + \beta_t + \tau D_{it})\big)^2 \bigg\}

其中,αi\alpha_i为单位效应, βt\beta_t为时间效应, μ\mu 为截距。这里的截距μ\mu,,其匹配的是处理前和处理后时期之间的差异,它使控制组的处理前期和处理后期的平均值尽可能接近

然后我们利用Frisch-Waugh-Lovell theorem进行拟合:

Y¨it=YitYˉiYˉtD¨it=DitDˉiDˉt\begin{split} \ddot{Y}_{it} = Y_{it} - \bar{Y}_i - \bar{Y}_t\\ \ddot{D}_{it} = D_{it} - \bar{D}_i - \bar{D}_t \end{split}

即:

Y¨it=YitT1t=1TYitN1i=1NYitD¨it=DitT1t=1TDitN1i=1NDit\begin{split} \ddot{Y}_{it} = Y_{it} - T^{-1}\sum_{t=1}^{T} Y_{it} - N^{-1}\sum_{i=1}^{N} Y_{it}\\ \ddot{D}_{it} = D_{it} - T^{-1}\sum_{t=1}^{T} D_{it} - N^{-1}\sum_{i=1}^{N} D_{it} \end{split}

合成控制

在合成控制法里,我们使得处理单位在处理前的结果与控制单位的加权平均结果之间的差异最小,同时要求这些权重为正且和为 1。

w^sc=argminw  yˉpre,trYpre,cowco22s.t. wi=1 and wi>0  i\begin{split} \hat{w}^{sc} = \underset{w}{\mathrm{argmin}}\; \|\pmb{\bar{y}}_{pre, tr} - \pmb{Y}_{pre, co} \pmb{w}_{co}\|^2_2 \\ \text{s.t. } \sum w_i = 1 \text{ and } w_i > 0 \; \forall i \end{split} Y=[Ypre,coYpre,trYpost,coYpost,tr]\begin{split} Y = \begin{bmatrix} \pmb{Y}_{pre, co} & \pmb{Y}_{pre, tr} \\ \pmb{Y}_{post, co} & \pmb{Y}_{post, tr} \\ \end{bmatrix} \end{split}

其中,结果矩阵 Ypre,co\pmb{Y}_{pre, co} 是一个 Tpre×NcoT_{pre}\times N_{co} 的矩阵,列对应单位,行对应时间段;wco\pmb{w}_{co} 是一个 Nco×1N_{co}\times 1 的列向量,每个元素对应一个单位。最后,yˉpre,tr\pmb{\bar{y}}_{pre, tr} 是一个 Tpre×1T_{pre}\times 1 的列向量,其中的每个元素都是被处理单位在处理前时期的单位平均。我们有时也将合成控制称为“横向回归”:单位是矩阵的列。我们将被处理单位的平均结果回归到控制单位上。

当我们优化得到了w^sc\hat{w}^{sc},就可以在所有时间段上将它们与控制单位相乘,得到被处理单位的合成控制:

ysc=Ycow^sc\pmb{y}_{sc} = \pmb{Y}_{co}\hat{\pmb{w}}^{sc}

此时认为 ypost,sc\pmb{y}_{post, sc} 是缺失潜在结果 Y(0)post,trY(0)_{post, tr} 的良好估计。此时,ATT 就是处理后时期被处理单位的平均结果减去合成控制的平均结果:

τ^=yˉpost,tryˉpost,sc\hat{\tau} = \bar{y}_{post, tr} - \bar{y}_{post, sc}

合成控制能够更好地处理处理前的非平行趋势,因为在构造合成控制的过程中,其在处理前时期强制让趋势平行。

我们也可以将合成控制估计写成优化问题:

τ^sc=argminβ,τ{i=1Nt=1T(YitβtτDit)2w^isc}\hat{\tau}^{sc} = \underset{\beta, \tau}{\arg\min} \bigg\{ \sum_{i=1}^N \sum_{t=1}^T \big(Y_{it} - \beta_t - \tau D_{it}\big)^2 \hat{w}^{sc}_i \bigg\}

我们发现这里与TWFE不太一样的是,我们没有截距 μ\mu,也没有单位效应αi\alpha_i。然后我们引入了控制单位的权重 w^isc\hat{w}_i^{sc}, 对于被处理单位,其权重简单地设为均匀权重 1/Ntr1/N_{tr}

合成双重差分

我们用一种方法,就像我们当初做双重稳健估计一样,来将两个方法合成一起:

τ^sc=argminβ,τ{i=1Nt=1T(YitβtτDit)2w^isc}τ^did=argminμ,α,β,τ{i=1Nt=1T(Yit(μ+αi+βt+τDit))2}\hat{\tau}^{sc} = \underset{\beta, \tau}{\arg\min} \bigg\{ \sum_{i=1}^N \sum_{t=1}^T \big(Y_{it} - \beta_t - \tau D_{it}\big)^2 \hat{w}^{sc}_i \bigg\} \\ \hat{\tau}^{did} = \underset{\mu, \alpha, \beta, \tau}{\arg\min} \bigg\{ \sum_{i=1}^N \sum_{t=1}^T \big(Y_{it} - (\mu + \alpha_i + \beta_t + \tau D_{it})\big)^2 \bigg\}

我们合成一下:

τ^sdid=argminμ,α,β,τ{i=1Nt=1T(Yit(μ+αi+βt+τDit))2w^isdidλ^tsdid}\hat{\tau}^{sdid} = \underset{\mu, \alpha, \beta, \tau}{\arg\min} \bigg\{ \sum_{i=1}^N \sum_{t=1}^T \big(Y_{it} - (\mu + \alpha_i + \beta_t + \tau D_{it})\big)^2 \hat{w}^{sdid}_i \hat{\lambda}^{sdid}_t \bigg\}

我们加回了固定效应 αi\alpha_i 还有截距 μ\mu,另外我们保留了单位权重,然后我们另外加了一个时间权重 λ^tsdid\hat{\lambda}^{sdid}_t.

看回我们是怎么估计单位权重的:

w^sc=argminw  yˉpre,trYpre,cowco22s.t. wi=1 and wi>0  i\begin{split} \hat{w}^{sc} = \underset{w}{\mathrm{argmin}}\; \|\pmb{\bar{y}}_{pre, tr} - \pmb{Y}_{pre, co} \pmb{w}_{co}\|^2_2 \\ \text{s.t. } \sum w_i = 1 \text{ and } w_i > 0 \; \forall i \end{split}

但与合成控制法不太一样,这里我们会想着在处理前时期控制组的加权平均结果处理组的算术平均结果尽可能接近,相差一个常数(截距项),这个截距项是为了保证事前趋势平行,不需要完全匹配。另外,我们会添加一个L2正则化惩罚项,使得权重分散且唯一。

(ω^0,ω^sdid)=argminω0R,ωΩunit(ω0,ω)unit(ω0,ω)=t=1Tpre(ω0+i=1NcoωiYitControl weighted Y1Ntri=Nco+1NYitTreatment Weighted Y)2+ζ2Tpreω22Ω={ωR+N:i=1Ncoωi=1,ωi=Ntr1 for all i=Nco+1,,N}(\hat{\omega}_0,\hat{\omega}^{sdid}) = \mathop{\arg \min} \limits_{\omega_0\in \mathbb{R},\omega \in \Omega} \ell_{unit}(\omega_0,\omega)\\ \ell_{unit}(\omega_0,\omega) = \sum^{T_{pre}}_{t=1} \bigg(\omega_0+\underbrace{\sum^{N_{co}}_{i=1}\omega_iY_{it}}_{\text{Control weighted Y}}-\underbrace{\frac{1}{N_{tr}}\sum^{N}_{i=N_{co}+1}Y_{it}}_{\text{Treatment Weighted Y}} \bigg)^2 + \zeta^2T_{pre}||\omega||^2_2 \\ \Omega = \bigg \{ \omega \in \mathbb{R}_+^N:\sum^{N_{co}}_{i=1}\omega_i = 1, \omega_i = N_{tr}^{-1} \text{ for all } i = N_{co} + 1, \dots, N \bigg\}

umm,看了一下,我们有理由相信,我们的时间权重的最优化解也是差不多一个形式,只不过时间权重匹配的是处理前和处理后时期之间的差异,它使控制组的处理前期和处理后期的平均值尽可能接近,然后对于每一个控制组中的个体处理前时期的加权平均结果与处理后时期的算术平均结果,相差一个常数截距 λ0\lambda_0

(λ^0,λ^sdid)=argminλ0R,λΛtime(λ0,λ)time(λ0,λ)=i=1Nco(λ0+t=1TpreλtYitPre-treatment weighted effect1Tpostt=Tpre+1TYitPost-treatment weighted effect)2Λ={λR+T:t=1Tpreλt=1,λt=Tpost1 for all t=Tpre+1,,T}(\hat{\lambda}_0,\hat{\lambda}^{sdid}) = \mathop{\arg \min} \limits_{\lambda_0\in \mathbb{R},\lambda \in \Lambda} \ell_{time}(\lambda_0,\lambda)\\ \ell_{time}(\lambda_0,\lambda) = \sum^{N_{co}}_{i=1} \bigg(\lambda_0+\underbrace{\sum^{T_{pre}}_{t=1}\lambda_tY_{it}}_{\text{Pre-treatment weighted effect}}-\underbrace{\frac{1}{T_{post}}\sum^{T}_{t=T_{pre}+1}Y_{it}}_{\text{Post-treatment weighted effect}} \bigg)^2 \\ \Lambda = \bigg \{ \lambda \in \mathbb{R}_+^T:\sum^{T_{pre}}_{t=1}\lambda_t = 1, \lambda_t = T_{post}^{-1} \text{ for all } t = T_{pre} + 1, \dots, T \bigg\}

应注意加权平均和算术平均

总体思想就是与处理组个体越相似的个体赋予更高的权重,与处理时期越相似的时期赋予更高的权重。

最后我们就可以得到我们想要的估计量

τ^sdid=δ^Tri=1Ncoω^iδ^i=i=Nco+1Nδ^iNtri=1Ncoω^iδ^i=[1NtrTposti=Nco+1Nt=Tpre+1TYit1Ntri=Nco+1Nt=1TpreλtsdidYit][i=1Ncoω^i(1Tpostt=Tpre+1TYitt=1TpreλtsdidYit)]\begin{aligned} \hat{\tau}^{sdid} &= \hat{\delta}_{Tr} - \sum^{N_{co}}_{i=1} \hat{\omega}_i\hat{\delta}_{i} \\ &= \dfrac{\sum^{N}_{i = N_{co}+1}\hat{\delta}_i}{N_{tr}} - \sum^{N_{co}}_{i=1} \hat{\omega}_i\hat{\delta}_{i} \\ &= \bigg[\frac{1}{N_{tr}T_{post}}\sum^N_{i=N_{co}+1}\sum^T_{t=T_{pre}+1}Y_{it} - \frac{1}{N_{tr}}\sum^N_{i=N_{co}+1}\sum^{T_{pre}}_{t= 1}\lambda_t^{sdid}Y_{it} \bigg] -\\ & \qquad \bigg[\sum^{N_{co}}_{i=1}\hat{\omega}_i \bigg(\frac{1}{T_{post}}\sum^{T}_{t=T_{pre}+1}Y_{it} - \sum^{T_{pre}}_{t =1} \lambda_t^{sdid}Y_{it} \bigg) \bigg] \end{aligned}

其中:

δ^isdid=1Tpostt=Tpre+1TYitt=1TpreλtsdidYit\hat{\delta}_i^{sdid} = \frac{1}{T_{post}}\sum^{T}_{t = T_{pre}+1}Y_{it} - \sum^{T_{pre}}_{t=1}\lambda_t^{sdid}Y_{it}

欢迎读者进行验算。

代码实现

下面用 pysynthdid 自带的 California smoking 数据做一个最小示例。这个库使用的是“宽表”形式:行对应时间,列对应单位,表格中的值就是结果变量 YitY_{it}。这和前面提到的“长表”不同,如果数据是长表,需要先用 pivot 转换。

pip install git+https://github.com/MasaAsami/pysynthdid
import pandas as pd
import matplotlib.pyplot as plt

from synthdid.model import SynthDID
from synthdid.sample_data import fetch_CaliforniaSmoking


# 让打印出来的小数更整齐,方便查看估计结果和权重
pd.options.display.float_format = "{:.4f}".format

# 读取 pysynthdid 自带的示例数据
# 该数据的行是年份,列是州名,单元格是每个州的人均香烟消费量
df = fetch_CaliforniaSmoking()

# 处理前时期和处理后时期
# California Proposition 99 在 1989 年开始生效,所以 1970-1988 是处理前,
# 1989-2000 是处理后
PRE_TERM = [1970, 1988]
POST_TERM = [1989, 2000]

# 被处理单位。这里把 California 作为处理组,其余州自动作为控制组
TREATMENT_UNIT = ["California"]

# 构造合成双重差分模型
# df: 宽表形式的结果变量矩阵
# PRE_TERM: 处理前时间窗口
# POST_TERM: 处理后时间窗口
# TREATMENT_UNIT: 被处理单位名称列表
sdid = SynthDID(df, PRE_TERM, POST_TERM, TREATMENT_UNIT)

# 拟合模型
# zeta_type="base" 表示使用库中默认的正则化参数估计方式
sdid.fit(zeta_type="base")

# 分别计算 SDID、合成控制和普通 DID 的 ATT,便于比较
att_sdid = sdid.hat_tau(model="sdid")
att_sc = sdid.hat_tau(model="sc")
att_did = sdid.hat_tau(model="did")

print(f"SDID 估计的 ATT: {att_sdid:.4f}")
print(f"合成控制估计的 ATT: {att_sc:.4f}")
print(f"普通 DID 估计的 ATT: {att_did:.4f}")

# 查看单位权重 omega 和时间权重 lambda
# unit_weights 反映哪些控制州更像 California
# time_weights 反映哪些处理前时期更像处理后的比较基准
unit_weights, time_weights = sdid.estimated_params(model="sdid")
unit_weights = unit_weights[unit_weights["features"] != "intercept"]

print("单位权重 omega:")
print(unit_weights.sort_values("sdid_weight", ascending=False).head(10))

print("时间权重 lambda:")
print(time_weights.sort_values("sdid_weight", ascending=False).head(10))

# 画出真实处理组轨迹与合成反事实轨迹
sdid.plot(model="sdid")
plt.show()

如果自己的原始数据是长表,例如包含 idtimeY 三列,可以先转换为宽表再放入模型:

# panel 是长表数据:每行是一组 id-time 观测
Y_wide = (
    panel
    .pivot(index="time", columns="id", values="Y")
    .sort_index()
)

sdid = SynthDID(
    Y_wide,
    pre_term=[1970, 1988],        # 改成自己的处理前起止时间
    post_term=[1989, 2000],       # 改成自己的处理后起止时间
    treatment_unit=["treated_id"], # 改成自己的被处理单位 id
)
sdid.fit(zeta_type="base")
print(sdid.hat_tau(model="sdid"))

但现在,我们还是面临一个问题,处理万一随着时间变化而产生异质性效应的情况:

分批采用

我们可以为每一个处理后的时期单独估计一个效应,我们通过设置不同的处理后时间节点,就可以得到不同处理后时间节点的效应,从而就可以得到每一时期效应的变化。多次运行 SDID 在处理分批采用情形时也非常重要。在分批采用设计中,我们有多个被处理单位,它们在不同的时间接受处理。如下面一个矩阵:

D=[010101020202030313041414]\begin{split} D = \begin{bmatrix} 0_1 & 0_1 & 0_1 \\ 0_2 & 0_2 & 0_2 \\ 0_3 & 0_3 & 1_3 \\ 0_4 & 1_4 & 1_4 \\ \end{bmatrix} \end{split}

这个矩阵表达了单位 1 从未接受处理,单位 2 在第 4 个时间段接受处理,单位 3 在第 3 个时间段接受处理。我们在实际SDID的时候,当我们有多组处理组,我们只需要保留控制单位和其中一组即可轻松做到这一点,分别估计每一组的SDID。需要注意的是,我们的处理后指示变量是不一样的,如上面的单位2,其处理指示变量是 t4t \ge 4,而如果是第三个单位,其处理指示变量就是 t3t\ge3。现在我们来合成最后的ATT:

设有GG个处理组,每个处理组里面有 kgk_g 个单元,然后对于第 gg 个处理组,其每一个单元处理后的观察值有 tgt_g个,记τg\tau_g 为第g个处理组的效应值,则有:

ATTˉ=g=1Gkgtgτgg=1Gkgtg\bar{ATT} = \frac{\sum^G_{g=1}k_gt_g\tau_g}{\sum^G_{g=1}k_gt_g}

代码实现

pysynthdidSynthDID 一次只适合一个“共同处理起点”的块状处理设计。因此在分批采用情形下, 先按首次处理时间把处理单位分组;每次只保留从未处理的控制单位和其中一个处理批次;把该批次处理单位的结果先取算术平均,作为一个“处理组平均结果”列;分别估计该批次的 SDID;最后再按照 kgtgk_g t_g 加权合成总体 ATT。

下面的代码先构造一个小型分批采用面板数据,再演示如何估计每个批次的 ATT 和总体加权 ATT。

import numpy as np
import pandas as pd

from synthdid.model import SynthDID

# 为了让示例结果可以复现,固定随机数种子
rng = np.random.default_rng(42)

# 构造一个用于学习的小型面板数据:
# - unit_1、unit_2、unit_3 从未接受处理
# - unit_4、unit_5 在第 5 期开始接受处理
# - unit_6 在第 6 期开始接受处理
times = np.arange(1, 9)
units = [f"unit_{i}" for i in range(1, 7)]
never_treated_units = ["unit_1", "unit_2", "unit_3"]
first_treat_time = {
    "unit_4": 5,
    "unit_5": 5,
    "unit_6": 6,
}

rows = []
for unit in units:
    unit_effect = rng.normal(loc=0, scale=1)

    for t in times:
        g = first_treat_time.get(unit)
        is_treated = g is not None and t >= g

        # 这里故意设置一个随处理后时间递增的处理效应,方便后面演示动态效应
        horizon = t - g if is_treated else 0
        treatment_effect = 2 + 0.5 * horizon if is_treated else 0

        # 结果变量 = 单位效应 + 时间趋势 + 处理效应 + 随机扰动
        y = unit_effect + 0.3 * t + 0.05 * t**2 + treatment_effect
        y += rng.normal(loc=0, scale=0.2)

        rows.append(
            {
                "id": unit,
                "time": t,
                "Y": y,
                "first_treat_time": g,
                "treated": is_treated,
            }
        )

panel = pd.DataFrame(rows)

# pysynthdid 需要宽表:行是时间,列是单位,值是结果变量 Y_it
Y_wide = (
    panel
    .pivot(index="time", columns="id", values="Y")
    .sort_index()
)


def make_cohorts(first_treat_time):
    """把处理单位按照首次处理时间分组。"""
    cohorts = {}
    for unit, g in first_treat_time.items():
        cohorts.setdefault(g, []).append(unit)
    return cohorts


def fit_sdid_block(
    Y_wide,
    treated_units,
    control_units,
    cohort_start,
    post_start=None,
    post_end=None,
):
    """
    对某一个处理批次估计 SDID。
    参数说明:
    - treated_units: 当前批次中首次处理时间相同的处理单位
    - control_units: 控制单位。这里为了和上文保持一致,只使用从未处理单位
    - cohort_start: 当前批次首次接受处理的时间
    - post_start/post_end: 本次估计使用的处理后窗口

    如果 post_start 和 post_end 都不传入,就估计该批次从首次处理到样本末期的平均效应。
    如果 post_start = post_end,就估计某一个处理后时期的单期效应。
    """
    all_times = pd.Index(Y_wide.index).sort_values()
    pre_times = all_times[all_times < cohort_start]

    if len(pre_times) == 0:
        raise ValueError("当前处理批次之前没有处理前时期,无法估计 SDID。")

    if post_start is None:
        post_start = cohort_start
    if post_end is None:
        post_end = all_times.max()

    post_times = all_times[(all_times >= post_start) & (all_times <= post_end)]
    if len(post_times) == 0:
        raise ValueError("处理后窗口为空,请检查 post_start 和 post_end。")

    # 只保留当前 SDID 需要的行:
    # 所有处理前时期 + 当前设定的处理后窗口
    use_rows = list(pre_times) + list(post_times)

    # 控制组仍然保留为一列一个单位
    block = Y_wide.loc[use_rows, list(control_units)].copy()

    # 处理组如果包含多个单位,先取算术平均。
    # 这样做更贴合前面公式里的“处理组平均结果”,也避免库内部对多处理列支持不充分。
    treated_mean_name = "__treated_mean__"
    block[treated_mean_name] = Y_wide.loc[use_rows, list(treated_units)].mean(axis=1)

    model = SynthDID(
        block,
        pre_term=[pre_times.min(), pre_times.max()],
        post_term=[post_times.min(), post_times.max()],
        treatment_unit=[treated_mean_name],
    )
    model.fit(zeta_type="base")

    tau = model.hat_tau(model="sdid")

    return {
        "tau_n": tau,
        "k_n": len(treated_units),
        "t_n": len(post_times),
        "model": model,
    }

# 1. 按首次处理时间分组
cohorts = make_cohorts(first_treat_time)

# 2. 分别估计每一个处理批次的 SDID
cohort_rows = []
cohort_models = {}

for cohort_start, treated_units in cohorts.items():
    result = fit_sdid_block(
        Y_wide=Y_wide,
        treated_units=treated_units,
        control_units=never_treated_units,
        cohort_start=cohort_start,
    )

    cohort_rows.append(
        {
            "cohort_start": cohort_start,
            "treated_units": treated_units,
            "tau_n": result["tau_n"],
            "k_n": result["k_n"],
            "t_n": result["t_n"],
        }
    )
    cohort_models[cohort_start] = result["model"]

cohort_results = pd.DataFrame(cohort_rows)

# 3. 按照 k_n * t_n 加权合成总体 ATT
cohort_results["weight"] = cohort_results["k_n"] * cohort_results["t_n"]
att_staggered = np.average(
    cohort_results["tau_n"],
    weights=cohort_results["weight"],
)

print("各处理批次的 SDID 估计:")
print(cohort_results)

print(f"分批采用下的加权 ATT: {att_staggered:.4f}")

现在如果我们关注处理效应是否随处理后的相对时间变化,可以把处理后窗口设成单个时期。例如 horizon = 0 表示刚接受处理的那一期,horizon = 1 表示接受处理后的下一期。下面这段代码就是在每个相对处理时间上分别运行 SDID,然后再按当前 horizon 上的处理单位数加权平均。

def estimate_event_time_effects(
    Y_wide,
    cohorts,
    control_units,
    horizons=(0, 1, 2, 3),
):
    """估计不同相对处理时间 horizon 上的动态 SDID 效应。"""
    all_times = pd.Index(Y_wide.index).sort_values()
    event_rows = []

    for cohort_start, treated_units in cohorts.items():
        # 当前批次处理开始之后的所有时期
        available_post_times = all_times[all_times >= cohort_start]

        for horizon in horizons:
            # 如果当前批次没有这么远的处理后时期,就跳过
            if horizon >= len(available_post_times):
                continue

            event_time = available_post_times[horizon]

            # 只把当前 event_time 作为处理后窗口,
            # 这样 tau_n 就对应“处理后第 horizon 期”的单期效应
            result = fit_sdid_block(
                Y_wide=Y_wide,
                treated_units=treated_units,
                control_units=control_units,
                cohort_start=cohort_start,
                post_start=event_time,
                post_end=event_time,
            )

            event_rows.append(
                {
                    "cohort_start": cohort_start,
                    "horizon": horizon,
                    "event_time": event_time,
                    "tau_n": result["tau_n"],
                    "k_n": result["k_n"],
                }
            )

    event_results = pd.DataFrame(event_rows)

    # 对同一个 horizon 下不同处理批次的估计量做加权平均
    # 因为每个 horizon 只对应一个处理后时期,所以权重使用当前批次处理单位数 k_n
    event_results["weighted_tau"] = event_results["tau_n"] * event_results["k_n"]
    event_att = (
        event_results
        .groupby("horizon", as_index=False)
        .agg(
            weighted_tau=("weighted_tau", "sum"),
            weight=("k_n", "sum"),
        )
    )
    event_att["tau_h"] = event_att["weighted_tau"] / event_att["weight"]

    return event_results, event_att[["horizon", "tau_h"]]


event_results, event_att = estimate_event_time_effects(
    Y_wide=Y_wide,
    cohorts=cohorts,
    control_units=never_treated_units,
    horizons=(0, 1, 2, 3),
)

print("各批次、各相对处理时间的 SDID 估计:")
print(event_results)

print("按相对处理时间汇总后的动态效应:")
print(event_att)

安慰剂方差估计

上面我们只对 SDID 计算了点估计值,那么,置信区间呢?因为我们一开始就说到与合成控制相比,SDID 的误差更小。

SDID 常见的方差估计方法包括 bootstrap、jackknife 和 placebo 三类。一般来说:

  • 当处理单位数量比较多时,bootstrap 或 jackknife 更常用;
  • 当只有一个处理单位,或者处理单位很少时,bootstrap 和 jackknife 很难稳定工作,此时最常用的是安慰剂推断;
  • 在合成控制和单处理单位 SDID 的经验应用中,安慰剂检验也是最常见的稳健性检验之一。

安慰剂检验的基本想法是:假装控制池中的某个单位被处理,实际上它并没有被处理。然后,我们用同样的 SDID 流程估计这个“假处理”的 ATTATT 并保存结果。重复这个步骤多次,每次从控制池里抽取一个或一组安慰剂处理单位。最后,我们将得到一组安慰剂 ATTATT 值。这组值的离散程度可以用来估计 SDID 点估计的不确定性。

需要注意的是,安慰剂方差估计的可用范围并不是无限的。它通常要求:

  • 面板数据是平衡面板,处理前和处理后时期都没有缺失值;
  • 至少存在一批从未接受处理的纯控制单位;
  • 控制单位数量要大于安慰剂处理单位数量,否则无法在抽出安慰剂处理单位后继续构造 donor pool;
  • 不应包含一开始就已经接受处理的 always-treated 单位,因为这类单位没有处理前时期;
  • 安慰剂单位应该和真实处理单位具有可比性,否则安慰剂分布反映的是 donor pool 的异质性,而不只是估计误差;
  • 如果控制单位数量很少,安慰剂分布会非常离散,置信区间也会不稳定。

因此,安慰剂推断最常用的范围是:单个处理单位或少量处理单位、较多从未处理控制单位、较长处理前时期的 SDID 或合成控制应用。如果有较多处理单位,尤其是每个处理批次里都有不少处理单位,通常更倾向于 bootstrap 或 jackknife。

安慰剂方差可以写成:

V^τplacebo=1B1b=1B(τ^(b)τ^ˉplacebo)2\hat{V}^{placebo}_{\tau} = \frac{1}{B-1}\sum_{b=1}^B\bigg(\hat{\tau}^{(b)} - \bar{\hat{\tau}}^{placebo}\bigg)^2

其中,τ^(b)\hat{\tau}^{(b)} 是第 bb 次安慰剂重复得到的 SDID 估计量,τ^ˉplacebo\bar{\hat{\tau}}^{placebo} 是所有安慰剂估计量的平均值。这里使用 B1B-1 是样本方差写法;当 BB 很大时,和使用 BB 的差别很小。

然后可以用正态近似构造置信区间:

ττ^sdid±z1α/2V^τplacebo\tau \in \hat{\tau}^{sdid} \pm z_{1-\alpha/2} \sqrt{\hat{V}^{placebo}_{\tau}}

另外这个方法也能用于单个处理组时的估计每个处理后时期效应的方差,从而为其构建置信区间。

代码实现

pysynthdid 本身提供了 cal_se(algo="placebo"),可以直接计算安慰剂标准误。不过为了理解安慰剂推断的过程,下面手动写一遍安慰剂重复抽样。这样做还有一个好处:后面处理多组处理组加权 ATT 的置信区间时,可以复用同样的思路。

import numpy as np
import pandas as pd

from synthdid.model import SynthDID
from synthdid.sample_data import fetch_CaliforniaSmoking


def fit_sdid_with_mean_treated(
    Y_wide,
    control_units,
    treated_units,
    pre_term,
    post_term,
    treated_name="__treated_mean__",
):
    """
    拟合一个块状处理设计下的 SDID。

    参数说明:
    - Y_wide: 宽表,行是时间,列是单位,值是结果变量 Y_it
    - control_units: 控制单位列表
    - treated_units: 处理单位列表
    - pre_term: 处理前时期,例如 [1970, 1988]
    - post_term: 处理后时期,例如 [1989, 2000]

    这里把多个处理单位先取算术平均,合成一个处理组平均结果列。
    这样既贴合公式里的处理组平均结果,也能避免多处理列带来的接口细节问题。
    """
    use_times = Y_wide.loc[pre_term[0]:post_term[1]].index

    # donor pool:仍然保留为一列一个控制单位
    block = Y_wide.loc[use_times, list(control_units)].copy()

    # 处理组:如果有多个处理单位,就先取平均,作为一个处理组代表序列
    block[treated_name] = Y_wide.loc[use_times, list(treated_units)].mean(axis=1)

    model = SynthDID(
        block,
        pre_term=pre_term,
        post_term=post_term,
        treatment_unit=[treated_name],
    )
    model.fit(zeta_type="base")

    tau_hat = model.hat_tau(model="sdid")
    return model, tau_hat


def placebo_tau_distribution(
    Y_wide,
    control_units,
    treated_group_size,
    pre_term,
    post_term,
    replications=500,
    random_seed=42,
):
    """
    生成安慰剂 ATT 分布。

    每次从控制池中抽出 treated_group_size 个单位,假装它们接受了处理;
    剩下的控制单位继续作为 donor pool。这样可以让安慰剂处理组规模
    与真实处理组规模一致。
    """
    if len(control_units) <= treated_group_size:
        raise ValueError("控制单位数量必须大于安慰剂处理单位数量。")

    rng = np.random.default_rng(random_seed)
    placebo_taus = []

    for _ in range(replications):
        placebo_units = rng.choice(
            control_units,
            size=treated_group_size,
            replace=False,
        ).tolist()

        placebo_controls = [
            unit for unit in control_units
            if unit not in placebo_units
        ]

        _, tau_b = fit_sdid_with_mean_treated(
            Y_wide=Y_wide,
            control_units=placebo_controls,
            treated_units=placebo_units,
            pre_term=pre_term,
            post_term=post_term,
            treated_name="__placebo_mean__",
        )
        placebo_taus.append(tau_b)

    return np.array(placebo_taus)


# 示例:California smoking 数据
df = fetch_CaliforniaSmoking()

PRE_TERM = [1970, 1988]
POST_TERM = [1989, 2000]
TREATED_UNITS = ["California"]
CONTROL_UNITS = [unit for unit in df.columns if unit not in TREATED_UNITS]

# 真实处理效应点估计
sdid, tau_hat = fit_sdid_with_mean_treated(
    Y_wide=df,
    control_units=CONTROL_UNITS,
    treated_units=TREATED_UNITS,
    pre_term=PRE_TERM,
    post_term=POST_TERM,
)

# 安慰剂重复抽样
placebo_taus = placebo_tau_distribution(
    Y_wide=df,
    control_units=CONTROL_UNITS,
    treated_group_size=len(TREATED_UNITS),
    pre_term=PRE_TERM,
    post_term=POST_TERM,
    replications=500,
    random_seed=42,
)

# 安慰剂标准误和 95% 置信区间
placebo_se = np.std(placebo_taus, ddof=1)
z_975 = 1.96
ci_low = tau_hat - z_975 * placebo_se
ci_high = tau_hat + z_975 * placebo_se

print(f"SDID 点估计: {tau_hat:.4f}")
print(f"安慰剂标准误: {placebo_se:.4f}")
print(f"95% 置信区间: [{ci_low:.4f}, {ci_high:.4f}]")

如果只想直接调用 pysynthdid 内置方法,可以写得更短:

sdid.cal_se(algo="placebo", replications=500)

print(f"SDID 标准误: {sdid.sdid_se:.4f}")
print(f"95% 置信区间: [{tau_hat - 1.96 * sdid.sdid_se:.4f}, {tau_hat + 1.96 * sdid.sdid_se:.4f}]")

多组处理组加权 ATT 的置信区间

在分批采用或多组处理组 SDID 中,我们前面已经把总体 ATT 写成:

ATTˉ=g=1Gkgtgτ^gg=1Gkgtg\bar{ATT} = \frac{\sum^G_{g=1}k_gt_g\hat{\tau}_g}{\sum^G_{g=1}k_gt_g}

其中,gg 表示处理批次或处理组,kgk_g 是第 gg 组中的处理单位数量,tgt_g 是第 gg 组的处理后时期数量,τ^g\hat{\tau}_g 是第 gg 组单独估计出来的 SDID 效应。

ag=kgtgg=1Gkgtga_g = \frac{k_gt_g}{\sum^G_{g=1}k_gt_g}

则总体 ATT 可以写成:

τ^weighted=g=1Gagτ^g\hat{\tau}^{weighted} = \sum^G_{g=1}a_g\hat{\tau}_g

如果已经有每个处理组的方差估计 V^g\hat{V}_g,并且近似认为不同处理组估计量相互独立,可以用一个简单的加权方差:

V^(τ^weighted)g=1Gag2V^g\hat{V}(\hat{\tau}^{weighted}) \approx \sum^G_{g=1}a_g^2\hat{V}_g

但是这个独立性假设通常偏强,因为不同处理组可能共享同一批控制单位,也共享同一组时间冲击。因此在实际计算中,更推荐直接构造“总体加权 ATT 的安慰剂分布”:

τ^weighted,(b)=g=1Gagτ^g(b)\hat{\tau}^{weighted,(b)} = \sum^G_{g=1}a_g\hat{\tau}^{(b)}_g

然后用 {τ^weighted,(b)}b=1B\{\hat{\tau}^{weighted,(b)}\}_{b=1}^B 的样本方差作为总体加权 ATT 的方差估计:

V^placebo(τ^weighted)=1B1b=1B(τ^weighted,(b)τ^ˉweighted,placebo)2\hat{V}^{placebo}(\hat{\tau}^{weighted}) = \frac{1}{B-1}\sum^B_{b=1} \bigg( \hat{\tau}^{weighted,(b)} - \bar{\hat{\tau}}^{weighted,placebo} \bigg)^2

最后仍然使用正态近似:

τ^weighted±z1α/2V^placebo(τ^weighted)\hat{\tau}^{weighted} \pm z_{1-\alpha/2} \sqrt{\hat{V}^{placebo}(\hat{\tau}^{weighted})}

代码实现

下面的代码沿用上一节的 fit_sdid_with_mean_treated。它会先按处理批次估计真实的加权 ATT,然后在每次安慰剂重复中,对每个处理批次都抽取一组安慰剂处理单位,计算该次重复下的加权 ATT,最后用这些加权 ATT 的分布构造置信区间。

def weighted_staggered_att_ci(
    Y_wide,
    cohorts,
    control_units,
    replications=500,
    random_seed=42,
):
    """
    计算多组处理组 SDID 的加权 ATT 及其安慰剂置信区间。

    参数说明:
    - Y_wide: 宽表,行是时间,列是单位
    - cohorts: 字典,键是首次处理时间,值是该批次的处理单位列表
      例如 {5: ["unit_4", "unit_5"], 6: ["unit_6"]}
    - control_units: 从未处理单位列表
    - replications: 安慰剂重复次数
    """
    rng = np.random.default_rng(random_seed)
    all_times = pd.Index(Y_wide.index).sort_values()

    cohort_info = []

    # 1. 先估计每一个真实处理批次的 SDID
    for cohort_start, treated_units in cohorts.items():
        pre_times = all_times[all_times < cohort_start]
        post_times = all_times[all_times >= cohort_start]

        if len(pre_times) == 0:
            raise ValueError(f"{cohort_start} 批次之前没有处理前时期。")
        if len(post_times) == 0:
            raise ValueError(f"{cohort_start} 批次没有处理后时期。")

        pre_term = [pre_times.min(), pre_times.max()]
        post_term = [post_times.min(), post_times.max()]

        _, tau_g = fit_sdid_with_mean_treated(
            Y_wide=Y_wide,
            control_units=control_units,
            treated_units=treated_units,
            pre_term=pre_term,
            post_term=post_term,
            treated_name=f"__treated_mean_{cohort_start}__",
        )

        k_g = len(treated_units)
        t_g = len(post_times)
        m_g = k_g * t_g

        cohort_info.append(
            {
                "cohort_start": cohort_start,
                "treated_units": treated_units,
                "pre_term": pre_term,
                "post_term": post_term,
                "k_g": k_g,
                "t_g": t_g,
                "m_g": m_g,
                "tau_g": tau_g,
            }
        )

    cohort_results = pd.DataFrame(cohort_info)

    # 2. 按 k_g * t_g 加权,得到真实总体 ATT
    total_weight = cohort_results["m_g"].sum()
    cohort_results["a_g"] = cohort_results["m_g"] / total_weight
    tau_weighted = np.sum(cohort_results["a_g"] * cohort_results["tau_g"])

    # 3. 构造总体加权 ATT 的安慰剂分布
    weighted_placebo_taus = []

    for _ in range(replications):
        tau_b_list = []

        for row in cohort_info:
            k_g = row["k_g"]

            if len(control_units) <= k_g:
                raise ValueError("控制单位数量必须大于当前批次处理单位数量。")

            # 为当前处理批次抽取同样规模的安慰剂处理组
            placebo_units = rng.choice(
                control_units,
                size=k_g,
                replace=False,
            ).tolist()

            placebo_controls = [
                unit for unit in control_units
                if unit not in placebo_units
            ]

            _, tau_g_b = fit_sdid_with_mean_treated(
                Y_wide=Y_wide,
                control_units=placebo_controls,
                treated_units=placebo_units,
                pre_term=row["pre_term"],
                post_term=row["post_term"],
                treated_name=f"__placebo_mean_{row['cohort_start']}__",
            )

            tau_b_list.append(tau_g_b)

        # 每一次重复都先得到所有批次的 placebo tau,再按真实权重合成
        tau_b_weighted = np.sum(cohort_results["a_g"].to_numpy() * np.array(tau_b_list))
        weighted_placebo_taus.append(tau_b_weighted)

    weighted_placebo_taus = np.array(weighted_placebo_taus)

    # 4. 用总体加权 placebo ATT 的方差构造置信区间
    se_weighted = np.std(weighted_placebo_taus, ddof=1)
    ci_low = tau_weighted - 1.96 * se_weighted
    ci_high = tau_weighted + 1.96 * se_weighted

    return {
        "tau_weighted": tau_weighted,
        "se_weighted": se_weighted,
        "ci_low": ci_low,
        "ci_high": ci_high,
        "cohort_results": cohort_results,
        "weighted_placebo_taus": weighted_placebo_taus,
    }


# 示例:沿用“分批采用”小节里的 Y_wide、cohorts 和 never_treated_units
ci_result = weighted_staggered_att_ci(
    Y_wide=Y_wide,
    cohorts=cohorts,
    control_units=never_treated_units,
    replications=500,
    random_seed=42,
)

print("各批次估计结果:")
print(ci_result["cohort_results"])

print(f"加权 ATT: {ci_result['tau_weighted']:.4f}")
print(f"安慰剂标准误: {ci_result['se_weighted']:.4f}")
print(f"95% 置信区间: [{ci_result['ci_low']:.4f}, {ci_result['ci_high']:.4f}]")