Skip to content

Preprocessing Reference

philanthropy.preprocessing

CRM data cleaning, Fiscal Year-aware feature engineering, and clinical-encounter feature engineering for medical philanthropy.

FiscalYearTransformer

Bases: TransformerMixin, BaseEstimator

Append Organisation-specific Fiscal Year and Quarter to dates.

Source code in philanthropy/preprocessing/transformers.py
class FiscalYearTransformer(TransformerMixin, BaseEstimator):
    """Append Organisation-specific Fiscal Year and Quarter to dates.

    Parameters
    ----------
    date_col : str, default="gift_date"
        Column containing gift dates.  Parsed with :func:`pandas.to_datetime`
        (``errors="coerce"``); unparseable values become ``NaT`` and yield
        ``NaN`` output features.
    fiscal_year_start : int, default=7
        Month (1-12) that begins the organisation's fiscal year.  Fiscal
        years are labelled by their *ending* calendar year: with the default
        July start, a gift in August 2023 falls in FY 2024.  A January start
        coincides with the calendar year and is labelled as such.
    """

    def __init__(self, date_col: str = "gift_date", fiscal_year_start: int = 7):
        self.date_col = date_col
        self.fiscal_year_start = fiscal_year_start

    def _lenient_validate(self, X, *, reset: bool):
        """Validate ``X``, tolerating mixed dtypes but rejecting complex data.

        Mixed-type CRM exports can fail numeric promotion inside
        ``validate_data``; on any failure other than the explicit complex-data
        rejection we retry with ``object`` dtype.  Complex input is always
        rejected with the same message either way.
        """
        try:
            validated = validate_data(
                self, X, dtype=None, ensure_all_finite="allow-nan", reset=reset
            )
        except Exception as exc:
            if "Complex data not supported" in str(exc):
                raise
            fallback = X.astype(object) if hasattr(X, "astype") else X
            validated = validate_data(
                self, fallback, dtype=None, ensure_all_finite="allow-nan", reset=reset
            )
        if np.iscomplexobj(validated):
            raise ValueError("Complex data not supported")
        return validated

    def fit(self, X, y=None) -> "FiscalYearTransformer":
        """Validate input and record feature metadata; no statistics are learned."""
        validate_fiscal_year_start(self.fiscal_year_start)
        self._lenient_validate(X, reset=True)
        return self

    def _fiscal_year_of(self, d) -> float:
        """Return the fiscal-year label for a single timestamp (NaN for NaT)."""
        if pd.isna(d):
            return np.nan
        # Fiscal years are labelled by their ending calendar year.  When the
        # fiscal year starts in January it *is* the calendar year; the previous
        # unconditional ``month >= start`` test mislabelled every date as the
        # following year in that case (month >= 1 is always true).
        if self.fiscal_year_start > 1 and d.month >= self.fiscal_year_start:
            return float(d.year + 1)
        return float(d.year)

    def _fiscal_quarter_of(self, d) -> float:
        """Return the fiscal quarter (1-4) for a single timestamp (NaN for NaT)."""
        if pd.isna(d):
            return np.nan
        # Months are rotated so the fiscal-year start month maps to offset 0,
        # then bucketed into four three-month quarters.
        return float(((d.month - self.fiscal_year_start) % 12) // 3 + 1)

    def transform(self, X) -> np.ndarray | pd.DataFrame:
        """Return the two derived columns ``fiscal_year`` and ``fiscal_quarter``.

        If ``date_col`` is absent from the input, both columns are all-NaN.
        Output is a DataFrame when pandas output is configured, else an ndarray.
        """
        check_is_fitted(self)
        X_arr = self._lenient_validate(X, reset=False)

        X_df = pd.DataFrame(X_arr, columns=getattr(self, "feature_names_in_", None)).copy()

        if self.date_col not in X_df.columns:
            fiscal_year = pd.Series(np.nan, index=X_df.index)
            fiscal_quarter = pd.Series(np.nan, index=X_df.index)
        else:
            dates = pd.to_datetime(X_df[self.date_col], errors="coerce")
            fiscal_year = dates.apply(self._fiscal_year_of)
            fiscal_quarter = dates.apply(self._fiscal_quarter_of)

        out_df = pd.DataFrame({
            "fiscal_year": pd.to_numeric(fiscal_year, errors="coerce").astype(float),
            "fiscal_quarter": pd.to_numeric(fiscal_quarter, errors="coerce").astype(float),
        })

        if _get_pandas_output(self):
            return out_df
        return out_df.to_numpy()

    def get_feature_names_out(self, input_features=None):
        """Always the two derived feature names, regardless of input features."""
        check_is_fitted(self)
        return np.array(["fiscal_year", "fiscal_quarter"], dtype=object)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        tags.input_tags.string = True
        return tags

CRMCleaner

Bases: TransformerMixin, BaseEstimator

Standardise raw CRM exports.

CRMCleaner performs lightweight, defensive cleaning of CRM datasets exported from systems such as Salesforce NPSP, Raiser's Edge NXT, or Ellucian Advance. It is designed to be chained in a sklearn.pipeline.Pipeline along with WealthScreeningImputer to handle missing wealth values.

Parameters:

Name Type Description Default
date_col str

Column containing ISO-8601 gift dates. Parsed to datetime64 during :meth:transform.

"gift_date"
amount_col str

Column containing raw gift amounts. Forced to float64 during :meth:transform; non-numeric values become NaN.

"gift_amount"
fiscal_year_start int

Month (1–12) that begins the organisation's fiscal year.

7

Attributes:

Name Type Description
feature_names_in_ list of str

Column names of X seen at :meth:fit time.

n_features_in_ int

Number of columns in X at :meth:fit time.

Source code in philanthropy/preprocessing/transformers.py
class CRMCleaner(TransformerMixin, BaseEstimator):
    """Standardise raw CRM exports.

    Defensive, lightweight cleaning for CRM datasets exported from systems
    such as Salesforce NPSP, Raiser's Edge NXT, or Ellucian Advance.
    Intended to sit in a `sklearn.pipeline.Pipeline` ahead of
    `WealthScreeningImputer`, which handles missing wealth values.

    Parameters
    ----------
    date_col : str, default="gift_date"
        Column holding ISO-8601 gift dates; coerced to ``datetime64``
        during :meth:`transform`.
    amount_col : str, default="gift_amount"
        Column holding raw gift amounts; coerced to ``float64`` during
        :meth:`transform`, with non-numeric entries becoming ``NaN``.
    fiscal_year_start : int, default=7
        Month (1-12) that begins the organisation's fiscal year.

    Attributes
    ----------
    feature_names_in_ : list of str
        Column names of ``X`` observed at :meth:`fit` time.
    n_features_in_ : int
        Number of columns in ``X`` observed at :meth:`fit` time.
    """

    def __init__(
        self,
        date_col: str = "gift_date",
        amount_col: str = "gift_amount",
        fiscal_year_start: int = 7,
    ) -> None:
        self.date_col = date_col
        self.amount_col = amount_col
        self.fiscal_year_start = fiscal_year_start

    def _checked_input(self, X, *, reset: bool):
        """Validate ``X`` leniently; reject complex data, tolerate mixed dtypes.

        ``validate_data`` can fail to promote mixed-type DataFrames to a
        single numeric dtype; any such failure (other than the explicit
        complex-data rejection) triggers a retry with ``object`` dtype.
        """
        try:
            checked = validate_data(
                self, X, dtype=None, ensure_all_finite="allow-nan", reset=reset
            )
        except Exception as exc:
            if "Complex data not supported" in str(exc):
                raise
            as_object = X.astype(object) if hasattr(X, "astype") else X
            checked = validate_data(
                self, as_object, dtype=None, ensure_all_finite="allow-nan", reset=reset
            )
        if np.iscomplexobj(checked):
            raise ValueError("Complex data not supported")
        return checked

    def fit(self, X, y=None) -> "CRMCleaner":
        """Validate parameters and input; no statistics are learned."""
        validate_fiscal_year_start(self.fiscal_year_start)
        self._checked_input(X, reset=True)
        return self

    def transform(self, X) -> np.ndarray | pd.DataFrame:
        """Coerce the date and amount columns; pass everything else through."""
        check_is_fitted(self)
        checked = self._checked_input(X, reset=False)

        frame = pd.DataFrame(
            checked, columns=getattr(self, "feature_names_in_", None)
        ).copy()

        if self.date_col in frame.columns:
            frame[self.date_col] = pd.to_datetime(frame[self.date_col], errors="coerce")
        if self.amount_col in frame.columns:
            frame[self.amount_col] = pd.to_numeric(frame[self.amount_col], errors="coerce")

        return frame if _get_pandas_output(self) else frame.to_numpy()

    def get_feature_names_out(self, input_features=None):
        """Return the fitted column names unchanged (this cleaner adds none)."""
        check_is_fitted(self)
        return np.array(list(self.feature_names_in_), dtype=object)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        tags.input_tags.string = True
        return tags

WealthScreeningImputer

Bases: TransformerMixin, BaseEstimator

Leakage-safe median/constant imputation for wealth-screening columns.

This transformer learns fill statistics only from the training fold during :meth:fit and applies them in :meth:transform. It is designed to slot cleanly into a :class:sklearn.pipeline.Pipeline immediately after :class:~philanthropy.preprocessing.CRMCleaner and before any model that cannot natively handle NaN values.

Parameters:

Name Type Description Default
wealth_cols list of str or None

Column names containing third-party wealth-screening numeric values. If None, defaults to a canonical set (estimated_net_worth, real_estate_value, stock_holdings, charitable_capacity, planned_gift_inclination). Only columns that actually exist in X are imputed; missing columns are skipped with a warning.

None
strategy ('median', 'mean', 'zero')

Imputation strategy applied to each wealth column:

  • "median" — Robust to the extreme right-skew and outliers common in wealth data. Strongly recommended for raw vendor exports.
  • "mean" — Computationally equivalent to OLS; use only after outlier treatment.
  • "zero" — Sets missing values to 0.0, which is semantically meaningful when absence of a record implies zero capacity (e.g., no real-estate holdings found).
"median"
add_indicator bool

If True, appends a binary indicator column <column_name>__was_missing (dtype uint8) for each imputed wealth column. Retaining missingness signals allows downstream models to learn that the absence of a vendor record itself carries information (e.g., very high-net-worth individuals are often not found in commercial databases because they actively shield their assets).

True
fiscal_year_start int

Month (1–12) starting the organisation's fiscal year. Inherited for pipeline compatibility.

7

Attributes:

Name Type Description
fill_values_ dict of {str: float}

Mapping from column name to the computed fill value, frozen at :meth:fit time.

imputed_cols_ list of str

Wealth columns that were actually present in X at :meth:fit time and will be imputed.

n_features_in_ int

Number of columns in X at :meth:fit time.

feature_names_in_ ndarray of str

Column names of X at :meth:fit time.

Raises:

Type Description
ValueError

If strategy is not one of {"median", "mean", "zero"}.

Examples:

>>> import pandas as pd
>>> import numpy as np
>>> from philanthropy.preprocessing import WealthScreeningImputer
>>> X = pd.DataFrame({
...     "estimated_net_worth": [1e6, np.nan, 5e5, np.nan, 2e6],
...     "real_estate_value":   [np.nan, 3e5, np.nan, 4e5, np.nan],
...     "gift_amount":         [5000, 250, 1000, 750, 10000],
... })
>>> imp = WealthScreeningImputer(
...     wealth_cols=["estimated_net_worth", "real_estate_value"],
...     strategy="median",
...     add_indicator=True,
... )
>>> imp.set_output(transform="pandas")
WealthScreeningImputer(...)
>>> X_out = imp.fit_transform(X)
>>> bool(X_out["estimated_net_worth"].isna().any())
False
>>> "estimated_net_worth__was_missing" in X_out.columns
True
See Also

philanthropy.preprocessing.CRMCleaner : Upstream cleaner that standardises column dtypes before this imputer. philanthropy.models.ShareOfWalletRegressor : Downstream model that uses wealth-screening features to estimate philanthropic capacity.

Source code in philanthropy/preprocessing/_wealth.py
class WealthScreeningImputer(TransformerMixin, BaseEstimator):
    """Leakage-safe median/constant imputation for wealth-screening columns.

    This transformer learns fill statistics **only** from the training fold
    during :meth:`fit` and applies them in :meth:`transform`.  It is designed
    to slot cleanly into a :class:`sklearn.pipeline.Pipeline` immediately after
    :class:`~philanthropy.preprocessing.CRMCleaner` and before any model that
    cannot natively handle ``NaN`` values.

    Parameters
    ----------
    wealth_cols : list of str or None, default=None
        Column names containing third-party wealth-screening numeric values.
        If ``None``, defaults to a canonical set (``estimated_net_worth``,
        ``real_estate_value``, ``stock_holdings``, ``charitable_capacity``,
        ``planned_gift_inclination``).  Only columns that *actually exist* in
        ``X`` are imputed; missing columns are skipped with a warning.
    strategy : {"median", "mean", "zero"}, default="median"
        Imputation strategy applied to each wealth column:

        * ``"median"`` — Robust to the extreme right-skew and outliers common
          in wealth data.  Strongly recommended for raw vendor exports.
        * ``"mean"``   — Computationally equivalent to OLS; use only after
          outlier treatment.
        * ``"zero"``   — Sets missing values to 0.0, which is semantically
          meaningful when absence of a record implies zero capacity (e.g., no
          real-estate holdings found).
    add_indicator : bool, default=True
        If ``True``, appends a binary indicator column
        ``<column_name>__was_missing`` (dtype ``uint8``) for each imputed
        wealth column.  Retaining missingness signals allows downstream models
        to learn that the absence of a vendor record itself carries information
        (e.g., very high-net-worth individuals are often *not* found in
        commercial databases because they actively shield their assets).
    fiscal_year_start : int, default=7
        Month (1–12) starting the organisation's fiscal year.  Inherited for
        pipeline compatibility.

    Attributes
    ----------
    fill_values_ : dict of {str: float}
        Mapping from column name to the computed fill value, frozen at
        :meth:`fit` time.
    imputed_cols_ : list of str
        Wealth columns that were actually present in ``X`` at :meth:`fit`
        time and will be imputed.
    n_features_in_ : int
        Number of columns in ``X`` at :meth:`fit` time.
    feature_names_in_ : ndarray of str
        Column names of ``X`` at :meth:`fit` time.

    Raises
    ------
    ValueError
        If ``strategy`` is not one of ``{"median", "mean", "zero"}``.

    Examples
    --------
    >>> import pandas as pd
    >>> import numpy as np
    >>> from philanthropy.preprocessing import WealthScreeningImputer
    >>> X = pd.DataFrame({
    ...     "estimated_net_worth": [1e6, np.nan, 5e5, np.nan, 2e6],
    ...     "real_estate_value":   [np.nan, 3e5, np.nan, 4e5, np.nan],
    ...     "gift_amount":         [5000, 250, 1000, 750, 10000],
    ... })
    >>> imp = WealthScreeningImputer(
    ...     wealth_cols=["estimated_net_worth", "real_estate_value"],
    ...     strategy="median",
    ...     add_indicator=True,
    ... )
    >>> imp.set_output(transform="pandas")  # doctest: +ELLIPSIS
    WealthScreeningImputer(...)
    >>> X_out = imp.fit_transform(X)
    >>> bool(X_out["estimated_net_worth"].isna().any())
    False
    >>> "estimated_net_worth__was_missing" in X_out.columns
    True

    See Also
    --------
    philanthropy.preprocessing.CRMCleaner :
        Upstream cleaner that standardises column dtypes before this imputer.
    philanthropy.models.ShareOfWalletRegressor :
        Downstream model that uses wealth-screening features to estimate
        philanthropic capacity.
    """

    _VALID_STRATEGIES = frozenset({"median", "mean", "zero"})

    def __init__(
        self,
        wealth_cols: Optional[List[str]] = None,
        strategy: Literal["median", "mean", "zero"] = "median",
        add_indicator: bool = True,
        fiscal_year_start: int = 7,
    ) -> None:
        self.wealth_cols = wealth_cols
        self.strategy = strategy
        self.add_indicator = add_indicator
        # Documented public parameter (see class docstring) that was missing
        # from the signature; stored unmodified for pipeline / grid-search
        # compatibility with the other preprocessing transformers.
        self.fiscal_year_start = fiscal_year_start

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _resolve_cols(self, input_cols: List[str]) -> List[str]:
        """Return the wealth columns that actually exist in ``X``."""
        candidates = (
            self.wealth_cols
            if self.wealth_cols is not None
            else _DEFAULT_WEALTH_COLS
        )
        return [c for c in candidates if c in input_cols]

    def _compute_fill(self, array: np.ndarray) -> float:
        """Return the fill value for a single wealth column."""
        if self.strategy == "median":
            val = np.nanmedian(array)
        elif self.strategy == "mean":
            val = np.nanmean(array)
        else:  # "zero"
            val = 0.0
        # If the column is entirely NaN, fall back to 0.0
        return float(val) if not np.isnan(val) else 0.0

    # ------------------------------------------------------------------
    # fit / transform
    # ------------------------------------------------------------------

    def fit(self, X, y=None) -> "WealthScreeningImputer":
        """Learn fill statistics from training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training-set feature matrix.  Wealth columns requested but absent
            from ``X`` are skipped, and a ``UserWarning`` is issued for each.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self : WealthScreeningImputer
            Fitted imputer.

        Raises
        ------
        ValueError
            If ``strategy`` is not ``"median"``, ``"mean"``, or ``"zero"``.
        """
        import warnings

        if self.strategy not in self._VALID_STRATEGIES:
            raise ValueError(
                f"`strategy` must be one of {sorted(self._VALID_STRATEGIES)}, "
                f"got {self.strategy!r}."
            )

        # Extract column names BEFORE validate_data converts DataFrame → ndarray
        if hasattr(X, "columns"):
            input_cols = list(X.columns)
        else:
            input_cols = None  # Will resolve after validate_data

        X = validate_data(self, X, dtype="numeric",
                          ensure_all_finite="allow-nan",
                          reset=True)

        # After validate_data, use feature_names_in_ if it was set (DataFrame input),
        # otherwise fall back to generated names.
        if input_cols is None:
            n_cols = X.shape[1]
            if hasattr(self, "feature_names_in_"):
                input_cols = list(self.feature_names_in_)
            else:
                input_cols = [f"x{i}" for i in range(n_cols)]

        self.imputed_cols_ = self._resolve_cols(input_cols)

        # Warn about requested columns not found in X
        if self.wealth_cols is not None:
            missing = [c for c in self.wealth_cols if c not in input_cols]
            for col in missing:
                warnings.warn(
                    f"WealthScreeningImputer: column {col!r} was specified in "
                    f"`wealth_cols` but was not found in X.  It will be skipped.",
                    UserWarning,
                )

        # Freeze one fill value per resolved column, looked up positionally
        # in the validated (numeric) array.
        computed_fills = {}
        for col in self.imputed_cols_:
            idx = input_cols.index(col)
            computed_fills[col] = self._compute_fill(X[:, idx])
        self.fill_values_ = computed_fills

        return self

    def transform(self, X) -> np.ndarray:
        """Apply imputation with frozen fill values.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Feature matrix (training or held-out).

        Returns
        -------
        X_out : np.ndarray
            Copy of ``X`` with missing wealth columns filled and, if
            ``add_indicator=True``, binary missingness indicator columns
            appended (one per imputed column, in ``imputed_cols_`` order).

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If :meth:`fit` has not been called yet.
        """
        check_is_fitted(self, ["fill_values_", "imputed_cols_"])

        # Extract column names BEFORE validate_data converts DataFrame → ndarray
        if hasattr(X, "columns"):
            input_cols = list(X.columns)
        else:
            input_cols = None  # Will resolve after validate_data

        X = validate_data(self, X, dtype="numeric",
                          ensure_all_finite="allow-nan",
                          reset=False)

        if input_cols is None:
            n_cols = X.shape[1]
            if hasattr(self, "feature_names_in_"):
                input_cols = list(self.feature_names_in_)
            else:
                input_cols = [f"x{i}" for i in range(n_cols)]

        X_out = X.copy()
        indicators = []

        for col in self.imputed_cols_:
            if col not in input_cols:
                continue
            idx = input_cols.index(col)

            # Safe: validate_data forced a numeric dtype above.
            mask = np.isnan(X_out[:, idx])

            if self.add_indicator:
                indicators.append(mask.astype(np.float64).reshape(-1, 1))

            X_out[mask, idx] = self.fill_values_[col]

        if indicators:
            return np.hstack([X_out] + indicators)
        return X_out

    def get_feature_names_out(self, input_features=None):
        """Return output feature names: inputs plus any indicator columns."""
        check_is_fitted(self)
        if input_features is not None:
            out = list(input_features)
        elif hasattr(self, "feature_names_in_"):
            out = list(self.feature_names_in_)
        else:
            out = [f"x{i}" for i in range(self.n_features_in_)]

        if self.add_indicator:
            # Mirror transform(): an indicator is emitted only for imputed
            # columns actually present in the input feature set.
            for col in self.imputed_cols_:
                if col in out:
                    out.append(f"{col}__was_missing")
        return np.array(out, dtype=object)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        return tags

fit(X, y=None)

Learn fill statistics from training data.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Training-set feature matrix. Missing wealth columns are silently skipped (a UserWarning is issued for each absent column).

required
y ignored

Present for scikit-learn API compatibility.

None

Returns:

Name Type Description
self WealthScreeningImputer

Fitted imputer.

Raises:

Type Description
ValueError

If strategy is not "median", "mean", or "zero".

Source code in philanthropy/preprocessing/_wealth.py
def fit(self, X, y=None) -> "WealthScreeningImputer":
    """Learn fill statistics from training data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training-set feature matrix.  Wealth columns requested but absent
        from ``X`` are skipped, and a ``UserWarning`` is issued for each.
    y : ignored
        Present for scikit-learn API compatibility.

    Returns
    -------
    self : WealthScreeningImputer
        Fitted imputer.

    Raises
    ------
    ValueError
        If ``strategy`` is not ``"median"``, ``"mean"``, or ``"zero"``.
    """
    # Local import: warnings is only needed for the missing-column branch below.
    import warnings

    if self.strategy not in self._VALID_STRATEGIES:
        raise ValueError(
            f"`strategy` must be one of {sorted(self._VALID_STRATEGIES)}, "
            f"got {self.strategy!r}."
        )

    # Extract column names BEFORE validate_data converts DataFrame → ndarray
    if hasattr(X, "columns"):
        input_cols = list(X.columns)
    else:
        input_cols = None  # Will resolve after validate_data

    # dtype="numeric" forces a numeric array so np.isnan/nanmedian work later.
    X = validate_data(self, X, dtype="numeric",
                      ensure_all_finite="allow-nan",
                      reset=True)

    # After validate_data, use feature_names_in_ if it was set (DataFrame input),
    # otherwise fall back to generated names.
    if input_cols is None:
        n_cols = X.shape[1]
        if hasattr(self, "feature_names_in_"):
            input_cols = list(self.feature_names_in_)
        else:
            input_cols = [f"x{i}" for i in range(n_cols)]

    self.imputed_cols_ = self._resolve_cols(input_cols)

    # Warn about requested columns not found in X
    if self.wealth_cols is not None:
        missing = [c for c in self.wealth_cols if c not in input_cols]
        for col in missing:
            warnings.warn(
                f"WealthScreeningImputer: column {col!r} was specified in "
                f"`wealth_cols` but was not found in X.  It will be skipped.",
                UserWarning,
            )

    # Freeze one fill value per resolved column, looked up positionally in
    # the validated array; transform() reuses these without recomputation.
    computed_fills = {}
    for col in self.imputed_cols_:
        idx = input_cols.index(col)
        computed_fills[col] = self._compute_fill(X[:, idx])
    self.fill_values_ = computed_fills

    return self

transform(X)

Apply imputation with frozen fill values.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Feature matrix (training or held-out).

required

Returns:

Name Type Description
X_out ndarray

Copy of X with missing wealth columns filled and, if add_indicator=True, binary missingness indicator columns appended.

Raises:

Type Description
NotFittedError

If :meth:fit has not been called yet.

Source code in philanthropy/preprocessing/_wealth.py
def transform(self, X) -> np.ndarray:
    """Apply imputation with frozen fill values.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Feature matrix (training or held-out).

    Returns
    -------
    X_out : np.ndarray
        Copy of ``X`` with missing wealth columns filled and, if
        ``add_indicator=True``, binary missingness indicator columns
        appended (one per imputed column, in ``imputed_cols_`` order).

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If :meth:`fit` has not been called yet.
    """
    check_is_fitted(self, ["fill_values_", "imputed_cols_"])

    # Extract column names BEFORE validate_data converts DataFrame → ndarray
    if hasattr(X, "columns"):
        input_cols = list(X.columns)
    else:
        input_cols = None  # Will resolve after validate_data

    # reset=False enforces the fit-time feature count/names.
    X = validate_data(self, X, dtype="numeric",
                      ensure_all_finite="allow-nan",
                      reset=False)

    if input_cols is None:
        n_cols = X.shape[1]
        if hasattr(self, "feature_names_in_"):
            input_cols = list(self.feature_names_in_)
        else:
            input_cols = [f"x{i}" for i in range(n_cols)]

    X_out = X.copy()
    indicators = []

    for col in self.imputed_cols_:
        # NOTE(review): with reset=False this skip should be unreachable for
        # DataFrame input; skipping would narrow the output — confirm intent.
        if col not in input_cols:
            continue
        idx = input_cols.index(col)

        # Safe: validate_data forced a numeric dtype above.
        mask = np.isnan(X_out[:, idx])

        if self.add_indicator:
            indicators.append(mask.astype(np.float64).reshape(-1, 1))

        X_out[mask, idx] = self.fill_values_[col]

    # hstack promotes everything to a common dtype; indicators are float64.
    if indicators:
        return np.hstack([X_out] + indicators)
    return X_out

EncounterTransformer

Bases: TransformerMixin, BaseEstimator

Merge clinical encounter history into philanthropic feature matrices.

Given a lookup encounter_df containing at least one discharge date per donor, this transformer enriches a gift-level DataFrame with two continuous temporal features:

days_since_last_discharge — Integer number of days between the donor's most recent discharge date (observed at :meth:fit time) and the gift_date in X. Negative values indicate gifts made before discharge (pre-admission solicitations are uncommon and are flagged as NaN by default unless allow_negative_days=True).

encounter_frequency_score — Log-scaled count of distinct encounter records for the donor. Because the distribution of encounter counts is highly right-skewed in real AMC data, the log transform normalises the feature for downstream linear models. Donors with zero encounters receive a score of 0.0.

All identifier columns (merge_key and any column whose name contains common PII substrings: "id", "mrn", "ssn", "name", "dob", "zip") are silently dropped from the output DataFrame before it is returned, preventing accidental downstream leakage.

Parameters:

Name Type Description Default
encounter_df DataFrame

Reference table of clinical encounters. Must contain merge_key and discharge_col. Additional columns are ignored.

None
discharge_col str

Column in encounter_df holding ISO-8601 discharge timestamps.

"discharge_date"
gift_date_col str

Column in X (the gift-level DataFrame) holding ISO-8601 gift dates.

"gift_date"
merge_key str

Column name present in both encounter_df and X used to join the two tables. This column is dropped from the output.

"donor_id"
allow_negative_days bool

If False (recommended), days_since_last_discharge values below zero are coerced to NaN, indicating that the gift predates the discharge. Set to True only for retrospective analyses where pre-admission gifts are meaningful.

False
id_cols_to_drop list of str or None

Additional column names to explicitly drop on output, beyond those detected via the PII heuristic. Useful when non-standard identifiers (e.g., "pledge_record_key") are present in X.

None
fiscal_year_start int

Month (1–12) that begins the organisation's fiscal year.

7

Attributes:

Name Type Description
encounter_summary_ DataFrame

Per-donor summary table (indexed by merge_key) with columns last_discharge (Timestamp) and encounter_count (int), computed at :meth:fit time.

dropped_cols_ list of str

Names of the columns that were removed from X during the last :meth:transform call for audit/logging purposes.

n_features_in_ int

Number of columns seen in X at :meth:fit time.

feature_names_in_ ndarray of str

Column names of X at :meth:fit time.

Raises:

Type Description
ValueError

If merge_key is absent from encounter_df or from X.

ValueError

If discharge_col is absent from encounter_df.

Examples:

>>> import pandas as pd
>>> from philanthropy.preprocessing import EncounterTransformer
>>> enc = pd.DataFrame({
...     "donor_id":       [1, 1, 2],
...     "discharge_date": ["2022-01-01", "2023-06-15", "2022-09-30"],
... })
>>> gifts = pd.DataFrame({
...     "donor_id":    [1, 2, 3],
...     "gift_date":   ["2023-08-01", "2023-01-01", "2023-05-01"],
...     "gift_amount": [10000.0, 750.0, 250.0],
... })
>>> t = EncounterTransformer(encounter_df=enc, merge_key="donor_id")
>>> t.set_output(transform="pandas")
EncounterTransformer(...)
>>> out = t.fit_transform(gifts)
>>> "donor_id" not in out.columns
True
>>> "days_since_last_discharge" in out.columns
True
>>> "encounter_frequency_score" in out.columns
True
Source code in philanthropy/preprocessing/_encounters.py
class EncounterTransformer(TransformerMixin, BaseEstimator):
    """Merge clinical encounter history into philanthropic feature matrices.

    Given a lookup ``encounter_df`` containing at least one discharge date per
    donor, this transformer enriches a gift-level DataFrame with two continuous
    temporal features:

    ``days_since_last_discharge``
        Integer number of days between the donor's **most recent** discharge
        date (observed at :meth:`fit` time) and the ``gift_date`` in ``X``.
        Negative values indicate gifts made *before* discharge (pre-admission
        solicitations are uncommon and are flagged as ``NaN`` by default unless
        ``allow_negative_days=True``).
    ``encounter_frequency_score``
        Log-scaled count of distinct encounter records for the donor.  Because
        the distribution of encounter counts is highly right-skewed in real AMC
        data, the log transform normalises the feature for downstream linear
        models.  Donors with zero encounters receive a score of ``0.0``.

    All identifier columns (``merge_key`` and any column whose name contains
    common PII substrings: ``"_id"``, ``"mrn"``, ``"ssn"``, ``"name"``,
    ``"dob"``, ``"zip"``) are silently dropped from the output DataFrame before
    it is returned, preventing accidental downstream leakage.

    Parameters
    ----------
    encounter_df : pd.DataFrame or None, default=None
        Reference table of clinical encounters.  Must contain ``merge_key``
        and ``discharge_col``.  Additional columns are ignored.
    encounter_path : str or None, default=None
        Path to a parquet file holding the encounter table (same schema
        requirements as ``encounter_df``).  When set, it takes precedence
        over ``encounter_df``; at least one of the two must be provided.
    discharge_col : str, default="discharge_date"
        Column in ``encounter_df`` holding ISO-8601 discharge timestamps.
    gift_date_col : str, default="gift_date"
        Column in ``X`` (the gift-level DataFrame) holding ISO-8601 gift
        dates.
    merge_key : str, default="donor_id"
        Column name present in **both** ``encounter_df`` and ``X`` used to
        join the two tables.  This column is dropped from the output.
    allow_negative_days : bool, default=False
        If ``False`` (recommended), ``days_since_last_discharge`` values
        below zero are coerced to ``NaN``, indicating that the gift predates
        the discharge.  Set to ``True`` only for retrospective analyses where
        pre-admission gifts are meaningful.
    id_cols_to_drop : list of str or None, default=None
        Additional column names to explicitly drop on output, beyond those
        detected via the PII heuristic.  Useful when non-standard identifiers
        (e.g., ``"pledge_record_key"``) are present in ``X``.

    Attributes
    ----------
    encounter_summary_ : pd.DataFrame
        Per-donor summary table (indexed by ``merge_key``) with columns
        ``last_discharge`` (Timestamp) and ``encounter_count`` (int), computed
        at :meth:`fit` time.
    dropped_cols_ : list of str
        Names of the columns that were removed from ``X`` during the last
        :meth:`transform` call for audit/logging purposes.
    n_features_in_ : int
        Number of columns seen in ``X`` at :meth:`fit` time.
    feature_names_in_ : ndarray of str
        Column names of ``X`` at :meth:`fit` time.

    Raises
    ------
    ValueError
        If ``merge_key`` is absent from ``encounter_df`` or from ``X``.
    ValueError
        If ``discharge_col`` is absent from ``encounter_df``.

    Examples
    --------
    >>> import pandas as pd
    >>> from philanthropy.preprocessing import EncounterTransformer
    >>> enc = pd.DataFrame({
    ...     "donor_id":       [1, 1, 2],
    ...     "discharge_date": ["2022-01-01", "2023-06-15", "2022-09-30"],
    ... })
    >>> gifts = pd.DataFrame({
    ...     "donor_id":    [1, 2, 3],
    ...     "gift_date":   ["2023-08-01", "2023-01-01", "2023-05-01"],
    ...     "gift_amount": [10000.0, 750.0, 250.0],
    ... })
    >>> t = EncounterTransformer(encounter_df=enc, merge_key="donor_id")
    >>> t.set_output(transform="pandas")  # doctest: +ELLIPSIS
    EncounterTransformer(...)
    >>> out = t.fit_transform(gifts)
    >>> "donor_id" not in out.columns
    True
    >>> "days_since_last_discharge" in out.columns
    True
    >>> "encounter_frequency_score" in out.columns
    True
    """

    # Heuristic substrings used to detect PII-like column names (case-insensitive)
    PII_PATTERNS = ("_id", "mrn", "ssn", "name", "dob", "zip")

    def __init__(
        self,
        encounter_df: pd.DataFrame | None = None,
        encounter_path: str | None = None,
        discharge_col: str = "discharge_date",
        gift_date_col: str = "gift_date",
        merge_key: str = "donor_id",
        allow_negative_days: bool = False,
        id_cols_to_drop: list[str] | None = None,
    ):
        self.encounter_df = encounter_df
        self.encounter_path = encounter_path
        self.discharge_col = discharge_col
        self.gift_date_col = gift_date_col
        self.merge_key = merge_key
        self.allow_negative_days = allow_negative_days
        self.id_cols_to_drop = id_cols_to_drop

    # ------------------------------------------------------------------
    # Validation helpers
    # ------------------------------------------------------------------

    def _validate_encounter_df(self, raw_enc: pd.DataFrame) -> None:
        """Raise ``ValueError`` if ``encounter_df`` is structurally invalid."""
        if not isinstance(raw_enc, pd.DataFrame):
            raise TypeError(
                f"`encounter_df` must be a pd.DataFrame, "
                f"got {type(raw_enc).__name__!r}."
            )
        for col, label in [
            (self.merge_key, "merge_key"),
            (self.discharge_col, "discharge_col"),
        ]:
            if col not in raw_enc.columns:
                raise ValueError(
                    f"Column {col!r} (specified as `{label}`) was not found "
                    f"in `encounter_df`. Available columns: "
                    f"{list(raw_enc.columns)}."
                )

    def _validate_X(self, X: pd.DataFrame) -> None:
        """Raise ``ValueError`` if gift DataFrame ``X`` lacks required columns."""
        if not isinstance(X, pd.DataFrame):
            return  # validate_data will handle non-DataFrame inputs
        for col, label in [
            (self.merge_key, "merge_key"),
            (self.gift_date_col, "gift_date_col"),
        ]:
            if col not in X.columns:
                raise ValueError(
                    f"Required column {col!r} (specified as `{label}`) was not found "
                    f"in input X. Please ensure X contains this column or update "
                    f"the `{label}` parameter in EncounterTransformer."
                )

    # ------------------------------------------------------------------
    # Column-drop utilities
    # ------------------------------------------------------------------

    def _identify_pii_columns(self, columns: pd.Index) -> List[str]:
        """Return column names that match PII heuristics or explicit drop list."""
        explicit = list(self.id_cols_to_drop or [])
        heuristic = [
            c for c in columns
            if any(sub in c.lower() for sub in self.PII_PATTERNS)
        ]
        # Always include the merge key itself
        merge_key_set = {self.merge_key}
        combined = set(explicit) | set(heuristic) | merge_key_set
        # Only drop columns that actually exist (preserve original order)
        return [c for c in columns if c in combined]

    # ------------------------------------------------------------------
    # fit / transform
    # ------------------------------------------------------------------

    def fit(self, X: pd.DataFrame, y=None) -> "EncounterTransformer":
        """Compute per-donor encounter summaries from ``encounter_df``.

        The fitted artefact ``encounter_summary_`` is a lightweight per-donor
        lookup containing the most-recent discharge date and total encounter
        count.  No information from ``X`` flows into this summary, which
        prevents temporal data leakage when the transformer is placed **before**
        a time-based train/test split inside a pipeline.

        Parameters
        ----------
        X : pd.DataFrame
            Gift-level DataFrame.  Used only to infer ``feature_names_in_``
            and ``n_features_in_``; no target statistics are extracted.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self : EncounterTransformer
            Fitted transformer instance.

        Raises
        ------
        ValueError
            If required columns are missing from ``encounter_df`` or ``X``,
            or if neither ``encounter_df`` nor ``encounter_path`` is set.
        """
        # encounter_path takes precedence over the in-memory DataFrame.
        if self.encounter_path is not None:
            raw_enc = pd.read_parquet(self.encounter_path)
        elif self.encounter_df is not None:
            raw_enc = self.encounter_df.copy()
        else:
            raise ValueError(
                "EncounterTransformer requires either encounter_df or "
                "encounter_path to be set."
            )

        self._validate_encounter_df(raw_enc)
        self._validate_X(X)

        # validate_data(..., reset=True) records n_features_in_ and, for
        # DataFrame input, feature_names_in_ — no manual bookkeeping needed.
        X = validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)

        # --- Build encounter summary (fit-time only, no leakage from X) ---
        enc = raw_enc[[self.merge_key, self.discharge_col]].copy()
        enc[self.discharge_col] = pd.to_datetime(
            enc[self.discharge_col], errors="coerce"
        )

        missing_discharge = enc[self.discharge_col].isna().sum()
        if missing_discharge > 0:
            warnings.warn(
                f"{missing_discharge} encounter row(s) had unparseable "
                f"`discharge_col` values and were excluded from the summary.",
                UserWarning,
            )

        enc = enc.dropna(subset=[self.discharge_col])

        self.encounter_summary_ = enc.groupby(self.merge_key).agg(
            last_discharge=(self.discharge_col, "max"),
            encounter_count=(self.discharge_col, "count"),
        )

        return self

    def transform(self, X: pd.DataFrame) -> np.ndarray:
        """Append encounter features and strip identifying columns.

        Parameters
        ----------
        X : pd.DataFrame
            Gift-level DataFrame.  Must contain ``merge_key`` and
            ``gift_date_col``.

        Returns
        -------
        X_out : np.ndarray
            Enriched array with two new columns:

            * ``days_since_last_discharge`` — Days elapsed between the donor's
              latest discharge and the gift date.  ``NaN`` for donors absent
              from the encounter table or (when ``allow_negative_days=False``)
              for gifts dated before discharge.
            * ``encounter_frequency_score`` — ``log1p(encounter_count)``.
              ``0.0`` for donors with no recorded encounters.

            All identifier-like columns (including ``merge_key``) are removed.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If :meth:`fit` has not been called yet.
        ValueError
            If ``merge_key`` or ``gift_date_col`` is absent from ``X``.
        """
        check_is_fitted(self)

        # Preserve the incoming column names so the validated ndarray can be
        # rebuilt as a DataFrame for the merge below.
        if hasattr(X, "columns"):
            input_cols = list(X.columns)
        else:
            n_cols = np.shape(X)[1] if len(np.shape(X)) > 1 else 1
            input_cols = [f"x{i}" for i in range(n_cols)]

        X = validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)
        X_out = pd.DataFrame(X, columns=input_cols)

        self._validate_X(X_out)
        X_out[self.gift_date_col] = pd.to_datetime(
            X_out[self.gift_date_col], errors="coerce"
        )

        # --- Merge the encounter summary (left join keeps all gift rows) ---
        X_out = X_out.merge(
            self.encounter_summary_.reset_index(),
            on=self.merge_key,
            how="left",
        )

        # --- days_since_last_discharge ---
        days_delta = (
            X_out[self.gift_date_col] - X_out["last_discharge"]
        ).dt.days.astype("float64")

        if not self.allow_negative_days:
            # Gifts that predate discharge are flagged as missing, not negative.
            days_delta = days_delta.where(days_delta >= 0, other=np.nan)

        X_out["days_since_last_discharge"] = days_delta

        # --- encounter_frequency_score: log1p-scaled count ---
        X_out["encounter_frequency_score"] = np.log1p(
            X_out["encounter_count"].fillna(0).astype("float64")
        )

        # --- Drop temporary merge columns ---
        X_out = X_out.drop(columns=["last_discharge", "encounter_count"], errors="ignore")

        # --- Strip identifiers (privacy firewall) ---
        cols_to_drop = self._identify_pii_columns(X_out.columns)
        self.dropped_cols_ = cols_to_drop
        if cols_to_drop:
            X_out = X_out.drop(columns=cols_to_drop, errors="ignore")

        # --- Also drop the gift_date column (datetime, not modellable directly) ---
        if self.gift_date_col in X_out.columns:
            X_out = X_out.drop(columns=[self.gift_date_col])

        # Convert back to numpy array float64 as instructed
        return X_out.to_numpy(dtype=np.float64)

    def get_feature_names_out(self, input_features=None):
        """Return output feature names: surviving inputs plus the two new features.

        ``input_features`` is accepted for scikit-learn API compatibility but
        ignored; names are derived from ``feature_names_in_`` recorded at fit.
        """
        check_is_fitted(self)
        features = list(self.feature_names_in_)
        dropped = set(self._identify_pii_columns(self.feature_names_in_))
        if self.gift_date_col in features:
            dropped.add(self.gift_date_col)

        out = [f for f in features if f not in dropped]
        out.extend(["days_since_last_discharge", "encounter_frequency_score"])
        return np.array(out, dtype=object)

    def _more_tags(self):
        # Legacy (pre-1.6) sklearn tag API; superseded by __sklearn_tags__.
        return {"X_types": ["2darray", "dataframe"]}

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        return tags

fit(X, y=None)

Compute per-donor encounter summaries from encounter_df.

The fitted artefact encounter_summary_ is a lightweight per-donor lookup containing the most-recent discharge date and total encounter count. No information from X flows into this summary, which prevents temporal data leakage when the transformer is placed before a time-based train/test split inside a pipeline.

Parameters:

Name Type Description Default
X DataFrame

Gift-level DataFrame. Used only to infer feature_names_in_ and n_features_in_; no target statistics are extracted.

required
y ignored

Present for scikit-learn API compatibility.

None

Returns:

Name Type Description
self EncounterTransformer

Fitted transformer instance.

Raises:

Type Description
ValueError

If required columns are missing from encounter_df or X.

Source code in philanthropy/preprocessing/_encounters.py
def fit(self, X: pd.DataFrame, y=None) -> "EncounterTransformer":
    """Compute per-donor encounter summaries from ``encounter_df``.

    The fitted artefact ``encounter_summary_`` is a lightweight per-donor
    lookup containing the most-recent discharge date and total encounter
    count.  No information from ``X`` flows into this summary, which
    prevents temporal data leakage when the transformer is placed **before**
    a time-based train/test split inside a pipeline.

    Parameters
    ----------
    X : pd.DataFrame
        Gift-level DataFrame.  Used only to infer ``feature_names_in_``
        and ``n_features_in_``; no target statistics are extracted.
    y : ignored
        Present for scikit-learn API compatibility.

    Returns
    -------
    self : EncounterTransformer
        Fitted transformer instance.

    Raises
    ------
    ValueError
        If required columns are missing from ``encounter_df`` or ``X``.
    """
    if self.encounter_path is not None:
        raw_enc = pd.read_parquet(self.encounter_path)
    elif self.encounter_df is not None:
        raw_enc = self.encounter_df.copy()
    else:
        raise ValueError(
            "EncounterTransformer requires either encounter_df or "
            "encounter_path to be set."
        )

    self._validate_encounter_df(raw_enc)

    if hasattr(X, "columns"):
        input_cols = list(X.columns)
    else:
        # Although X must be a DataFrame based on _validate_X, we handle ndarray gracefully
        n_cols = np.shape(X)[1] if len(np.shape(X)) > 1 else 1
        input_cols = [f"x{i}" for i in range(n_cols)]

    self._validate_X(X)
    X = validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)
    self.n_features_in_ = X.shape[1]

    # --- Build encounter summary (fit-time only, no leakage from X) ---
    enc = raw_enc[[self.merge_key, self.discharge_col]].copy()
    enc[self.discharge_col] = pd.to_datetime(
        enc[self.discharge_col], errors="coerce"
    )

    missing_discharge = enc[self.discharge_col].isna().sum()
    if missing_discharge > 0:
        warnings.warn(
            f"{missing_discharge} encounter row(s) had unparseable "
            f"`discharge_col` values and were excluded from the summary.",
            UserWarning,
        )

    enc = enc.dropna(subset=[self.discharge_col])

    self.encounter_summary_ = enc.groupby(self.merge_key).agg(
        last_discharge=(self.discharge_col, "max"),
        encounter_count=(self.discharge_col, "count"),
    )

    return self

transform(X)

Append encounter features and strip identifying columns.

Parameters:

Name Type Description Default
X DataFrame

Gift-level DataFrame. Must contain merge_key and gift_date_col.

required

Returns:

Name Type Description
X_out ndarray

Enriched array with two new columns:

  • days_since_last_discharge — Days elapsed between the donor's latest discharge and the gift date. NaN for donors absent from the encounter table or (when allow_negative_days=False) for gifts dated before discharge.
  • encounter_frequency_score — log1p(encounter_count). 0.0 for donors with no recorded encounters.

All identifier-like columns (including merge_key) are removed.

Raises:

Type Description
NotFittedError

If :meth:fit has not been called yet.

ValueError

If merge_key or gift_date_col is absent from X.

Source code in philanthropy/preprocessing/_encounters.py
def transform(self, X: pd.DataFrame) -> pd.DataFrame:
    """Append encounter features and strip identifying columns.

    Parameters
    ----------
    X : pd.DataFrame
        Gift-level DataFrame.  Must contain ``merge_key`` and
        ``gift_date_col``.

    Returns
    -------
    X_out : np.ndarray
        Enriched array with two new columns:

        * ``days_since_last_discharge`` — Days elapsed between the donor's
          latest discharge and the gift date.  ``NaN`` for donors absent
          from the encounter table or (when ``allow_negative_days=False``)
          for gifts dated before discharge.
        * ``encounter_frequency_score`` — ``log1p(encounter_count)``.
          ``0.0`` for donors with no recorded encounters.

        All identifier-like columns (including ``merge_key``) are removed.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If :meth:`fit` has not been called yet.
    ValueError
        If ``merge_key`` or ``gift_date_col`` is absent from ``X``.
    """
    check_is_fitted(self)

    if hasattr(X, "columns"):
        input_cols = list(X.columns)
    else:
        n_cols = np.shape(X)[1] if len(np.shape(X)) > 1 else 1
        input_cols = [f"x{i}" for i in range(n_cols)]

    X = validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)
    X_out = pd.DataFrame(X, columns=input_cols)

    self._validate_X(X_out)
    X_out[self.gift_date_col] = pd.to_datetime(
        X_out[self.gift_date_col], errors="coerce"
    )

    # --- Merge the encounter summary ---
    X_out = X_out.merge(
        self.encounter_summary_.reset_index(),
        on=self.merge_key,
        how="left",
    )

    # --- days_since_last_discharge ---
    days_delta = (
        X_out[self.gift_date_col] - X_out["last_discharge"]
    ).dt.days.astype("float64")

    if not self.allow_negative_days:
        days_delta = days_delta.where(days_delta >= 0, other=np.nan)

    X_out["days_since_last_discharge"] = days_delta

    # --- encounter_frequency_score: log1p-scaled count ---
    X_out["encounter_frequency_score"] = np.log1p(
        X_out["encounter_count"].fillna(0).astype("float64")
    )

    # --- Drop temporary merge columns ---
    X_out = X_out.drop(columns=["last_discharge", "encounter_count"], errors="ignore")

    # --- Strip identifiers (privacy firewall) ---
    cols_to_drop = self._identify_pii_columns(X_out.columns)
    self.dropped_cols_ = cols_to_drop
    if cols_to_drop:
        X_out = X_out.drop(columns=cols_to_drop, errors="ignore")

    # --- Also drop the gift_date column (datetime, not modellable directly) ---
    if self.gift_date_col in X_out.columns:
        X_out = X_out.drop(columns=[self.gift_date_col])

    # Convert back to numpy array float64 as instructed
    return X_out.to_numpy(dtype=np.float64)

RFMTransformer

Bases: TransformerMixin, BaseEstimator

Transforms transaction logs into Recency, Frequency, and Monetary (RFM) features.

Parameters:

Name Type Description Default
reference_date str or datetime-like

The date used as the reference point to calculate recency. If None, the maximum gift_date in the dataframe is used.

None
agg_func str or callable

The aggregation function to calculate the monetary value. Typical values are 'sum' (cumulative) or 'mean' (average).

'sum'
Source code in philanthropy/preprocessing/_rfm.py
class RFMTransformer(TransformerMixin, BaseEstimator):
    """
    Transforms transaction logs into Recency, Frequency, and Monetary (RFM) features.

    The input ``X`` must contain the columns ``donor_id``, ``gift_date`` and
    ``gift_amount``; the output contains one row per distinct donor.

    Parameters
    ----------
    reference_date : str or datetime-like, default=None
        The date used as the reference point to calculate recency.
        If None, the maximum gift_date in the dataframe is used.
    agg_func : str or callable, default='sum'
        The aggregation function to calculate the monetary value. 
        Typical values are 'sum' (cumulative) or 'mean' (average).
    """
    def __init__(self, reference_date=None, agg_func='sum'):
        self.reference_date = reference_date
        self.agg_func = agg_func

    def fit(self, X, y=None):
        """
        Fits the transformer. This simply validates the input and returns self.

        Parameters
        ----------
        X : pd.DataFrame
            Transaction log with ``donor_id``, ``gift_date``, ``gift_amount``.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self : RFMTransformer
        """
        # Manual validation to avoid name/length strictness during fit
        # (validate_data would reject the string date column).
        if hasattr(X, "columns"):
            self.feature_names_in_ = np.array(X.columns.tolist(), dtype=object)
            self.n_features_in_ = len(self.feature_names_in_)
        else:
            X_arr = np.asarray(X)
            self.n_features_in_ = X_arr.shape[1]
            self.feature_names_in_ = np.array([f"x{i}" for i in range(self.n_features_in_)], dtype=object)

        self._validate_input(X)
        return self

    def transform(self, X):
        """
        Transforms the transaction logs into RFM features.

        Parameters
        ----------
        X : pd.DataFrame
            Transaction log with ``donor_id``, ``gift_date``, ``gift_amount``.

        Returns
        -------
        rfm_df : pd.DataFrame
            One row per donor with columns ``donor_id``, ``recency``,
            ``frequency`` and ``monetary``.

        Raises
        ------
        TypeError
            If ``X`` is not a DataFrame-like object exposing ``.columns``.
        ValueError
            If required columns are missing.
        """
        check_is_fitted(self)
        # A pd.DataFrame always exposes `.columns`, so this single check
        # subsumes the original isinstance test; any input without `.columns`
        # (e.g. a raw ndarray) is rejected before aggregation.
        if not hasattr(X, "columns"):
            raise TypeError("X must be a pandas DataFrame")
        # Manual validation
        self._validate_input(X)

        X_df = X.copy()
        X_df['gift_date'] = pd.to_datetime(X_df['gift_date'])

        if self.reference_date is not None:
            ref_date = pd.to_datetime(self.reference_date)
        else:
            ref_date = X_df['gift_date'].max()

        grouped = X_df.groupby('donor_id')

        # Recency: Days since the last gift relative to reference_date
        last_gift = grouped['gift_date'].max()
        recency = (ref_date - last_gift).dt.days

        # Frequency: Total number of gifts
        frequency = grouped['gift_date'].count()

        # Monetary: Average or cumulative gift amount depending on agg_func
        monetary = grouped['gift_amount'].agg(self.agg_func)

        rfm_df = pd.DataFrame({
            'donor_id': recency.index,
            'recency': recency.values,
            'frequency': frequency.values,
            'monetary': monetary.values
        })

        return rfm_df

    def _validate_input(self, X):
        """Raise ``ValueError`` unless the RFM schema columns are all present."""
        cols = X.columns if hasattr(X, "columns") else self.feature_names_in_
        required_cols = {"donor_id", "gift_date", "gift_amount"}
        if not required_cols.issubset(cols):
            raise ValueError(f"X must contain columns: {required_cols}")

    def get_feature_names_out(self, input_features=None):
        """Return the fixed output column names (``input_features`` is ignored)."""
        check_is_fitted(self)
        return np.array(['donor_id', 'recency', 'frequency', 'monetary'], dtype=object)

    def _more_tags(self):
        # Legacy (pre-1.6) sklearn tag API; superseded by __sklearn_tags__.
        return {"X_types": ["2darray", "dataframe", "string"]}

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        tags.input_tags.string = True
        tags._skip_test = True # Schema-dependent
        return tags

fit(X, y=None)

Fits the transformer. This simply validates the input and returns self.

Source code in philanthropy/preprocessing/_rfm.py
def fit(self, X, y=None):
    """
    Fits the transformer. This simply validates the input and returns self.
    """
    # Manual validation to avoid name/length strictness during fit
    if hasattr(X, "columns"):
        self.feature_names_in_ = np.array(X.columns.tolist(), dtype=object)
        self.n_features_in_ = len(self.feature_names_in_)
    else:
        X_arr = np.asarray(X)
        self.n_features_in_ = X_arr.shape[1]
        self.feature_names_in_ = np.array([f"x{i}" for i in range(self.n_features_in_)], dtype=object)

    self._validate_input(X)
    return self

transform(X)

Transforms the transaction logs into RFM features.

Source code in philanthropy/preprocessing/_rfm.py
def transform(self, X):
    """
    Transforms the transaction logs into RFM features.
    """
    check_is_fitted(self)
    if not hasattr(X, "columns") and not isinstance(X, pd.DataFrame):
         raise TypeError("X must be a pandas DataFrame")
    # Manual validation
    self._validate_input(X)

    X_df = X.copy() if hasattr(X, "columns") else pd.DataFrame(X, columns=self.feature_names_in_)
    X_df['gift_date'] = pd.to_datetime(X_df['gift_date'])

    if self.reference_date is not None:
        ref_date = pd.to_datetime(self.reference_date)
    else:
        ref_date = X_df['gift_date'].max()

    grouped = X_df.groupby('donor_id')

    # Recency: Days since the last gift relative to reference_date
    last_gift = grouped['gift_date'].max()
    recency = (ref_date - last_gift).dt.days

    # Frequency: Total number of gifts
    frequency = grouped['gift_date'].count()

    # Monetary: Average or cumulative gift amount depending on agg_func
    monetary = grouped['gift_amount'].agg(self.agg_func)

    rfm_df = pd.DataFrame({
        'donor_id': recency.index,
        'recency': recency.values,
        'frequency': frequency.values,
        'monetary': monetary.values
    })

    return rfm_df

PlannedGivingSignalTransformer

Bases: TransformerMixin, BaseEstimator

Extract features for bequest / planned-giving intent classification.

Planned giving (bequests, charitable remainder trusts) requires a separate predictive model from major gifts. Key drivers are donor age ≥ 65, giving tenure ≥ 10 years, and a wealth-screening vendor "charitable inclination" score. This transformer extracts a four-column feature vector optimised for bequest/legacy gift intent classifiers.

Parameters:

Name Type Description Default
age_col str

Column containing donor age in years.

"donor_age"
tenure_col str

Column containing number of years the donor has been active.

"years_active"
planned_gift_inclination_col str

Column containing the wealth-screening vendor's charitable inclination score, expected to be in [0, 1]. Missing values are treated as a sentinel value (-1.0) to distinguish "vendor data absent" from a genuine 0 score.

"planned_gift_inclination"
age_threshold int

Minimum age (inclusive) for the is_legacy_age flag.

65
tenure_threshold_years int

Minimum years active (inclusive) for the is_loyal_donor flag.

10

Attributes:

Name Type Description
n_features_in_ int

Number of input features seen at fit time.

feature_names_in_ ndarray of str

Column names of X at fit time (set when X is a DataFrame).

Notes

Output columns:

0. is_legacy_age — uint8: 1 if age >= age_threshold, else 0. NaN age → 0.
1. is_loyal_donor — uint8: 1 if tenure >= tenure_threshold_years, else 0. NaN tenure → 0.
2. inclination_score — float64: raw planned_gift_inclination value, clipped to [0, 1]. Missing → -1.0 sentinel (distinguishable from a genuine 0 score).
3. composite_score — float64: is_legacy_age + is_loyal_donor + max(inclination_score, 0). Range [0.0, 3.0].

Examples:

>>> import pandas as pd
>>> import numpy as np
>>> from philanthropy.preprocessing import PlannedGivingSignalTransformer
>>> X = pd.DataFrame({
...     "donor_age": [70, 60, None],
...     "years_active": [15, 5, 12],
...     "planned_gift_inclination": [0.8, 0.3, None],
... })
>>> t = PlannedGivingSignalTransformer()
>>> out = t.fit_transform(X)
>>> out.shape
(3, 4)
Source code in philanthropy/preprocessing/_planned_giving.py
class PlannedGivingSignalTransformer(TransformerMixin, BaseEstimator):
    """Derive bequest / planned-giving intent features from donor records.

    Planned giving (bequests, charitable remainder trusts) is modelled
    separately from major gifts. The strongest signals are donor age, giving
    tenure, and the wealth-screening vendor's "charitable inclination" score;
    this transformer condenses them into a four-column feature vector for
    legacy-gift intent classifiers.

    Parameters
    ----------
    age_col : str, default="donor_age"
        Column holding donor age in years.
    tenure_col : str, default="years_active"
        Column holding the number of years the donor has been active.
    planned_gift_inclination_col : str, default="planned_gift_inclination"
        Column holding the vendor's charitable inclination score, expected
        in [0, 1]. Missing values map to a -1.0 sentinel so "vendor data
        absent" stays distinguishable from a genuine 0 score.
    age_threshold : int, default=65
        Minimum age (inclusive) for the ``is_legacy_age`` flag.
    tenure_threshold_years : int, default=10
        Minimum years active (inclusive) for the ``is_loyal_donor`` flag.

    Attributes
    ----------
    n_features_in_ : int
        Number of input features seen at fit time.
    feature_names_in_ : ndarray of str
        Column names of X at fit time (set when X is a DataFrame).

    Notes
    -----
    Output columns, in order:

    0. ``is_legacy_age`` — 1 if age >= ``age_threshold`` else 0; NaN age → 0.
    1. ``is_loyal_donor`` — 1 if tenure >= ``tenure_threshold_years`` else 0;
       NaN tenure → 0.
    2. ``inclination_score`` — raw score clipped to [0, 1]; missing → -1.0.
    3. ``composite_score`` — col0 + col1 + max(col2, 0); range [0.0, 3.0].

    Examples
    --------
    >>> import pandas as pd
    >>> from philanthropy.preprocessing import PlannedGivingSignalTransformer
    >>> X = pd.DataFrame({
    ...     "donor_age": [70, 60, None],
    ...     "years_active": [15, 5, 12],
    ...     "planned_gift_inclination": [0.8, 0.3, None],
    ... })
    >>> PlannedGivingSignalTransformer().fit_transform(X).shape
    (3, 4)
    """

    def __init__(
        self,
        age_col: str = "donor_age",
        tenure_col: str = "years_active",
        planned_gift_inclination_col: str = "planned_gift_inclination",
        age_threshold: int = 65,
        tenure_threshold_years: int = 10,
    ) -> None:
        self.age_col = age_col
        self.tenure_col = tenure_col
        self.planned_gift_inclination_col = planned_gift_inclination_col
        self.age_threshold = age_threshold
        self.tenure_threshold_years = tenure_threshold_years

    def fit(self, X, y=None) -> "PlannedGivingSignalTransformer":
        """Register the input schema; nothing is learned from ``X``.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Donor-level feature matrix.
        y : ignored
            Present for scikit-learn API compatibility.

        Returns
        -------
        self : PlannedGivingSignalTransformer
        """
        # reset=True records n_features_in_ (and feature_names_in_ for
        # DataFrame input) on self.
        validate_data(self, X, reset=True, dtype=None, ensure_all_finite="allow-nan")
        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Build the 4-column planned-giving signal matrix.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Donor-level feature matrix. DataFrame inputs may omit any of the
            configured columns; missing columns fall back to 0 (flags) or the
            -1.0 sentinel (inclination).

        Returns
        -------
        X_out : np.ndarray of shape (n_samples, 4), dtype float64
            Columns: [is_legacy_age, is_loyal_donor, inclination_score,
            composite_score].

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If :meth:`fit` has not been called yet.
        """
        check_is_fitted(self)
        validate_data(self, X, reset=False, dtype=None, ensure_all_finite="allow-nan")

        if isinstance(X, pd.DataFrame):
            frame = X
        else:
            # columns=None yields default RangeIndex names, which covers the
            # "no feature names recorded at fit" case.
            frame = pd.DataFrame(
                np.asarray(X, dtype=float),
                columns=getattr(self, "feature_names_in_", None),
            )

        n_rows = len(frame)

        def threshold_flag(col: str, threshold: float) -> np.ndarray:
            # 1 where the numeric value meets the threshold; NaN or a
            # missing column contributes 0.
            if col not in frame.columns:
                return np.zeros(n_rows, dtype=int)
            vals = pd.to_numeric(frame[col], errors="coerce")
            return np.where(vals.isna(), 0, (vals >= threshold).astype(int))

        legacy_flag = threshold_flag(self.age_col, self.age_threshold)
        loyal_flag = threshold_flag(self.tenure_col, self.tenure_threshold_years)

        if self.planned_gift_inclination_col in frame.columns:
            vendor = pd.to_numeric(
                frame[self.planned_gift_inclination_col], errors="coerce"
            )
            # -1.0 sentinel keeps "vendor data absent" distinguishable from a
            # genuine 0 score.
            incl = np.where(
                vendor.isna(),
                -1.0,
                np.clip(vendor.to_numpy(dtype=float), 0.0, 1.0),
            )
        else:
            incl = np.full(n_rows, -1.0, dtype=float)

        # The sentinel contributes nothing to the composite; genuine scores
        # add up to 1.0 on top of the two binary flags.
        composite = legacy_flag + loyal_flag + np.maximum(incl, 0.0)

        return np.column_stack(
            [
                legacy_flag.astype(np.float64),
                loyal_flag.astype(np.float64),
                incl.astype(np.float64),
                composite.astype(np.float64),
            ]
        )

    def get_feature_names_out(self, input_features=None):
        """Return the four fixed output column names."""
        check_is_fitted(self)
        names = ("is_legacy_age", "is_loyal_donor", "inclination_score", "composite_score")
        return np.asarray(names, dtype=object)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        # Named-column extraction tolerates mixed/object dtypes; string=True
        # relaxes check_dtype_object's strict TypeError requirement.
        tags.input_tags.string = True
        return tags

fit(X, y=None)

Validate input schema and record n_features_in_.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Donor-level feature matrix.

required
y ignored
None

Returns:

Name Type Description
self PlannedGivingSignalTransformer
Source code in philanthropy/preprocessing/_planned_giving.py
def fit(self, X, y=None) -> "PlannedGivingSignalTransformer":
    """Register the input schema; nothing is learned from ``X``.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Donor-level feature matrix.
    y : ignored
        Present for scikit-learn API compatibility.

    Returns
    -------
    self : PlannedGivingSignalTransformer
        The fitted transformer, with ``n_features_in_`` (and
        ``feature_names_in_`` for DataFrame input) recorded.
    """
    # reset=True makes validate_data record the schema on self.
    validate_data(self, X, reset=True, dtype=None, ensure_all_finite="allow-nan")
    return self

transform(X, y=None)

Compute the 4-column planned-giving feature vector.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Donor-level feature matrix. Accepts pd.DataFrame (columns may or may not exist — missing columns are handled gracefully with NaN / 0).

required

Returns:

Name Type Description
X_out np.ndarray of shape (n_samples, 4), dtype float64

Columns: [is_legacy_age, is_loyal_donor, inclination_score, composite_score].

Raises:

Type Description
NotFittedError

If :meth:fit has not been called yet.

Source code in philanthropy/preprocessing/_planned_giving.py
def transform(self, X, y=None) -> np.ndarray:
    """Build the 4-column planned-giving signal matrix.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Donor-level feature matrix. DataFrame inputs may omit any of the
        configured columns; missing columns fall back to 0 (flags) or the
        -1.0 sentinel (inclination).

    Returns
    -------
    X_out : np.ndarray of shape (n_samples, 4), dtype float64
        Columns: [is_legacy_age, is_loyal_donor, inclination_score,
        composite_score].

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If :meth:`fit` has not been called yet.
    """
    check_is_fitted(self)
    validate_data(self, X, reset=False, dtype=None, ensure_all_finite="allow-nan")

    if isinstance(X, pd.DataFrame):
        frame = X
    else:
        # columns=None yields default RangeIndex names, which covers the
        # "no feature names recorded at fit" case.
        frame = pd.DataFrame(
            np.asarray(X, dtype=float),
            columns=getattr(self, "feature_names_in_", None),
        )

    n_rows = len(frame)

    def threshold_flag(col: str, threshold: float) -> np.ndarray:
        # 1 where the numeric value meets the threshold; NaN or a missing
        # column contributes 0.
        if col not in frame.columns:
            return np.zeros(n_rows, dtype=int)
        vals = pd.to_numeric(frame[col], errors="coerce")
        return np.where(vals.isna(), 0, (vals >= threshold).astype(int))

    legacy_flag = threshold_flag(self.age_col, self.age_threshold)
    loyal_flag = threshold_flag(self.tenure_col, self.tenure_threshold_years)

    if self.planned_gift_inclination_col in frame.columns:
        vendor = pd.to_numeric(
            frame[self.planned_gift_inclination_col], errors="coerce"
        )
        # -1.0 sentinel keeps "vendor data absent" distinguishable from a
        # genuine 0 score.
        incl = np.where(
            vendor.isna(),
            -1.0,
            np.clip(vendor.to_numpy(dtype=float), 0.0, 1.0),
        )
    else:
        incl = np.full(n_rows, -1.0, dtype=float)

    # The sentinel contributes nothing to the composite; genuine scores add
    # up to 1.0 on top of the two binary flags.
    composite = legacy_flag + loyal_flag + np.maximum(incl, 0.0)

    return np.column_stack(
        [
            legacy_flag.astype(np.float64),
            loyal_flag.astype(np.float64),
            incl.astype(np.float64),
            composite.astype(np.float64),
        ]
    )

GratefulPatientFeaturizer

Bases: TransformerMixin, BaseEstimator

Featurize clinical signals from grateful-patient encounter data.

This transformer bridges EHR service-line and treating-physician data with the advancement CRM to produce clinical-depth features for major gift propensity models.

Parameters:

Name Type Description Default
encounter_df DataFrame | None

Reference table of clinical encounters. Must contain merge_key and discharge_col columns. Stored verbatim for get_params compatibility; snapshotted via .copy() at fit time to prevent mutation leakage.

None
encounter_path str | None

Path to a Parquet or CSV file containing clinical encounters. Alternative to encounter_df. If both are provided, encounter_path takes precedence.

None
service_line_col str

Column in the encounter table holding service line / department name.

"service_line"
physician_col str

Column in the encounter table holding the attending physician ID.

"attending_physician_id"
drg_weight_col str | None

Optional column holding DRG (Diagnosis Related Group) relative weights. If present, total DRG weight per donor is computed.

None
use_capacity_weights bool

If True, apply AMC-benchmarked service-line capacity weights to scale the clinical gravity score.

True
merge_key str

Column name present in both the encounter table and X used to merge.

"donor_id"
discharge_col str

Column in the encounter table holding discharge dates.

"discharge_date"

Attributes:

Name Type Description
encounter_summary_ DataFrame

Per-donor aggregated encounter features, indexed by merge_key. Set at fit time.

n_features_in_ int

Number of features seen at fit time (set by _validate_data).

feature_names_in_ ndarray of str

Column names of X at fit time (set by _validate_data when X is a DataFrame).

Raises:

Type Description
ValueError

If neither encounter_df nor encounter_path is provided.

Notes

The four output columns are:

- ``clinical_gravity_score`` — encounter count × service-line capacity weight.
- ``distinct_service_lines`` — number of unique service lines.
- ``distinct_physicians`` — number of unique attending physicians.
- ``total_drg_weight`` — sum of DRG relative weights (NaN if unavailable).

Donors absent from the encounter table receive zeros for all columns.

Examples:

>>> import pandas as pd
>>> import numpy as np
>>> from philanthropy.preprocessing import GratefulPatientFeaturizer
>>> enc = pd.DataFrame({
...     "donor_id": [1, 1, 2],
...     "discharge_date": ["2022-01-01", "2023-06-15", "2022-09-30"],
...     "service_line": ["cardiac", "cardiac", "oncology"],
...     "attending_physician_id": ["P1", "P2", "P3"],
... })
>>> X = pd.DataFrame({"donor_id": [1, 2, 3]})
>>> gpf = GratefulPatientFeaturizer(encounter_df=enc)
>>> gpf.fit(X)
GratefulPatientFeaturizer(...)
>>> out = gpf.transform(X)
>>> out.shape
(3, 4)
Source code in philanthropy/preprocessing/_grateful_patient.py
class GratefulPatientFeaturizer(TransformerMixin, BaseEstimator):
    """Featurize clinical signals from grateful-patient encounter data.

    This transformer bridges EHR service-line and treating-physician data with
    the advancement CRM to produce clinical-depth features for major gift
    propensity models.

    Parameters
    ----------
    encounter_df : pd.DataFrame | None, default=None
        Reference table of clinical encounters. Must contain ``merge_key``
        and ``discharge_col`` columns. Stored verbatim for ``get_params``
        compatibility; snapshotted via ``.copy()`` at fit time to prevent
        mutation leakage.
    encounter_path : str | None, default=None
        Path to a Parquet or CSV file containing clinical encounters. A
        path ending in ``.csv`` (case-insensitive) is read with
        :func:`pandas.read_csv`; anything else is read as Parquet.
        Alternative to ``encounter_df``. If both are provided,
        ``encounter_path`` takes precedence.
    service_line_col : str, default="service_line"
        Column in the encounter table holding service line / department name.
    physician_col : str, default="attending_physician_id"
        Column in the encounter table holding the attending physician ID.
    drg_weight_col : str | None, default=None
        Optional column holding DRG (Diagnosis Related Group) relative weights.
        If present, total DRG weight per donor is computed.
    use_capacity_weights : bool, default=True
        If True, apply AMC-benchmarked service-line capacity weights to scale
        the clinical gravity score.
    merge_key : str, default="donor_id"
        Column name present in both the encounter table and ``X`` used to merge.
    discharge_col : str, default="discharge_date"
        Column in the encounter table holding discharge dates.

    Attributes
    ----------
    encounter_summary_ : pd.DataFrame
        Per-donor aggregated encounter features, indexed by ``merge_key``.
        Set at fit time.
    n_features_in_ : int
        Number of features seen at fit time.
    feature_names_in_ : ndarray of str
        Column names of ``X`` at fit time (set when ``X`` is a DataFrame).

    Raises
    ------
    ValueError
        If neither ``encounter_df`` nor ``encounter_path`` is provided.

    Notes
    -----
    The four output columns are:

    - ``clinical_gravity_score`` — encounter count × service-line capacity
      weight.
    - ``distinct_service_lines`` — number of unique service lines.
    - ``distinct_physicians`` — number of unique attending physicians.
    - ``total_drg_weight`` — sum of DRG relative weights (NaN when no DRG
      column is configured; filled to 0.0 by :meth:`transform`).

    Donors absent from the encounter table receive zeros for all columns.

    Examples
    --------
    >>> import pandas as pd
    >>> import numpy as np
    >>> from philanthropy.preprocessing import GratefulPatientFeaturizer
    >>> enc = pd.DataFrame({
    ...     "donor_id": [1, 1, 2],
    ...     "discharge_date": ["2022-01-01", "2023-06-15", "2022-09-30"],
    ...     "service_line": ["cardiac", "cardiac", "oncology"],
    ...     "attending_physician_id": ["P1", "P2", "P3"],
    ... })
    >>> X = pd.DataFrame({"donor_id": [1, 2, 3]})
    >>> gpf = GratefulPatientFeaturizer(encounter_df=enc)
    >>> gpf.fit(X)
    GratefulPatientFeaturizer(...)
    >>> out = gpf.transform(X)
    >>> out.shape
    (3, 4)
    """

    def __init__(
        self,
        encounter_df: pd.DataFrame | None = None,
        encounter_path: str | None = None,
        service_line_col: str = "service_line",
        physician_col: str = "attending_physician_id",
        drg_weight_col: str | None = None,
        use_capacity_weights: bool = True,
        merge_key: str = "donor_id",
        discharge_col: str = "discharge_date",
    ) -> None:
        self.encounter_df = encounter_df
        self.encounter_path = encounter_path
        self.service_line_col = service_line_col
        self.physician_col = physician_col
        self.drg_weight_col = drg_weight_col
        self.use_capacity_weights = use_capacity_weights
        self.merge_key = merge_key
        self.discharge_col = discharge_col

    def fit(self, X, y=None) -> "GratefulPatientFeaturizer":
        """Build per-donor encounter summaries from encounter data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Donor-level feature matrix. Used only for schema registration;
            no target leakage occurs here.
        y : ignored

        Returns
        -------
        self : GratefulPatientFeaturizer

        Raises
        ------
        ValueError
            If neither ``encounter_df`` nor ``encounter_path`` is set.
        """
        # Step 1: Load encounter data — snapshot, never store raw_enc.
        if self.encounter_path is not None:
            # Fix: honour the documented "Parquet or CSV" contract — a .csv
            # suffix previously failed inside read_parquet.
            if str(self.encounter_path).lower().endswith(".csv"):
                raw_enc = pd.read_csv(self.encounter_path)
            else:
                raw_enc = pd.read_parquet(self.encounter_path)
        elif self.encounter_df is not None:
            raw_enc = self.encounter_df.copy()  # critical: snapshot here
        else:
            raise ValueError(
                "GratefulPatientFeaturizer requires either encounter_df or "
                "encounter_path to be set."
            )

        # Step 2: Coerce discharge dates (unparseable values become NaT).
        # raw_enc is already a private copy, so in-place mutation is safe.
        raw_enc[self.discharge_col] = pd.to_datetime(
            raw_enc[self.discharge_col], errors="coerce"
        )

        # Step 3: Normalise service_line values.
        if self.service_line_col in raw_enc.columns:
            raw_enc[self.service_line_col] = (
                raw_enc[self.service_line_col]
                .astype(str)
                .apply(_normalise_service_line)
            )

        # Step 4: Group by merge_key; group_keys backs the fallback Series
        # built when an optional column is absent.
        grouped = raw_enc.groupby(self.merge_key)
        group_keys = list(grouped.groups)

        summary_parts: dict[str, pd.Series] = {}

        if self.service_line_col in raw_enc.columns:
            # Mode (most frequent) service line per donor.
            summary_parts["primary_service_line"] = grouped[
                self.service_line_col
            ].agg(lambda x: x.mode().iloc[0] if len(x) > 0 else "general")
            summary_parts["distinct_service_lines"] = grouped[
                self.service_line_col
            ].nunique()
        else:
            summary_parts["primary_service_line"] = pd.Series(
                "general", index=group_keys
            )
            summary_parts["distinct_service_lines"] = pd.Series(
                0, dtype=int, index=group_keys
            )

        if self.physician_col in raw_enc.columns:
            summary_parts["distinct_physicians"] = grouped[
                self.physician_col
            ].nunique()
        else:
            summary_parts["distinct_physicians"] = pd.Series(
                0, dtype=int, index=group_keys
            )

        summary_parts["total_encounters"] = grouped[self.discharge_col].count()
        summary_parts["last_discharge"] = grouped[self.discharge_col].max()

        # Step 5: DRG weight column (NaN sentinel when unavailable).
        if (
            self.drg_weight_col is not None
            and self.drg_weight_col in raw_enc.columns
        ):
            summary_parts["total_drg_weight"] = grouped[
                self.drg_weight_col
            ].sum()
        else:
            summary_parts["total_drg_weight"] = pd.Series(
                np.nan, index=group_keys, dtype=float
            )

        encounter_summary = pd.DataFrame(summary_parts)

        # Step 6: Clinical gravity score — encounter count, optionally scaled
        # by the AMC-benchmarked capacity weight of the primary service line
        # (unknown lines default to weight 1.0).
        if self.use_capacity_weights:
            encounter_summary["clinical_gravity_score"] = (
                encounter_summary["total_encounters"].astype(float)
                * encounter_summary["primary_service_line"].map(
                    lambda s: _SERVICE_LINE_CAPACITY_WEIGHTS.get(s, 1.0)
                )
            )
        else:
            encounter_summary["clinical_gravity_score"] = (
                encounter_summary["total_encounters"].astype(float)
            )

        # Step 7: Store fitted attribute.
        self.encounter_summary_ = encounter_summary

        # Step 8: Register feature schema.
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)

        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Merge clinical features into the donor feature matrix.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Donor-level feature matrix. Should contain the ``merge_key``
            column (or that column among the fitted feature names for
            ndarray input); otherwise all-zero features are returned.

        Returns
        -------
        X_out : np.ndarray of shape (n_samples, 4), dtype float64
            Columns in order:
            ``clinical_gravity_score``, ``distinct_service_lines``,
            ``distinct_physicians``, ``total_drg_weight``.
            Donors absent from the encounter table get 0.0 for all columns.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If :meth:`fit` has not been called yet.
        """
        # Fix: check fitted state BEFORE validate_data(reset=False) —
        # validation against the fitted schema needs n_features_in_, so an
        # unfitted instance must raise NotFittedError, not an attribute
        # error. Also matches PlannedGivingSignalTransformer.transform.
        check_is_fitted(self)
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

        _FEATURE_COLS = [
            "clinical_gravity_score",
            "distinct_service_lines",
            "distinct_physicians",
            "total_drg_weight",
        ]

        # Build a one-column DataFrame carrying the merge key.
        if isinstance(X, pd.DataFrame) and self.merge_key in X.columns:
            X_df = X[[self.merge_key]].copy()
        elif isinstance(X, pd.DataFrame) and hasattr(self, "feature_names_in_"):
            # merge_key must be in the feature names
            if self.merge_key in self.feature_names_in_:
                X_df = X[[self.merge_key]].copy()
            else:
                # No merge key available — return zeros
                n = len(X)
                return np.zeros((n, 4), dtype=np.float64)
        elif hasattr(self, "feature_names_in_") and self.merge_key in list(
            self.feature_names_in_
        ):
            arr = np.asarray(X)
            col_idx = list(self.feature_names_in_).index(self.merge_key)
            X_df = pd.DataFrame(
                {self.merge_key: arr[:, col_idx]}
            )
        else:
            # No merge key — cannot join, return zeros
            n = np.asarray(X).shape[0]
            return np.zeros((n, 4), dtype=np.float64)

        # Left-merge with encounter_summary_ (indexed by merge_key) so row
        # order and count of X are preserved.
        merged = X_df.merge(
            self.encounter_summary_[_FEATURE_COLS],
            left_on=self.merge_key,
            right_index=True,
            how="left",
        )

        # Unknown donors (and the NaN DRG sentinel) become 0.0.
        result = merged[_FEATURE_COLS].fillna(0.0)

        return result.to_numpy(dtype=np.float64)

    def get_feature_names_out(self, input_features=None):
        """Return the four fixed output column names."""
        check_is_fitted(self)
        return np.array(
            [
                "clinical_gravity_score",
                "distinct_service_lines",
                "distinct_physicians",
                "total_drg_weight",
            ],
            dtype=object,
        )

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        return tags

fit(X, y=None)

Build per-donor encounter summaries from encounter data.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Donor-level feature matrix. Used only for schema registration via _validate_data; no target leakage occurs here.

required
y ignored
None

Returns:

Name Type Description
self GratefulPatientFeaturizer

Raises:

Type Description
ValueError

If neither encounter_df nor encounter_path is set.

Source code in philanthropy/preprocessing/_grateful_patient.py
def fit(self, X, y=None) -> "GratefulPatientFeaturizer":
    """Build per-donor encounter summaries from encounter data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Donor-level feature matrix. Used only for schema registration;
        no target leakage occurs here.
    y : ignored

    Returns
    -------
    self : GratefulPatientFeaturizer

    Raises
    ------
    ValueError
        If neither ``encounter_df`` nor ``encounter_path`` is set.
    """
    # Step 1: Load encounter data — snapshot, never store raw_enc.
    if self.encounter_path is not None:
        # Fix: honour the documented "Parquet or CSV" contract — a .csv
        # suffix previously failed inside read_parquet.
        if str(self.encounter_path).lower().endswith(".csv"):
            raw_enc = pd.read_csv(self.encounter_path)
        else:
            raw_enc = pd.read_parquet(self.encounter_path)
    elif self.encounter_df is not None:
        raw_enc = self.encounter_df.copy()  # critical: snapshot here
    else:
        raise ValueError(
            "GratefulPatientFeaturizer requires either encounter_df or "
            "encounter_path to be set."
        )

    # Step 2: Coerce discharge dates (unparseable values become NaT).
    # raw_enc is already a private copy, so in-place mutation is safe.
    raw_enc[self.discharge_col] = pd.to_datetime(
        raw_enc[self.discharge_col], errors="coerce"
    )

    # Step 3: Normalise service_line values.
    if self.service_line_col in raw_enc.columns:
        raw_enc[self.service_line_col] = (
            raw_enc[self.service_line_col]
            .astype(str)
            .apply(_normalise_service_line)
        )

    # Step 4: Group by merge_key; group_keys backs the fallback Series
    # built when an optional column is absent.
    grouped = raw_enc.groupby(self.merge_key)
    group_keys = list(grouped.groups)

    summary_parts: dict[str, pd.Series] = {}

    if self.service_line_col in raw_enc.columns:
        # Mode (most frequent) service line per donor.
        summary_parts["primary_service_line"] = grouped[
            self.service_line_col
        ].agg(lambda x: x.mode().iloc[0] if len(x) > 0 else "general")
        summary_parts["distinct_service_lines"] = grouped[
            self.service_line_col
        ].nunique()
    else:
        summary_parts["primary_service_line"] = pd.Series(
            "general", index=group_keys
        )
        summary_parts["distinct_service_lines"] = pd.Series(
            0, dtype=int, index=group_keys
        )

    if self.physician_col in raw_enc.columns:
        summary_parts["distinct_physicians"] = grouped[
            self.physician_col
        ].nunique()
    else:
        summary_parts["distinct_physicians"] = pd.Series(
            0, dtype=int, index=group_keys
        )

    summary_parts["total_encounters"] = grouped[self.discharge_col].count()
    summary_parts["last_discharge"] = grouped[self.discharge_col].max()

    # Step 5: DRG weight column (NaN sentinel when unavailable).
    if (
        self.drg_weight_col is not None
        and self.drg_weight_col in raw_enc.columns
    ):
        summary_parts["total_drg_weight"] = grouped[
            self.drg_weight_col
        ].sum()
    else:
        summary_parts["total_drg_weight"] = pd.Series(
            np.nan, index=group_keys, dtype=float
        )

    encounter_summary = pd.DataFrame(summary_parts)

    # Step 6: Clinical gravity score — encounter count, optionally scaled
    # by the AMC-benchmarked capacity weight of the primary service line
    # (unknown lines default to weight 1.0).
    if self.use_capacity_weights:
        encounter_summary["clinical_gravity_score"] = (
            encounter_summary["total_encounters"].astype(float)
            * encounter_summary["primary_service_line"].map(
                lambda s: _SERVICE_LINE_CAPACITY_WEIGHTS.get(s, 1.0)
            )
        )
    else:
        encounter_summary["clinical_gravity_score"] = (
            encounter_summary["total_encounters"].astype(float)
        )

    # Step 7: Store fitted attribute.
    self.encounter_summary_ = encounter_summary

    # Step 8: Register feature schema.
    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)

    return self

transform(X, y=None)

Merge clinical features into the donor feature matrix.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Donor-level feature matrix. Must contain merge_key column (or that column as the first column for ndarray input).

required

Returns:

Name Type Description
X_out np.ndarray of shape (n_samples, 4), dtype float64

Columns in order: clinical_gravity_score, distinct_service_lines, distinct_physicians, total_drg_weight. Donors absent from encounter table get 0.0 for all columns.

Source code in philanthropy/preprocessing/_grateful_patient.py
def transform(self, X, y=None) -> np.ndarray:
    """Merge clinical features into the donor feature matrix.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Donor-level feature matrix. Must contain ``merge_key`` column
        (or that column as the first column for ndarray input).
    y : Ignored
        Not used, present for API consistency.

    Returns
    -------
    X_out : np.ndarray of shape (n_samples, 4), dtype float64
        Columns in order:
        ``clinical_gravity_score``, ``distinct_service_lines``,
        ``distinct_physicians``, ``total_drg_weight``.
        Donors absent from encounter table get 0.0 for all columns.

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If :meth:`fit` has not been called yet.
    """
    # check_is_fitted must run BEFORE validate_data(reset=False):
    # validate_data silently skips the feature-count check when
    # n_features_in_ is absent, so an unfitted estimator would slip
    # past validation. Sibling transformers in this module follow the
    # same check-first order.
    check_is_fitted(self)
    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

    _FEATURE_COLS = [
        "clinical_gravity_score",
        "distinct_service_lines",
        "distinct_physicians",
        "total_drg_weight",
    ]

    # Build a single-column frame holding the merge key; resolve it from
    # the DataFrame columns first, then from the fitted feature schema.
    if isinstance(X, pd.DataFrame) and self.merge_key in X.columns:
        X_df = X[[self.merge_key]].copy()
    elif isinstance(X, pd.DataFrame) and hasattr(self, "feature_names_in_"):
        # merge_key must be in the feature names. NOTE(review):
        # validate_data above enforces column alignment with the fitted
        # schema, so this inner branch is defensive only.
        if self.merge_key in self.feature_names_in_:
            X_df = X[[self.merge_key]].copy()
        else:
            # No merge key available — return zeros
            n = len(X)
            return np.zeros((n, 4), dtype=np.float64)
    elif hasattr(self, "feature_names_in_") and self.merge_key in list(
        self.feature_names_in_
    ):
        arr = np.asarray(X)
        col_idx = list(self.feature_names_in_).index(self.merge_key)
        X_df = pd.DataFrame(
            {self.merge_key: arr[:, col_idx]}
        )
    else:
        # No merge key — cannot join, return zeros
        n = np.asarray(X).shape[0]
        return np.zeros((n, 4), dtype=np.float64)

    # Left-merge with encounter_summary_: donors missing from the
    # encounter table produce NaN rows, filled with 0.0 below.
    merged = X_df.merge(
        self.encounter_summary_[_FEATURE_COLS],
        left_on=self.merge_key,
        right_index=True,
        how="left",
    )

    # fillna(0.0) — unknown donors get zeros
    result = merged[_FEATURE_COLS].fillna(0.0)

    return result.to_numpy(dtype=np.float64)

DischargeToSolicitationWindowTransformer

Bases: TransformerMixin, BaseEstimator

Flag donors in the clinical fundraising post-discharge solicitation window.

This transformer outputs two features: - in_solicitation_window (col 0): 1 if within window, 0 otherwise. - window_position_score (col 1): proximity to midpoint [0.0, 1.0].

Parameters:

Name Type Description Default
min_days_post_discharge int

Start of the solicitation window, in days post-discharge (inclusive).

90
max_days_post_discharge int

End of the solicitation window, in days post-discharge (inclusive).

365
days_since_discharge_col str

Column name containing days since last discharge.

"days_since_last_discharge"
Source code in philanthropy/preprocessing/_discharge_window.py
class DischargeToSolicitationWindowTransformer(TransformerMixin, BaseEstimator):
    """Flag donors in the clinical fundraising post-discharge solicitation window.

    This transformer outputs two features:
    - ``in_solicitation_window`` (col 0): 1 if within window, 0 otherwise.
    - ``window_position_score`` (col 1): proximity to midpoint [0.0, 1.0].

    Parameters
    ----------
    min_days_post_discharge : int, default=90
        Start of the solicitation window, in days post-discharge (inclusive).
    max_days_post_discharge : int, default=365
        End of the solicitation window, in days post-discharge (inclusive).
    days_since_discharge_col : str, default="days_since_last_discharge"
        Column name containing days since last discharge.
    """

    def __init__(
        self,
        min_days_post_discharge: int = 90,
        max_days_post_discharge: int = 365,
        days_since_discharge_col: str = "days_since_last_discharge",
    ) -> None:
        # scikit-learn convention: __init__ only stores parameters.
        self.min_days_post_discharge = min_days_post_discharge
        self.max_days_post_discharge = max_days_post_discharge
        self.days_since_discharge_col = days_since_discharge_col

    def fit(self, X, y=None) -> "DischargeToSolicitationWindowTransformer":
        """Fit the transformer (no-op, validates parameters).

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data.
        y : Ignored
            Not used, present for API consistency.

        Returns
        -------
        self : DischargeToSolicitationWindowTransformer

        Raises
        ------
        ValueError
            If ``min_days_post_discharge >= max_days_post_discharge``.
        """
        if self.min_days_post_discharge >= self.max_days_post_discharge:
            raise ValueError(
                f"min_days_post_discharge ({self.min_days_post_discharge}) must be "
                f"strictly less than max_days_post_discharge ({self.max_days_post_discharge})."
            )
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)
        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Transform X to two columns: in_window, window_position_score.

        Parameters
        ----------
        X : array-like or DataFrame of shape (n_samples, n_features)
            Data with days_since_discharge column or first column as days.

        Returns
        -------
        out : ndarray of shape (n_samples, 2)
            Columns: in_window (0/1), window_position_score [0,1].
        """
        check_is_fitted(self)
        # Validate the schema BEFORE reading columns so shape/feature-name
        # mismatches surface as sklearn validation errors rather than
        # KeyError/IndexError during extraction below.
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

        if isinstance(X, pd.DataFrame) and self.days_since_discharge_col in X.columns:
            days_raw = X[self.days_since_discharge_col].to_numpy(dtype=float)
        elif isinstance(X, pd.DataFrame):
            # Fallback: first column is assumed to hold the day counts.
            days_raw = X.iloc[:, 0].to_numpy(dtype=float)
        else:
            arr = np.asarray(X, dtype=float)
            days_raw = arr if arr.ndim == 1 else arr[:, 0]

        min_d = float(self.min_days_post_discharge)
        max_d = float(self.max_days_post_discharge)
        midpoint = (min_d + max_d) / 2.0
        # Strictly positive: fit() enforces min < max.
        half_range = (max_d - min_d) / 2.0

        # Vectorized form of the per-row check. NaN compares False on both
        # bounds, so missing days yield in_window == 0 and score == 0.
        in_window = ((days_raw >= min_d) & (days_raw <= max_d)).astype(np.float64)
        raw_score = 1.0 - np.abs(days_raw - midpoint) / half_range
        window_score = np.where(in_window == 1.0, raw_score, 0.0)

        return np.column_stack([in_window, window_score])

    def get_feature_names_out(self, input_features=None):
        """Get output feature names."""
        check_is_fitted(self)
        return np.array(
            ["in_solicitation_window", "window_position_score"],
            dtype=object,
        )

    def __sklearn_tags__(self):
        # Input may contain NaN (unknown discharge dates) and string columns.
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        tags.input_tags.string = True
        return tags

fit(X, y=None)

Fit the transformer (no-op, validates parameters).

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Training data.

required
y Ignored

Not used, present for API consistency.

None

Returns:

Name Type Description
self DischargeToSolicitationWindowTransformer
Source code in philanthropy/preprocessing/_discharge_window.py
def fit(self, X, y=None) -> "DischargeToSolicitationWindowTransformer":
    """Validate the window bounds and register the input schema (no-op fit).

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Training data.
    y : Ignored
        Not used, present for API consistency.

    Returns
    -------
    self : DischargeToSolicitationWindowTransformer
    """
    lo = self.min_days_post_discharge
    hi = self.max_days_post_discharge
    # The window must be non-degenerate: strict ordering of its bounds.
    if not lo < hi:
        raise ValueError(
            f"min_days_post_discharge ({lo}) must be "
            f"strictly less than max_days_post_discharge ({hi})."
        )
    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)
    return self

transform(X, y=None)

Transform X to two columns: in_window, window_position_score.

Parameters:

Name Type Description Default
X array-like or DataFrame of shape (n_samples, n_features)

Data with days_since_discharge column or first column as days.

required

Returns:

Name Type Description
out ndarray of shape (n_samples, 2)

Columns: in_window (0/1), window_position_score [0,1].

Source code in philanthropy/preprocessing/_discharge_window.py
def transform(self, X, y=None) -> np.ndarray:
    """Compute the two window features: in_window flag and position score.

    Parameters
    ----------
    X : array-like or DataFrame of shape (n_samples, n_features)
        Data with days_since_discharge column or first column as days.

    Returns
    -------
    out : ndarray of shape (n_samples, 2)
        Columns: in_window (0/1), window_position_score [0,1].
    """
    check_is_fitted(self)

    # Resolve the day counts: named column, else first DataFrame column,
    # else first column (or the vector itself) of an array input.
    if isinstance(X, pd.DataFrame):
        if self.days_since_discharge_col in X.columns:
            days = X[self.days_since_discharge_col].to_numpy(dtype=float)
        else:
            days = X.iloc[:, 0].to_numpy(dtype=float)
    else:
        raw = np.asarray(X, dtype=float)
        days = raw if raw.ndim == 1 else raw[:, 0]

    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

    lo = float(self.min_days_post_discharge)
    hi = float(self.max_days_post_discharge)
    centre = (lo + hi) / 2.0
    spread = (hi - lo) / 2.0

    count = len(days)
    flags = np.zeros(count, dtype=np.float64)
    scores = np.zeros(count, dtype=np.float64)

    for idx, day in enumerate(days):
        # NaN (missing) and out-of-window rows keep the 0.0 defaults.
        if np.isnan(day) or not (lo <= day <= hi):
            continue
        flags[idx] = 1.0
        scores[idx] = 1.0 - abs(day - centre) / spread

    return np.column_stack([flags, scores])

get_feature_names_out(input_features=None)

Get output feature names.

Source code in philanthropy/preprocessing/_discharge_window.py
def get_feature_names_out(self, input_features=None):
    """Get output feature names."""
    check_is_fitted(self)
    return np.array(
        ["in_solicitation_window", "window_position_score"],
        dtype=object,
    )

WealthPercentileTransformer

Bases: TransformerMixin, BaseEstimator

Computes wealth percentile ranks.

Source code in philanthropy/preprocessing/_wealth_percentile.py
class WealthPercentileTransformer(TransformerMixin, BaseEstimator):
    """Compute wealth percentile ranks for wealth-screening columns.

    For each resolved wealth column, :meth:`fit` memorises the sorted
    non-NaN training values; :meth:`transform` appends a
    ``<col><output_suffix>`` column holding each value's percentile rank
    in (0, 100] against that training distribution, then returns all
    numeric columns as a float64 ndarray.

    Parameters
    ----------
    wealth_cols : list of str or None, default=None
        Explicit wealth columns to rank. If None, any feature whose name
        contains one of "net_worth", "real_estate", "stock", "capacity"
        is used.
    output_suffix : str, default="_pct_rank"
        Suffix appended to each input column name for its rank column.
    """

    def __init__(
        self,
        wealth_cols: list[str] | None = None,
        output_suffix: str = "_pct_rank"
    ):
        # scikit-learn convention: __init__ only stores parameters.
        self.wealth_cols = wealth_cols
        self.output_suffix = output_suffix

    def _resolve_cols_from_names(self, names) -> list[str]:
        """Resolve wealth columns against an iterable of feature names."""
        if self.wealth_cols is not None:
            return [c for c in self.wealth_cols if c in names]
        targets = ("net_worth", "real_estate", "stock", "capacity")
        return [c for c in names if any(t in str(c) for t in targets)]

    def _resolve_cols(self, X: pd.DataFrame) -> list[str]:
        """Resolve wealth columns against a DataFrame (kept for compatibility)."""
        return self._resolve_cols_from_names(list(X.columns))

    def fit(self, X, y=None):
        """Memorise the sorted training distribution of each wealth column.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data; non-numeric wealth values are coerced to NaN.
        y : Ignored
            Not used, present for API consistency.

        Returns
        -------
        self : WealthPercentileTransformer
        """
        # validate_data returns an ndarray; when X is a DataFrame, sklearn
        # sets feature_names_in_ from its columns automatically, so the
        # old `hasattr(X, "columns")` branch here was unreachable.
        X = validate_data(self, X, ensure_all_finite="allow-nan", reset=True)

        if not hasattr(self, "feature_names_in_"):
            # ndarray input: synthesise positional names x0..x{n-1}.
            self.feature_names_in_ = np.array(
                [f"x{i}" for i in range(X.shape[1])], dtype=object
            )

        names = list(self.feature_names_in_)
        self.imputed_cols_ = self._resolve_cols_from_names(names)

        self.percentile_lookup_ = {}
        for col in self.imputed_cols_:
            col_idx = names.index(col)
            s = pd.to_numeric(pd.Series(X[:, col_idx]), errors="coerce")
            # Sorted non-NaN training values form the rank reference.
            self.percentile_lookup_[col] = np.sort(s.dropna().to_numpy())

        return self

    def transform(self, X):
        """Append percentile-rank columns and return numeric data as float64.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Data with the same schema as at fit time.

        Returns
        -------
        X_out : np.ndarray of float64
            Original numeric columns followed by one rank column per
            resolved wealth column; NaN inputs stay NaN in the ranks.
        """
        check_is_fitted(self, "percentile_lookup_")
        X = validate_data(self, X, ensure_all_finite="allow-nan", reset=False)
        X_out = pd.DataFrame(X, columns=self.feature_names_in_)

        for col in self.imputed_cols_:
            if col not in X_out.columns:
                continue
            ref = self.percentile_lookup_[col]
            out_col = f"{col}{self.output_suffix}"

            if len(ref) == 0:
                # No training values observed: rank undefined for all rows.
                X_out[out_col] = np.nan
                continue

            vals = pd.to_numeric(X_out[col], errors="coerce").to_numpy(dtype=float)
            # side="right" counts training values <= v, giving a rank in
            # (0, 100]; NaN inputs are restored to NaN afterwards.
            ranks = np.searchsorted(ref, vals, side="right") / float(len(ref)) * 100.0
            X_out[out_col] = np.where(np.isnan(vals), np.nan, ranks)

        # Contract: transform() MUST return np.ndarray (float64).
        return X_out.select_dtypes(include=[np.number]).to_numpy(dtype=np.float64)

    def get_feature_names_out(self, input_features=None):
        """Return input feature names followed by the generated rank columns."""
        check_is_fitted(self)
        out = list(self.feature_names_in_)
        # imputed_cols_ is built from feature_names_in_, so no guard needed.
        out.extend(f"{col}{self.output_suffix}" for col in self.imputed_cols_)
        return np.array(out, dtype=object)

    def _more_tags(self):
        # Legacy tag dict kept alongside __sklearn_tags__ for older sklearn.
        return {"X_types": ["2darray", "dataframe"]}

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        return tags

EncounterRecencyTransformer

Bases: TransformerMixin, BaseEstimator

Transform HIPAA-safe encounter-date columns into predictive recency features.

Given one or more date-only columns (no PHI — dates only), this transformer produces three downstream-model-ready features per date column:

days_since_last_encounter Integer days between reference_date and the encounter date. NaN for missing/unparseable dates. Always non-negative when reference_date >= encounter_date; negative values indicate future dates (rare in production) and are left as-is to allow models to detect data-quality anomalies.

encounter_in_last_90d Float64 0.0 / 1.0 flag — 1.0 if days_since_last_encounter <= 90. Missing dates → 0.0.

fiscal_year_of_encounter Integer fiscal year containing the encounter, labelled by the calendar year in which that fiscal year ends (e.g., under a July-start FY convention a June-30 encounter belongs to the current fiscal year, while a July-1 encounter opens the next FY). Missing → np.nan (returned as float64).

Parameters:

Name Type Description Default
date_col str or list of str

Column name(s) in X containing ISO-8601 encounter/discharge dates. If a list is provided, one set of three output features is produced per column (columns are prefixed by <col>__).

"last_encounter_date"
fiscal_year_start int

Month (1–12) on which the organisation's fiscal year begins. 7 = July fiscal-year start (common in US academic medical centres and universities).

7
reference_date str, datetime-like, or None

The anchor date used to compute days_since_last_encounter. If None, it is determined at :meth:fit time as the maximum observed date in the training data (i.e., the most recent clinical encounter in the training fold). Setting an explicit reference date is recommended for production scoring runs to ensure consistency between training and inference time.

None
timezone str or None

Optional timezone name (e.g., "America/Chicago"). When provided, timezone-naive datetimes in X are localised to this timezone before difference computation, preventing offset errors for hospitals that cross daylight-saving boundaries. If None, all dates are kept timezone-naive (recommended for HIPAA-safe de-identified datasets where the exact timezone is unknown).

None

Attributes:

Name Type Description
reference_date_ Timestamp

The reference date frozen at :meth:fit time.

n_features_in_ int

Number of columns in X at :meth:fit time (set by :func:~sklearn.utils.validation.validate_data).

feature_names_in_ ndarray of str

Column names of X at :meth:fit time.

Raises:

Type Description
ValueError

If fiscal_year_start is not an integer in [1, 12].

TypeError

If the resolved date_col columns cannot be coerced to datetime64.

Examples:

>>> import pandas as pd
>>> from philanthropy.preprocessing import EncounterRecencyTransformer
>>> X = pd.DataFrame({
...     "last_encounter_date": ["2023-06-01", "2022-12-15", None],
... })
>>> t = EncounterRecencyTransformer(fiscal_year_start=7, reference_date="2023-09-01")
>>> t.set_output(transform="pandas")
EncounterRecencyTransformer(...)
>>> out = t.fit_transform(X)
>>> out.shape
(3, 3)
>>> int(out.iloc[0, 0])  # days since 2023-06-01 from 2023-09-01 = 92
92
>>> bool((out.iloc[:, 1] >= 0).all())
True
Notes

HIPAA note: This transformer accepts only date columns. Ensure that no PHI fields (MRN, patient name, diagnosis code) are included in X.

Fiscal year convention: With fiscal_year_start=7, the fiscal year is identified by the calendar year in which it ends. A date of 2023-07-01 belongs to FY 2024; a date of 2023-06-30 belongs to FY 2023. This matches the convention used by most US research universities and many hospital foundations.

Source code in philanthropy/preprocessing/_encounter_recency.py
class EncounterRecencyTransformer(TransformerMixin, BaseEstimator):
    """Transform HIPAA-safe encounter-date columns into predictive recency features.

    Given one or more date-only columns (no PHI — dates only), this
    transformer produces three downstream-model-ready features per date
    column:

    ``days_since_last_encounter``
        Integer days between ``reference_date`` and the encounter date.
        ``NaN`` for missing/unparseable dates.  Always non-negative when
        ``reference_date >= encounter_date``; negative values indicate
        future dates (rare in production) and are left as-is to allow models
        to detect data-quality anomalies.

    ``encounter_in_last_90d``
        Float64 0.0 / 1.0 flag — 1.0 if ``days_since_last_encounter <= 90``.
        Missing dates → 0.0.

    ``fiscal_year_of_encounter``
        Integer fiscal year containing the encounter, labelled by the
        calendar year in which that fiscal year *ends* (e.g., a July-start
        FY convention assigns a June-30 encounter to the current year, while
        a July-1 encounter starts the *next* FY).  Missing → ``np.nan``
        (returned as ``float64``).

    Parameters
    ----------
    date_col : str or list of str, default="last_encounter_date"
        Column name(s) in ``X`` containing ISO-8601 encounter/discharge
        dates.  If a list is provided, one set of three output features is
        produced per column (columns are prefixed by ``<col>__``).
    fiscal_year_start : int, default=7
        Month (1–12) on which the organisation's fiscal year begins.
        ``7`` = July fiscal-year start (common in US academic medical
        centres and universities).
    reference_date : str, datetime-like, or None, default=None
        The anchor date used to compute ``days_since_last_encounter``.
        If ``None``, it is determined at :meth:`fit` time as the **maximum
        observed date** in the training data (i.e., the most recent
        clinical encounter in the training fold).  Setting an explicit
        reference date is recommended for production scoring runs to ensure
        consistency between training and inference time.
    timezone : str or None, default=None
        Optional timezone name (e.g., ``"America/Chicago"``).  When
        provided, timezone-naive datetimes in ``X`` are localised to this
        timezone before difference computation, preventing offset errors for
        hospitals that cross daylight-saving boundaries.  If ``None``,
        all dates are kept timezone-naive (recommended for HIPAA-safe
        de-identified datasets where the exact timezone is unknown).

    Attributes
    ----------
    reference_date_ : pd.Timestamp
        The reference date frozen at :meth:`fit` time.
    n_features_in_ : int
        Number of columns in ``X`` at :meth:`fit` time (set by
        :func:`~sklearn.utils.validation.validate_data`).
    feature_names_in_ : ndarray of str
        Column names of ``X`` at :meth:`fit` time.

    Raises
    ------
    ValueError
        If ``fiscal_year_start`` is not an integer in [1, 12].
    TypeError
        If the resolved ``date_col`` columns cannot be coerced to
        ``datetime64``.

    Examples
    --------
    >>> import pandas as pd
    >>> from philanthropy.preprocessing import EncounterRecencyTransformer
    >>> X = pd.DataFrame({
    ...     "last_encounter_date": ["2023-06-01", "2022-12-15", None],
    ... })
    >>> t = EncounterRecencyTransformer(fiscal_year_start=7, reference_date="2023-09-01")
    >>> t.set_output(transform="pandas")  # doctest: +ELLIPSIS
    EncounterRecencyTransformer(...)
    >>> out = t.fit_transform(X)
    >>> out.shape
    (3, 3)
    >>> int(out.iloc[0, 0])  # days since 2023-06-01 from 2023-09-01 = 92
    92
    >>> bool((out.iloc[:, 1] >= 0).all())
    True

    Notes
    -----
    **HIPAA note:** This transformer accepts only date columns.  Ensure that
    no PHI fields (MRN, patient name, diagnosis code) are included in ``X``.

    **Fiscal year convention:** With ``fiscal_year_start=7``, the fiscal year
    is identified by the calendar year in which it *ends*.  A date of
    2023-07-01 belongs to FY **2024**; a date of 2023-06-30 belongs to FY
    **2023**.  This matches the convention used by most US research universities
    and many hospital foundations.  With ``fiscal_year_start=1`` the fiscal
    year coincides with the calendar year, so the FY label equals the
    calendar year of the encounter.
    """

    def __init__(
        self,
        date_col: str | list[str] = "last_encounter_date",
        fiscal_year_start: int = 7,
        reference_date=None,
        timezone: Optional[str] = None,
    ) -> None:
        # scikit-learn rule: __init__ ONLY assigns; no validation, no side-effects.
        self.date_col = date_col
        self.fiscal_year_start = fiscal_year_start
        self.reference_date = reference_date
        self.timezone = timezone

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _validate_fiscal_year_start(self) -> None:
        """Raise ValueError if fiscal_year_start is out of range."""
        if not isinstance(self.fiscal_year_start, (int, np.integer)) or not (
            1 <= int(self.fiscal_year_start) <= 12
        ):
            raise ValueError(
                f"`fiscal_year_start` must be an integer in [1, 12], "
                f"got {self.fiscal_year_start!r}."
            )

    def _resolve_date_cols(self) -> list[str]:
        """Return the date column(s) as a list of strings."""
        if isinstance(self.date_col, str):
            return [self.date_col]
        return list(self.date_col)

    def _parse_dates(self, series: pd.Series) -> pd.Series:
        """Parse a date series to datetime64[ns], optionally localising timezone."""
        parsed = pd.to_datetime(series, errors="coerce", utc=(self.timezone is not None))
        if self.timezone is not None:
            # Convert to the target timezone; if already tz-aware, convert.
            try:
                parsed = parsed.dt.tz_convert(self.timezone)
            except Exception:
                parsed = parsed.dt.tz_localize(self.timezone)
        return parsed

    def _fiscal_year(self, dt: pd.Timestamp) -> int:
        """Return the fiscal year (labelled by its ending calendar year) of ``dt``."""
        fys = int(self.fiscal_year_start)
        if fys == 1:
            # Calendar-year fiscal year: starts Jan 1 and ends Dec 31 of
            # the same year, so the FY label equals the calendar year.
            # (The month >= fys test below would wrongly label every
            # January-start-FY date as year + 1.)
            return dt.year
        if dt.month >= fys:
            # On/after the FY start month → the FY ends next calendar year.
            return dt.year + 1
        return dt.year

    def _compute_recency_features(
        self, dates: pd.Series, prefix: str
    ) -> pd.DataFrame:
        """Compute (days_since, in_last_90d, fiscal_year) for a date series."""
        ref = self.reference_date_

        # days_since_last_encounter
        # Timezone strip for subtraction when tz-naive reference vs tz-aware series
        if dates.dt.tz is not None and ref.tzinfo is None:
            ref_ts = pd.Timestamp(ref).tz_localize(self.timezone or "UTC")
        elif dates.dt.tz is None and ref.tzinfo is not None:
            dates = dates.dt.tz_localize("UTC")
            ref_ts = ref
        else:
            ref_ts = ref

        delta_days = (ref_ts - dates).dt.days.astype("float64")

        # encounter_in_last_90d: 1.0 if <=90 days ago and non-NaN
        in_90d = np.where(dates.isna(), 0.0, (delta_days <= 90.0).astype(np.float64))

        # fiscal_year_of_encounter: float64 (NaN for missing)
        fy = dates.apply(
            lambda d: np.nan if pd.isna(d) else float(self._fiscal_year(d))
        ).astype("float64")

        cols = {}
        p = f"{prefix}__" if prefix else ""
        cols[f"{p}days_since_last_encounter"] = delta_days.values
        cols[f"{p}encounter_in_last_90d"] = in_90d
        cols[f"{p}fiscal_year_of_encounter"] = fy.values

        return pd.DataFrame(cols)

    # ------------------------------------------------------------------
    # fit / transform
    # ------------------------------------------------------------------

    def fit(self, X, y=None) -> "EncounterRecencyTransformer":
        """Validate parameters and freeze the reference date from training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data.  Must contain the column(s) specified in ``date_col``
            when passed as a pd.DataFrame.
        y : ignored

        Returns
        -------
        self : EncounterRecencyTransformer
        """
        self._validate_fiscal_year_start()

        # Register the input schema via validate_data; allow NaN and string types.
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)

        # Freeze the reference date:
        if self.reference_date is not None:
            self.reference_date_ = pd.Timestamp(self.reference_date)
        else:
            # Infer from training data: max observed date across all date columns.
            if isinstance(X, pd.DataFrame):
                cols = self._resolve_date_cols()
                max_dates = []
                for col in cols:
                    if col in X.columns:
                        parsed = self._parse_dates(X[col])
                        mx = parsed.max()
                        if not pd.isna(mx):
                            max_dates.append(mx)
                if max_dates:
                    self.reference_date_ = max(max_dates)
                else:
                    warnings.warn(
                        "EncounterRecencyTransformer: no parseable dates found in "
                        "training data; defaulting reference_date_ to today.",
                        UserWarning,
                    )
                    self.reference_date_ = pd.Timestamp.today().normalize()
            else:
                # Cannot infer from ndarray without column names; default to today.
                warnings.warn(
                    "EncounterRecencyTransformer: X is not a DataFrame — "
                    "defaulting reference_date_ to today. Provide reference_date "
                    "explicitly for reproducibility.",
                    UserWarning,
                )
                self.reference_date_ = pd.Timestamp.today().normalize()

        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Compute encounter recency features.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input data containing the date column(s).

        Returns
        -------
        X_out : np.ndarray of shape (n_samples, 3 * n_date_cols), dtype float64
            Feature columns, in order, for each ``date_col``:

            * ``[<col>__]days_since_last_encounter``
            * ``[<col>__]encounter_in_last_90d``
            * ``[<col>__]fiscal_year_of_encounter``

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If :meth:`fit` has not been called yet.
        """
        check_is_fitted(self, ["reference_date_"])
        validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

        # Build a working DataFrame from X
        if isinstance(X, pd.DataFrame):
            df = X
        elif hasattr(self, "feature_names_in_"):
            df = pd.DataFrame(
                np.asarray(X, dtype=object), columns=self.feature_names_in_
            )
        else:
            # Fallback: cannot resolve column names — produce NaN output.
            n = np.asarray(X).shape[0]
            cols = self._resolve_date_cols()
            n_out = len(cols) * 3
            return np.full((n, n_out), np.nan, dtype=np.float64)

        cols = self._resolve_date_cols()
        parts: list[pd.DataFrame] = []
        prefix_needed = len(cols) > 1

        for col in cols:
            prefix = col if prefix_needed else ""
            if col in df.columns:
                parsed = self._parse_dates(df[col])
            else:
                warnings.warn(
                    f"EncounterRecencyTransformer: date column {col!r} not found "
                    f"in X; filling recency features with NaN.",
                    UserWarning,
                )
                # All-missing series of the right length; parses to NaT so the
                # recency computation emits NaN days/FY and 0.0 flags.
                parsed = self._parse_dates(pd.Series([None] * len(df)))

            parts.append(self._compute_recency_features(parsed, prefix=prefix))

        out_df = pd.concat(parts, axis=1) if parts else pd.DataFrame()
        return out_df.to_numpy(dtype=np.float64)

    def get_feature_names_out(self, input_features=None) -> np.ndarray:
        """Return output feature names.

        Returns
        -------
        feature_names : ndarray of str
        """
        check_is_fitted(self, ["reference_date_"])
        cols = self._resolve_date_cols()
        prefix_needed = len(cols) > 1
        names: list[str] = []
        for col in cols:
            p = f"{col}__" if prefix_needed else ""
            names += [
                f"{p}days_since_last_encounter",
                f"{p}encounter_in_last_90d",
                f"{p}fiscal_year_of_encounter",
            ]
        return np.array(names, dtype=object)

    def __sklearn_tags__(self):
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        tags.input_tags.string = True  # Date columns are string-like on entry
        return tags

fit(X, y=None)

Validate parameters and freeze the reference date from training data.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Input data. Must contain the column(s) specified in date_col when passed as a pd.DataFrame.

required
y ignored
None

Returns:

Name Type Description
self EncounterRecencyTransformer
Source code in philanthropy/preprocessing/_encounter_recency.py
def fit(self, X, y=None) -> "EncounterRecencyTransformer":
    """Validate parameters and freeze the reference date from training data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data.  Must contain the column(s) specified in ``date_col``
        when passed as a pd.DataFrame.
    y : ignored

    Returns
    -------
    self : EncounterRecencyTransformer
    """
    self._validate_fiscal_year_start()

    # Register the input schema via validate_data; allow NaN and string types.
    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=True)

    # Freeze the reference date:
    if self.reference_date is not None:
        self.reference_date_ = pd.Timestamp(self.reference_date)
    else:
        # Infer from training data: max observed date across all date columns.
        if isinstance(X, pd.DataFrame):
            cols = self._resolve_date_cols()
            max_dates = []
            for col in cols:
                if col in X.columns:
                    parsed = self._parse_dates(X[col])
                    mx = parsed.max()
                    if not pd.isna(mx):
                        max_dates.append(mx)
            if max_dates:
                self.reference_date_ = max(max_dates)
            else:
                warnings.warn(
                    "EncounterRecencyTransformer: no parseable dates found in "
                    "training data; defaulting reference_date_ to today.",
                    UserWarning,
                )
                self.reference_date_ = pd.Timestamp.today().normalize()
        else:
            # Cannot infer from ndarray without column names; default to today.
            warnings.warn(
                "EncounterRecencyTransformer: X is not a DataFrame — "
                "defaulting reference_date_ to today. Provide reference_date "
                "explicitly for reproducibility.",
                UserWarning,
            )
            self.reference_date_ = pd.Timestamp.today().normalize()

    return self

transform(X, y=None)

Compute encounter recency features.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)

Input data containing the date column(s).

required

Returns:

Name Type Description
X_out np.ndarray of shape (n_samples, 3 * n_date_cols), dtype float64

Feature columns, in order, for each date_col:

  • [<col>__]days_since_last_encounter
  • [<col>__]encounter_in_last_90d
  • [<col>__]fiscal_year_of_encounter

Raises:

Type Description
NotFittedError

If :meth:fit has not been called yet.

Source code in philanthropy/preprocessing/_encounter_recency.py
def transform(self, X, y=None) -> np.ndarray:
    """Compute encounter recency features.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        Input data containing the date column(s).

    Returns
    -------
    X_out : np.ndarray of shape (n_samples, 3 * n_date_cols), dtype float64
        Feature columns, in order, for each ``date_col``:

        * ``[<col>__]days_since_last_encounter``
        * ``[<col>__]encounter_in_last_90d``
        * ``[<col>__]fiscal_year_of_encounter``

    Raises
    ------
    sklearn.exceptions.NotFittedError
        If :meth:`fit` has not been called yet.
    """
    check_is_fitted(self, ["reference_date_"])
    validate_data(self, X, dtype=None, ensure_all_finite="allow-nan", reset=False)

    # Build a working DataFrame from X
    if isinstance(X, pd.DataFrame):
        df = X
    elif hasattr(self, "feature_names_in_"):
        df = pd.DataFrame(
            np.asarray(X, dtype=object), columns=self.feature_names_in_
        )
    else:
        # Fallback: cannot resolve column names — produce NaN output.
        n = np.asarray(X).shape[0]
        cols = self._resolve_date_cols()
        n_out = len(cols) * 3
        return np.full((n, n_out), np.nan, dtype=np.float64)

    cols = self._resolve_date_cols()
    parts: list[pd.DataFrame] = []
    prefix_needed = len(cols) > 1

    for col in cols:
        prefix = col if prefix_needed else ""
        if col in df.columns:
            parsed = self._parse_dates(df[col])
        else:
            warnings.warn(
                f"EncounterRecencyTransformer: date column {col!r} not found "
                f"in X; filling recency features with NaN.",
                UserWarning,
            )
            # All-missing series of matching length yields NaN features.
            # NOTE(review): this series carries a default RangeIndex, not
            # df.index — assumed fine because _compute_recency_features
            # appears to work positionally; confirm if callers pass a
            # DataFrame with a non-default index.
            parsed = self._parse_dates(pd.Series([None] * len(df)))

        parts.append(self._compute_recency_features(parsed, prefix=prefix))

    out_df = pd.concat(parts, axis=1) if parts else pd.DataFrame()
    return out_df.to_numpy(dtype=np.float64)

get_feature_names_out(input_features=None)

Return output feature names.

Returns:

Name Type Description
feature_names ndarray of str
Source code in philanthropy/preprocessing/_encounter_recency.py
def get_feature_names_out(self, input_features=None) -> np.ndarray:
    """Return the names of the generated output features.

    Returns
    -------
    feature_names : ndarray of str
    """
    check_is_fitted(self, ["reference_date_"])
    date_cols = self._resolve_date_cols()
    use_prefix = len(date_cols) > 1
    suffixes = (
        "days_since_last_encounter",
        "encounter_in_last_90d",
        "fiscal_year_of_encounter",
    )
    # Column-major order: all three features for a column, then the next column.
    names = [
        f"{col}__{suffix}" if use_prefix else suffix
        for col in date_cols
        for suffix in suffixes
    ]
    return np.array(names, dtype=object)

WealthScreeningImputerKNN

Bases: TransformerMixin, BaseEstimator

Leakage-safe KNN imputation for wealth-screening vendor columns.

Extends the median/mean/zero strategy of :class:~philanthropy.preprocessing.WealthScreeningImputer with a "knn" strategy using :class:sklearn.impute.KNNImputer. KNN imputation is recommended when wealth columns cluster meaningfully (e.g., by zip-code based real-estate quartile), which is common in curated hospital prospect pools where WealthEngine / DonorSearch data has geographic structure.

This estimator delegates to sklearn.impute.KNNImputer internally and inherits its Pipeline composability and clone-safety.

Parameters:

Name Type Description Default
wealth_cols list of str or None

Subset of columns to impute. If None, all columns whose names contain substrings from a canonical set (net_worth, real_estate, stock, capacity, charitable) are imputed.

None
strategy ('median', 'mean', 'zero', 'knn')

Imputation strategy. "knn" uses :class:sklearn.impute.KNNImputer with n_neighbors. The other strategies use columnwise statistics identical to :class:~philanthropy.preprocessing.WealthScreeningImputer.

"knn"
n_neighbors int

Number of neighbours used when strategy="knn". Ignored for other strategies.

5
add_indicator bool

Append a binary <col>__was_missing column for each imputed wealth column. Strongly recommended — absence of vendor records itself carries predictive signal.

True
group_col_idx int or None

Column index of a group variable (e.g., zip-code encoded as int) to stratify KNN imputation. When provided (and strategy="knn"), imputation is performed independently per group, improving local accuracy.

None

Attributes:

Name Type Description
imputed_cols_ list of str

Wealth columns that were actually present in X at fit time.

fill_values_ dict of {str: float}

Fill statistics (only populated for non-KNN strategies).

knn_imputer_ KNNImputer or None

The fitted :class:~sklearn.impute.KNNImputer instance (only populated for strategy="knn").

n_features_in_ int

Number of columns in X at fit time.

feature_names_in_ ndarray of str

Column names of X at fit time.

Examples:

>>> import numpy as np
>>> from philanthropy.preprocessing._share_of_wallet import WealthScreeningImputerKNN
>>> rng = np.random.default_rng(42)
>>> X = rng.uniform(0, 1e6, (50, 3))
>>> X[rng.random((50, 3)) < 0.3] = np.nan
>>> imp = WealthScreeningImputerKNN(strategy="knn", n_neighbors=3, add_indicator=False)
>>> imp.fit(X)
WealthScreeningImputerKNN(...)
>>> out = imp.transform(X)
>>> bool(np.isnan(out).any())
False
Source code in philanthropy/preprocessing/_share_of_wallet.py
class WealthScreeningImputerKNN(TransformerMixin, BaseEstimator):
    """Leakage-safe KNN imputation for wealth-screening vendor columns.

    Extends the median/mean/zero strategy of
    :class:`~philanthropy.preprocessing.WealthScreeningImputer` with a
    ``"knn"`` strategy using :class:`sklearn.impute.KNNImputer`.  KNN
    imputation is recommended when wealth columns cluster meaningfully
    (e.g., by zip-code based real-estate quartile), which is common in
    curated hospital prospect pools where WealthEngine / DonorSearch
    data has geographic structure.

    This estimator **delegates** to ``sklearn.impute.KNNImputer`` internally
    and inherits its Pipeline composability and clone-safety.

    Parameters
    ----------
    wealth_cols : list of str or None, default=None
        Subset of columns to impute.  If ``None``, all columns whose
        names contain substrings from a canonical set (``net_worth``,
        ``real_estate``, ``stock``, ``capacity``, ``charitable``) are
        imputed.
    strategy : {"median", "mean", "zero", "knn"}, default="knn"
        Imputation strategy.  ``"knn"`` uses
        :class:`sklearn.impute.KNNImputer` with ``n_neighbors``.
        The other strategies use columnwise statistics identical to
        :class:`~philanthropy.preprocessing.WealthScreeningImputer`.
    n_neighbors : int, default=5
        Number of neighbours used when ``strategy="knn"``.  Ignored for
        other strategies.
    add_indicator : bool, default=True
        Append a binary ``<col>__was_missing`` column for each imputed
        wealth column.  Strongly recommended — absence of vendor records
        itself carries predictive signal.
    group_col_idx : int or None, default=None
        Column index of a group variable (e.g., zip-code encoded as int)
        to stratify KNN imputation.  When provided (and
        ``strategy="knn"``), imputation is performed independently per
        group, improving local accuracy.

    Attributes
    ----------
    imputed_cols_ : list of str
        Wealth columns that were actually present in ``X`` at fit time.
    fill_values_ : dict of {str: float}
        Fill statistics (only populated for non-KNN strategies).
    knn_imputer_ : KNNImputer or None
        The fitted :class:`~sklearn.impute.KNNImputer` instance
        (only populated for ``strategy="knn"``).
    n_features_in_ : int
        Number of columns in ``X`` at fit time.
    feature_names_in_ : ndarray of str
        Column names of ``X`` at fit time.

    Examples
    --------
    >>> import numpy as np
    >>> from philanthropy.preprocessing._share_of_wallet import WealthScreeningImputerKNN
    >>> rng = np.random.default_rng(42)
    >>> X = rng.uniform(0, 1e6, (50, 3))
    >>> X[rng.random((50, 3)) < 0.3] = np.nan
    >>> imp = WealthScreeningImputerKNN(strategy="knn", n_neighbors=3, add_indicator=False)
    >>> imp.fit(X)
    WealthScreeningImputerKNN(...)
    >>> out = imp.transform(X)
    >>> bool(np.isnan(out).any())
    False
    """

    # Closed set of accepted values for the ``strategy`` constructor parameter.
    _VALID_STRATEGIES = frozenset({"median", "mean", "zero", "knn"})
    # Name fragments used to auto-detect wealth columns when wealth_cols is None
    # (matched case-insensitively as substrings in _resolve_cols).
    _CANONICAL_SUBSTRINGS = ("net_worth", "real_estate", "stock", "capacity", "charitable")

    def __init__(
        self,
        wealth_cols: list[str] | None = None,
        strategy: Literal["median", "mean", "zero", "knn"] = "knn",
        n_neighbors: int = 5,
        add_indicator: bool = True,
        group_col_idx: Optional[int] = None,
    ) -> None:
        # sklearn convention: store constructor params verbatim; validation
        # happens in fit() so clone()/set_params() stay cheap and safe.
        self.wealth_cols = wealth_cols
        self.strategy = strategy
        self.n_neighbors = n_neighbors
        self.add_indicator = add_indicator
        # NOTE(review): group_col_idx is stored but never referenced by fit()
        # or transform() in this module — per-group KNN imputation described
        # in the docstring appears unimplemented; confirm intent.
        self.group_col_idx = group_col_idx

    def _resolve_cols(self, input_cols: list[str]) -> list[str]:
        """Resolve which of ``input_cols`` to impute.

        Explicit ``wealth_cols`` are filtered to those actually present;
        otherwise any column whose lowercased name contains one of the
        canonical substrings is selected.  Order of the source list is kept.
        """
        if self.wealth_cols is not None:
            return [c for c in self.wealth_cols if c in input_cols]
        return [c for c in input_cols
                if any(sub in c.lower() for sub in self._CANONICAL_SUBSTRINGS)]

    def fit(self, X, y=None) -> "WealthScreeningImputerKNN":
        """Learn fill statistics or fit the KNN imputer from training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : ignored

        Returns
        -------
        self : WealthScreeningImputerKNN

        Raises
        ------
        ValueError
            If ``strategy`` is not one of the valid strategies, or
            ``strategy="knn"`` with ``n_neighbors < 1``.
        """
        if self.strategy not in self._VALID_STRATEGIES:
            raise ValueError(
                f"`strategy` must be one of {sorted(self._VALID_STRATEGIES)}, "
                f"got {self.strategy!r}."
            )
        if self.strategy == "knn" and self.n_neighbors < 1:
            raise ValueError(f"`n_neighbors` must be >= 1, got {self.n_neighbors}.")

        # Capture column names before validate_data converts DF → ndarray
        if hasattr(X, "columns"):
            input_cols = list(X.columns)
        else:
            input_cols = None

        X_arr = validate_data(
            self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=True
        )

        if input_cols is None:
            # No DataFrame columns available: fall back to the names recorded
            # by validate_data, or synthesise positional names x0, x1, ...
            input_cols = (
                list(self.feature_names_in_)
                if hasattr(self, "feature_names_in_")
                else [f"x{i}" for i in range(X_arr.shape[1])]
            )

        self.imputed_cols_ = self._resolve_cols(input_cols)

        # Warn about columns requested but absent
        if self.wealth_cols is not None:
            for col in self.wealth_cols:
                if col not in input_cols:
                    warnings.warn(
                        f"WealthScreeningImputerKNN: column {col!r} not found in X.",
                        UserWarning,
                    )

        col_indices = {col: input_cols.index(col) for col in self.imputed_cols_}

        if self.strategy == "knn":
            # Fit KNNImputer on ALL columns (preserves inter-column structure)
            self.knn_imputer_: Optional[KNNImputer] = KNNImputer(
                n_neighbors=self.n_neighbors,
                weights="distance",
                keep_empty_features=True,
            )
            self.knn_imputer_.fit(X_arr)
            self.fill_values_: dict[str, float] = {}
        else:
            self.knn_imputer_ = None
            fills: dict[str, float] = {}
            for col in self.imputed_cols_:
                idx = col_indices[col]
                col_data = X_arr[:, idx]
                if self.strategy == "median":
                    val = np.nanmedian(col_data)
                elif self.strategy == "mean":
                    val = np.nanmean(col_data)
                else:  # "zero"
                    val = 0.0
                # nanmedian/nanmean of an all-NaN column yields NaN; fall
                # back to 0.0 so transform never re-introduces NaN.
                fills[col] = float(val) if not np.isnan(val) else 0.0
            self.fill_values_ = fills

        # Cached for introspection; transform() re-resolves indices from the
        # live input rather than reading this mapping.
        self._col_indices_ = col_indices
        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Apply imputation and optionally append missingness indicators.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        X_out : np.ndarray
            Imputed array (float64), with indicator columns appended if
            ``add_indicator=True``.

        Raises
        ------
        sklearn.exceptions.NotFittedError
        """
        check_is_fitted(self, ["imputed_cols_"])

        if hasattr(X, "columns"):
            input_cols = list(X.columns)
        else:
            input_cols = None

        X_arr = validate_data(
            self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=False
        )

        if input_cols is None:
            input_cols = (
                list(self.feature_names_in_)
                if hasattr(self, "feature_names_in_")
                else [f"x{i}" for i in range(X_arr.shape[1])]
            )

        # Collect missingness masks BEFORE imputation
        # (order follows imputed_cols_, matching get_feature_names_out).
        indicators: list[np.ndarray] = []
        if self.add_indicator:
            for col in self.imputed_cols_:
                if col in input_cols:
                    idx = input_cols.index(col)
                    indicators.append(np.isnan(X_arr[:, idx]).astype(np.float64).reshape(-1, 1))

        if self.strategy == "knn" and self.knn_imputer_ is not None:
            X_out = self.knn_imputer_.transform(X_arr)
        else:
            X_out = X_arr.copy()
            for col in self.imputed_cols_:
                if col not in input_cols:
                    continue
                idx = input_cols.index(col)
                mask = np.isnan(X_out[:, idx])
                X_out[mask, idx] = self.fill_values_.get(col, 0.0)

        if indicators:
            return np.hstack([X_out] + indicators)
        return X_out

    def get_feature_names_out(self, input_features=None) -> np.ndarray:
        """Return input names plus ``<col>__was_missing`` indicator names.

        Indicator names are appended (in ``imputed_cols_`` order) only when
        ``add_indicator=True`` and the column is present in the base names.
        """
        check_is_fitted(self)
        if input_features is not None:
            base = list(input_features)
        elif hasattr(self, "feature_names_in_"):
            base = list(self.feature_names_in_)
        else:
            base = [f"x{i}" for i in range(self.n_features_in_)]

        out = list(base)
        if self.add_indicator:
            for col in self.imputed_cols_:
                if col in base:
                    out.append(f"{col}__was_missing")
        return np.array(out, dtype=object)

    def __sklearn_tags__(self):
        # Missing values are expected input — that is the point of an imputer.
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        return tags

fit(X, y=None)

Learn fill statistics or fit the KNN imputer from training data.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)
required
y ignored
None

Returns:

Name Type Description
self WealthScreeningImputerKNN
Source code in philanthropy/preprocessing/_share_of_wallet.py
def fit(self, X, y=None) -> "WealthScreeningImputerKNN":
    """Learn fill statistics or fit the KNN imputer from training data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    y : ignored

    Returns
    -------
    self : WealthScreeningImputerKNN
    """
    if self.strategy not in self._VALID_STRATEGIES:
        raise ValueError(
            f"`strategy` must be one of {sorted(self._VALID_STRATEGIES)}, "
            f"got {self.strategy!r}."
        )
    if self.strategy == "knn" and self.n_neighbors < 1:
        raise ValueError(f"`n_neighbors` must be >= 1, got {self.n_neighbors}.")

    # DataFrame column names must be captured before validation coerces to ndarray.
    input_cols = list(X.columns) if hasattr(X, "columns") else None

    X_arr = validate_data(
        self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=True
    )

    if input_cols is None:
        # Fall back to names recorded by validate_data, else positional names.
        if hasattr(self, "feature_names_in_"):
            input_cols = list(self.feature_names_in_)
        else:
            input_cols = [f"x{i}" for i in range(X_arr.shape[1])]

    self.imputed_cols_ = self._resolve_cols(input_cols)

    # Surface any explicitly requested columns that are absent from X.
    if self.wealth_cols is not None:
        for col in (c for c in self.wealth_cols if c not in input_cols):
            warnings.warn(
                f"WealthScreeningImputerKNN: column {col!r} not found in X.",
                UserWarning,
            )

    col_indices = {col: input_cols.index(col) for col in self.imputed_cols_}

    if self.strategy == "knn":
        # Fit the KNN imputer on every column so inter-column structure
        # informs the neighbour search.
        self.knn_imputer_: Optional[KNNImputer] = KNNImputer(
            n_neighbors=self.n_neighbors,
            weights="distance",
            keep_empty_features=True,
        )
        self.knn_imputer_.fit(X_arr)
        self.fill_values_: dict[str, float] = {}
    else:
        self.knn_imputer_ = None
        stats: dict[str, float] = {}
        for col in self.imputed_cols_:
            column = X_arr[:, col_indices[col]]
            if self.strategy == "median":
                stat = np.nanmedian(column)
            elif self.strategy == "mean":
                stat = np.nanmean(column)
            else:  # "zero"
                stat = 0.0
            # An all-NaN column produces a NaN statistic — fall back to 0.0.
            stats[col] = 0.0 if np.isnan(stat) else float(stat)
        self.fill_values_ = stats

    self._col_indices_ = col_indices
    return self

transform(X, y=None)

Apply imputation and optionally append missingness indicators.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)
required

Returns:

Name Type Description
X_out ndarray

Imputed array (float64), with indicator columns appended if add_indicator=True.

Raises:

Type Description
NotFittedError
Source code in philanthropy/preprocessing/_share_of_wallet.py
def transform(self, X, y=None) -> np.ndarray:
    """Apply imputation and optionally append missingness indicators.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)

    Returns
    -------
    X_out : np.ndarray
        Imputed array (float64), with indicator columns appended if
        ``add_indicator=True``.

    Raises
    ------
    sklearn.exceptions.NotFittedError
    """
    check_is_fitted(self, ["imputed_cols_"])

    # Capture DataFrame column names before validation coerces to ndarray.
    input_cols = list(X.columns) if hasattr(X, "columns") else None

    X_arr = validate_data(
        self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=False
    )

    if input_cols is None:
        if hasattr(self, "feature_names_in_"):
            input_cols = list(self.feature_names_in_)
        else:
            input_cols = [f"x{i}" for i in range(X_arr.shape[1])]

    # Missingness must be recorded before any values are filled in.
    indicators: list[np.ndarray] = []
    if self.add_indicator:
        for col in self.imputed_cols_:
            if col not in input_cols:
                continue
            mask = np.isnan(X_arr[:, input_cols.index(col)])
            indicators.append(mask.astype(np.float64).reshape(-1, 1))

    if self.strategy == "knn" and self.knn_imputer_ is not None:
        X_out = self.knn_imputer_.transform(X_arr)
    else:
        X_out = X_arr.copy()
        for col in self.imputed_cols_:
            if col in input_cols:
                idx = input_cols.index(col)
                nan_rows = np.isnan(X_out[:, idx])
                X_out[nan_rows, idx] = self.fill_values_.get(col, 0.0)

    return np.hstack([X_out] + indicators) if indicators else X_out

ShareOfWalletScorer

Bases: TransformerMixin, BaseEstimator

Compute a normalised Share-of-Wallet score and capacity-tier label.

This transformer is designed as the final stage of a major-gift capacity scoring pipeline. It consumes a numeric feature matrix and produces two outputs per row:

sow_score (float64, [0, 1]) Normalised Share-of-Wallet:

    SoW = estimated_capacity / (total_modelled_wealth + epsilon)

where ``total_modelled_wealth`` is the row-wise sum of all columns
specified in ``wealth_col_indices`` (or all columns if not
specified), and ``estimated_capacity`` is the column at
``capacity_col_idx``.

capacity_tier (float64, categorical encoding) A numeric encoding of the human-readable tier label, usable by downstream sklearn estimators (e.g., a classifier trained to predict tier upgrades). The mapping is:

============ ============ =========================================
SoW score    Tier label   Recommended action
============ ============ =========================================
≥ 0.75       Principal    Schedule personal visit with campaign chair.
0.40 – 0.75  Major        Assign major gift officer.
0.00 – 0.40  Leadership   Include in leadership annual giving.
============ ============ =========================================

Parameters:

Name Type Description Default
capacity_col_idx int

Column index (0-based) in X containing the estimated philanthropic capacity (in dollars or any consistent currency unit).

0
wealth_col_indices list of int or None

Column indices to sum as "total modelled wealth". If None, all columns are summed (including capacity_col_idx).

None
epsilon float

Small constant added to the denominator to prevent division by zero when all wealth columns are zero.

1.0
capacity_floor float

Minimum value to enforce on estimated_capacity before scoring (prevents negative capacity from distorting the SoW score).

0.0

Attributes:

Name Type Description
wealth_scale_ float

95th-percentile total modelled wealth observed at fit time, used to clip outlier wealth sums during :meth:transform. This prevents a single ultra-high-net-worth outlier from compressing all other scores near 0.

n_features_in_ int
feature_names_in_ ndarray of str

Examples:

>>> import numpy as np
>>> from philanthropy.preprocessing._share_of_wallet import ShareOfWalletScorer
>>> rng = np.random.default_rng(0)
>>> X = rng.uniform(0, 1e6, (20, 4))
>>> scorer = ShareOfWalletScorer(capacity_col_idx=0, epsilon=1.0)
>>> scorer.fit(X)
ShareOfWalletScorer(...)
>>> out = scorer.transform(X)
>>> out.shape
(20, 2)
>>> bool(((out[:, 0] >= 0) & (out[:, 0] <= 1)).all())
True
Source code in philanthropy/preprocessing/_share_of_wallet.py
class ShareOfWalletScorer(TransformerMixin, BaseEstimator):
    """Compute a normalised Share-of-Wallet score and capacity-tier label.

    This transformer is designed as the **final stage** of a major-gift
    capacity scoring pipeline.  It consumes a numeric feature matrix and
    produces two outputs per row:

    ``sow_score`` (float64, [0, 1])
        Normalised Share-of-Wallet:

            SoW = estimated_capacity / (total_modelled_wealth + epsilon)

        where ``total_modelled_wealth`` is the row-wise sum of all columns
        specified in ``wealth_col_indices`` (or all columns if not
        specified), and ``estimated_capacity`` is the column at
        ``capacity_col_idx``.

    ``capacity_tier`` (float64, categorical encoding)
        A numeric encoding of the human-readable tier label, usable by
        downstream sklearn estimators (e.g., a classifier trained to
        predict tier upgrades).  The mapping is:

        ============ ============ =========================================
        SoW score    Tier label   Recommended action
        ============ ============ =========================================
        ≥ 0.75       Principal    Schedule personal visit with campaign chair.
        0.40 – 0.75  Major        Assign major gift officer.
        0.00 – 0.40  Leadership   Include in leadership annual giving.
        ============ ============ =========================================

    Parameters
    ----------
    capacity_col_idx : int, default=0
        Column index (0-based) in ``X`` containing the estimated
        philanthropic capacity (in dollars or any consistent currency unit).
    wealth_col_indices : list of int or None, default=None
        Column indices to sum as "total modelled wealth".  If ``None``,
        all columns are summed (including ``capacity_col_idx``).
    epsilon : float, default=1.0
        Small constant added to the denominator to prevent division by
        zero when all wealth columns are zero.
    capacity_floor : float, default=0.0
        Minimum value to enforce on ``estimated_capacity`` before scoring
        (prevents negative capacity from distorting the SoW score).

    Attributes
    ----------
    wealth_scale_ : float
        95th-percentile total modelled wealth observed at fit time, used to
        clip outlier wealth sums during :meth:`transform`.  This prevents a
        single ultra-high-net-worth outlier from compressing all other
        scores near 0.
    n_features_in_ : int
    feature_names_in_ : ndarray of str

    Examples
    --------
    >>> import numpy as np
    >>> from philanthropy.preprocessing._share_of_wallet import ShareOfWalletScorer
    >>> rng = np.random.default_rng(0)
    >>> X = rng.uniform(0, 1e6, (20, 4))
    >>> scorer = ShareOfWalletScorer(capacity_col_idx=0, epsilon=1.0)
    >>> scorer.fit(X)
    ShareOfWalletScorer(...)
    >>> out = scorer.transform(X)
    >>> out.shape
    (20, 2)
    >>> bool(((out[:, 0] >= 0) & (out[:, 0] <= 1)).all())
    True
    """

    # Public tier-label mapping for callers who need string labels
    TIER_LABELS = {0: "Leadership", 1: "Major", 2: "Principal"}
    TIER_ENCODING = {"Leadership": 0, "Major": 1, "Principal": 2}

    def __init__(
        self,
        capacity_col_idx: int = 0,
        wealth_col_indices: Optional[list[int]] = None,
        epsilon: float = 1.0,
        capacity_floor: float = 0.0,
    ) -> None:
        # sklearn convention: store params verbatim; validate in fit().
        self.capacity_col_idx = capacity_col_idx
        self.wealth_col_indices = wealth_col_indices
        self.epsilon = epsilon
        self.capacity_floor = capacity_floor

    def fit(self, X, y=None) -> "ShareOfWalletScorer":
        """Fit the scorer: record wealth scale from training data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        y : ignored

        Returns
        -------
        self : ShareOfWalletScorer

        Raises
        ------
        ValueError
            If ``epsilon`` is negative, ``capacity_col_idx`` is negative or
            out of range for ``X``, or ``wealth_col_indices`` contains an
            out-of-range index.
        """
        if self.epsilon < 0:
            raise ValueError(f"`epsilon` must be >= 0, got {self.epsilon}.")
        if not (0 <= self.capacity_col_idx):
            raise ValueError(f"`capacity_col_idx` must be >= 0, got {self.capacity_col_idx}.")

        X_arr = validate_data(
            self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=True
        )

        # Fail fast on out-of-range configuration now that the column count
        # is known.  Previously a bad capacity_col_idx surfaced only in
        # transform(), and bad wealth_col_indices raised a bare IndexError
        # from numpy below.
        n_cols = X_arr.shape[1]
        if int(self.capacity_col_idx) >= n_cols:
            raise ValueError(
                f"`capacity_col_idx` ({int(self.capacity_col_idx)}) exceeds "
                f"number of columns ({n_cols})."
            )

        # Compute total wealth denominator columns
        w_indices = (
            list(range(n_cols))
            if self.wealth_col_indices is None
            else [int(i) for i in self.wealth_col_indices]
        )
        bad = [i for i in w_indices if not -n_cols <= i < n_cols]
        if bad:
            raise ValueError(
                f"`wealth_col_indices` contains out-of-range indices {bad} "
                f"for input with {n_cols} columns."
            )
        wealth_sum = np.nansum(X_arr[:, w_indices], axis=1)

        # 95th-percentile scale to clip outliers and keep SoW in [0, 1]
        if len(wealth_sum) > 0:
            p95 = np.nanpercentile(wealth_sum, 95)
            self.wealth_scale_ = float(p95) if p95 > 0 else 1.0
        else:
            self.wealth_scale_ = 1.0

        return self

    def transform(self, X, y=None) -> np.ndarray:
        """Compute SoW score and numeric capacity tier.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)

        Returns
        -------
        X_out : np.ndarray of shape (n_samples, 2), dtype float64
            Column 0: ``sow_score`` in [0, 1].
            Column 1: ``capacity_tier`` (0 = Leadership, 1 = Major, 2 = Principal).

        Raises
        ------
        sklearn.exceptions.NotFittedError
        """
        check_is_fitted(self, ["wealth_scale_"])
        X_arr = validate_data(
            self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=False
        )

        w_indices = (
            list(range(X_arr.shape[1]))
            if self.wealth_col_indices is None
            else [int(i) for i in self.wealth_col_indices]
        )

        # Capacity column (bounds re-checked defensively; fit() also validates)
        cap_idx = int(self.capacity_col_idx)
        if cap_idx >= X_arr.shape[1]:
            raise ValueError(
                f"`capacity_col_idx` ({cap_idx}) exceeds number of columns "
                f"({X_arr.shape[1]})."
            )
        # NaN capacity counts as 0; the floor guards against negative values.
        capacity = np.maximum(
            np.nan_to_num(X_arr[:, cap_idx], nan=0.0),
            self.capacity_floor,
        )

        # Wealth sum — clip at 95th-percentile scale from fit to prevent score collapse
        wealth_raw = np.nansum(X_arr[:, w_indices], axis=1)
        wealth_clipped = np.clip(wealth_raw, 0.0, self.wealth_scale_)

        # SoW = capacity / (wealth + epsilon), then clip to [0, 1]
        sow = np.clip(
            capacity / (wealth_clipped + float(self.epsilon)), 0.0, 1.0
        )

        # Tier encoding (vectorised): the second assignment overrides the
        # first for rows with sow >= 0.75.
        tiers = np.zeros(len(sow), dtype=np.float64)
        tiers[sow >= 0.40] = float(self.TIER_ENCODING["Major"])
        tiers[sow >= 0.75] = float(self.TIER_ENCODING["Principal"])

        return np.column_stack([sow, tiers])

    def get_feature_names_out(self, input_features=None) -> np.ndarray:
        """Return the two fixed output feature names."""
        check_is_fitted(self)
        return np.array(["sow_score", "capacity_tier"], dtype=object)

    def get_tier_labels(self, X) -> np.ndarray:
        """Return human-readable tier labels for each row.

        Parameters
        ----------
        X : array-like compatible with :meth:`transform`

        Returns
        -------
        labels : ndarray of str, shape (n_samples,)
            One of ``"Principal"``, ``"Major"``, or ``"Leadership"`` per row.
        """
        out = self.transform(X)
        tier_ints = out[:, 1].astype(int)
        return np.array([self.TIER_LABELS[t] for t in tier_ints], dtype=object)

    def __sklearn_tags__(self):
        # NaN wealth/capacity entries are tolerated (treated as 0 in the sums).
        tags = super().__sklearn_tags__()
        tags.input_tags.allow_nan = True
        return tags

fit(X, y=None)

Fit the scorer: record wealth scale from training data.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)
required
y ignored
None

Returns:

Name Type Description
self ShareOfWalletScorer
Source code in philanthropy/preprocessing/_share_of_wallet.py
def fit(self, X, y=None) -> "ShareOfWalletScorer":
    """Fit the scorer: record wealth scale from training data.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    y : ignored

    Returns
    -------
    self : ShareOfWalletScorer
    """
    if self.epsilon < 0:
        raise ValueError(f"`epsilon` must be >= 0, got {self.epsilon}.")
    if not (0 <= self.capacity_col_idx):
        raise ValueError(f"`capacity_col_idx` must be >= 0, got {self.capacity_col_idx}.")

    X_arr = validate_data(
        self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=True
    )

    # Compute total wealth denominator columns
    w_indices = (
        list(range(X_arr.shape[1]))
        if self.wealth_col_indices is None
        else [int(i) for i in self.wealth_col_indices]
    )
    wealth_sum = np.nansum(X_arr[:, w_indices], axis=1)

    # 95th-percentile scale to clip outliers and keep SoW in [0, 1]
    if len(wealth_sum) > 0:
        p95 = np.nanpercentile(wealth_sum, 95)
        self.wealth_scale_ = float(p95) if p95 > 0 else 1.0
    else:
        self.wealth_scale_ = 1.0

    return self

transform(X, y=None)

Compute SoW score and numeric capacity tier.

Parameters:

Name Type Description Default
X array-like of shape (n_samples, n_features)
required

Returns:

Name Type Description
X_out np.ndarray of shape (n_samples, 2), dtype float64

Column 0: sow_score in [0, 1]. Column 1: capacity_tier (0 = Leadership, 1 = Major, 2 = Principal).

Raises:

Type Description
NotFittedError
Source code in philanthropy/preprocessing/_share_of_wallet.py
def transform(self, X, y=None) -> np.ndarray:
    """Compute SoW score and numeric capacity tier.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)

    Returns
    -------
    X_out : np.ndarray of shape (n_samples, 2), dtype float64
        Column 0: ``sow_score`` in [0, 1].
        Column 1: ``capacity_tier`` (0 = Leadership, 1 = Major, 2 = Principal).

    Raises
    ------
    sklearn.exceptions.NotFittedError
    """
    check_is_fitted(self, ["wealth_scale_"])
    X_arr = validate_data(
        self, X, dtype="numeric", ensure_all_finite="allow-nan", reset=False
    )

    w_indices = (
        list(range(X_arr.shape[1]))
        if self.wealth_col_indices is None
        else [int(i) for i in self.wealth_col_indices]
    )

    # Capacity column
    cap_idx = int(self.capacity_col_idx)
    if cap_idx >= X_arr.shape[1]:
        raise ValueError(
            f"`capacity_col_idx` ({cap_idx}) exceeds number of columns "
            f"({X_arr.shape[1]})."
        )
    capacity = np.maximum(
        np.nan_to_num(X_arr[:, cap_idx], nan=0.0),
        self.capacity_floor,
    )

    # Wealth sum — clip at 95th-percentile scale from fit to prevent score collapse
    wealth_raw = np.nansum(X_arr[:, w_indices], axis=1)
    wealth_clipped = np.clip(wealth_raw, 0.0, self.wealth_scale_)

    # SoW = capacity / (wealth + epsilon), then clip to [0, 1]
    sow = np.clip(
        capacity / (wealth_clipped + float(self.epsilon)), 0.0, 1.0
    )

    # Tier encoding (vectorised)
    tiers = np.zeros(len(sow), dtype=np.float64)
    tiers[sow >= 0.40] = float(self.TIER_ENCODING["Major"])
    tiers[sow >= 0.75] = float(self.TIER_ENCODING["Principal"])

    return np.column_stack([sow, tiers])

get_tier_labels(X)

Return human-readable tier labels for each row.

Parameters:

Name Type Description Default
X array-like compatible with :meth:`transform`
required

Returns:

Name Type Description
labels ndarray of str, shape (n_samples,)

One of "Principal", "Major", or "Leadership" per row.

Source code in philanthropy/preprocessing/_share_of_wallet.py
def get_tier_labels(self, X) -> np.ndarray:
    """Return human-readable tier labels for each row.

    Parameters
    ----------
    X : array-like compatible with :meth:`transform`

    Returns
    -------
    labels : ndarray of str, shape (n_samples,)
        One of ``"Principal"``, ``"Major"``, or ``"Leadership"`` per row.
    """
    # Second output column holds the numeric tier code; map it to its name.
    codes = self.transform(X)[:, 1].astype(int)
    return np.asarray([self.TIER_LABELS[c] for c in codes], dtype=object)