Skip to content

Medications

The medications module provides tools for medication lookup, treatment identification, episode detection, and adherence calculation.

When to Use

Use the medications module when you need to:

  • Look up NDC codes or J-codes by generic name
  • Identify treatment claims for specific medications
  • Categorize medications into therapeutic classes
  • Identify treatment episodes and switches
  • Calculate adherence metrics (PDC)
  • Classify payer types

Quick Example

from alx_heor.database import RedshiftConnection
from alx_heor.medications import (
    lookup_medications,
    get_treatment_claims,
    identify_treatment_episodes,
    calculate_pdc,
)

conn = RedshiftConnection().connect()

# Look up C5 inhibitors by generic name
meds = lookup_medications(
    conn,
    schema="iqvia_pharmetrics_2024q3",
    generic_names=["eculizumab", "ravulizumab"],
)
print(f"Found {len(meds.ndc_codes)} NDC codes, {len(meds.procedure_codes)} J-codes")

# Get treatment claims for cohort
df_rx = get_treatment_claims(
    conn,
    source="iqvia",
    schema="iqvia_pharmetrics_2024q3",
    patient_ids=df_cohort["pat_id"].tolist(),
    generic_names=["eculizumab", "ravulizumab", "efgartigimod"],
    start_year=2015,
    end_year=2024,
)

# Identify treatment episodes
df_episodes = identify_treatment_episodes(
    df_rx,
    patient_id_col="pat_id",
    date_col="from_dt",
    drug_col="generic_name",
    gap_days=60,  # New episode if >60 day gap
)

# Calculate PDC for each episode
df_pdc = calculate_pdc(
    df_episodes,
    patient_id_col="pat_id",
    start_col="episode_start",
    end_col="episode_end",
    days_supply_col="days_supply",
)

Common Patterns

Medication Lookup

# Look up by generic name
result = lookup_medications(
    conn,
    schema="iqvia_pharmetrics_2024q3",
    generic_names=["eculizumab"],
)

# Access NDC codes and J-codes
ndc_list = result.ndc_codes       # ['12345678901', '12345678902', ...]
jcode_list = result.procedure_codes  # ['J1300', 'J1303', ...]

Treatment Categorization

from alx_heor.medications import assign_medication_category

# Define categories
categories = {
    "C5_INHIBITOR": ["eculizumab", "ravulizumab"],
    "FCRN_INHIBITOR": ["efgartigimod", "rozanolixizumab"],
    "CHOLINESTERASE": ["pyridostigmine", "neostigmine"],
}

df_rx = assign_medication_category(
    df_rx,
    category_mapping=categories,
    drug_col="generic_name",
    output_col="drug_class",
)

Payer Classification

from alx_heor.medications import classify_payer_type

df_payer = classify_payer_type(
    conn,
    source="iqvia",
    schema="iqvia_pharmetrics_2024q3",
    patient_ids=df_cohort["pat_id"].tolist(),
)
# Returns: pat_id, pay_type, payer_type (COMMERCIAL, MEDICARE, MEDICAID, OTHER)
  • cohort - Uses medications via MedicationCriteria
  • claims - Lower-level claims access
  • config - Column mappings for NDC, procedure codes

medications

Medication lookup and treatment analysis utilities for pharmacoepidemiology.

This module supports medication-focused analyses in RWE studies. In claims databases, medications are identified through two coding systems: - NDC codes: National Drug Codes for pharmacy dispensings (11-digit codes) - J-codes: Healthcare Common Procedure Codes (HCPCS) for physician-administered drugs (typically biologics, infusions, injections)

Understanding these coding systems is essential because expensive specialty drugs (like C5 inhibitors for gMG) are often administered in physician offices and appear as J-codes rather than NDC codes in pharmacy claims.

Key Concepts:

NDC codes: Identify specific drug products at the manufacturer/packager level. Format: 11 digits (5-4-2 or 4-4-2 variations). Found in pharmacy claims.

J-codes: Part of HCPCS Level II, identify drugs administered by healthcare providers (infusions, injections). Format: J + 4 digits (e.g., J1300 for eculizumab). Found in medical claims.

Treatment episodes: Contiguous periods of treatment separated by gaps. A gap > 45 days typically indicates treatment discontinuation.

PDC (Proportion of Days Covered): Standard adherence metric. PDC = (Days with medication available) / (Observation period). PDC ≥ 0.8 is considered "adherent" for chronic conditions.

Core Functions:

  • lookup_medications: Find NDC/J-codes from generic drug names
  • get_treatment_claims: Query claims for specific medications
  • identify_treatment_episodes: Group claims into treatment episodes
  • calculate_pdc: Calculate medication adherence
  • classify_payer_type: Classify patients by insurance type

Typical Workflow:

  1. lookup_medications() - Find codes for drugs of interest
  2. get_treatment_claims() - Query claims with those codes
  3. assign_medication_category() - Label claims by drug
  4. identify_treatment_episodes() - Group into treatment periods
  5. calculate_pdc() - Measure adherence (if relevant)
Example

Analyze C5 inhibitor treatment in gMG cohort:

from alx_heor.medications import ( ... lookup_medications, ... get_treatment_claims, ... identify_treatment_episodes, ... calculate_pdc, ... ) result = lookup_medications( ... conn, ... source='iqvia', ... schema='iqvia_pharmetrics_2024q3', ... generic_names=['eculizumab', 'ravulizumab', 'efgartigimod'], ... ) print(result.summary()) Medication Lookup Summary ================================================== NDC codes found: 45 - eculizumab: 12 - ravulizumab: 18 - efgartigimod: 15 J-codes found: 3 - eculizumab: 1 - ravulizumab: 1 - efgartigimod: 1 df_rx = get_treatment_claims( ... conn, ... source='iqvia', ... schema='iqvia_pharmetrics_2024q3', ... patient_ids=df_cohort['pat_id'].tolist(), ... start_year=2015, ... end_year=2024, ... ndc_codes=result.ndc_list, ... jcodes=result.jcode_list, ... ) print(f"Treatment claims: {len(df_rx):,}") df_episodes = identify_treatment_episodes( ... df_rx, ... patient_id_col='pat_id', ... gap_days=45, ... ) print(f"Patients with treatment: {df_episodes['patient_id'].nunique():,}")

See Also

cohort.MedicationCriteria : Medication-based inclusion/exclusion criteria claims.get_claims : Query claims by diagnosis (rather than medication)

Notes
  • J-codes are more reliable for identifying biologics than NDC codes
  • NDC codes change frequently (new manufacturers, repackagers)
  • Always verify lookup results against known drug products
  • Exclude hyaluronidase formulations if studying the active drug alone

MedicationLookupResult dataclass

Result from medication lookup.

Attributes:

Name Type Description
df_ndc DataFrame

NDC codes with generic names and strengths.

df_jcodes DataFrame

J-codes/procedure codes with descriptions.

ndc_list list[str]

List of NDC codes for SQL IN clauses.

jcode_list list[str]

List of J-codes for SQL IN clauses.

Source code in alx_heor\medications\__init__.py
@dataclass
class MedicationLookupResult:
    """Result from medication lookup.

    Attributes
    ----------
    df_ndc : pd.DataFrame
        NDC codes with generic names and strengths.
    df_jcodes : pd.DataFrame
        J-codes/procedure codes with descriptions.
    ndc_list : list[str]
        List of NDC codes for SQL IN clauses.
    jcode_list : list[str]
        List of J-codes for SQL IN clauses.
    """

    df_ndc: pd.DataFrame
    df_jcodes: pd.DataFrame
    ndc_list: list[str] = field(default_factory=list)
    jcode_list: list[str] = field(default_factory=list)

    def summary(self) -> str:
        """Generate human-readable summary of lookup results."""
        lines = ["Medication Lookup Summary", "=" * 50]

        lines.append(f"\nNDC codes found: {len(self.df_ndc)}")
        if not self.df_ndc.empty and "generic_category" in self.df_ndc.columns:
            by_category = self.df_ndc["generic_category"].value_counts()
            for cat, count in by_category.items():
                lines.append(f"  - {cat}: {count}")

        lines.append(f"\nJ-codes found: {len(self.df_jcodes)}")
        if not self.df_jcodes.empty and "generic_category" in self.df_jcodes.columns:
            by_category = self.df_jcodes["generic_category"].value_counts()
            for cat, count in by_category.items():
                lines.append(f"  - {cat}: {count}")

        return "\n".join(lines)

summary

summary() -> str

Generate human-readable summary of lookup results.

Source code in alx_heor\medications\__init__.py
def summary(self) -> str:
    """Generate human-readable summary of lookup results."""
    lines = ["Medication Lookup Summary", "=" * 50]

    lines.append(f"\nNDC codes found: {len(self.df_ndc)}")
    if not self.df_ndc.empty and "generic_category" in self.df_ndc.columns:
        by_category = self.df_ndc["generic_category"].value_counts()
        for cat, count in by_category.items():
            lines.append(f"  - {cat}: {count}")

    lines.append(f"\nJ-codes found: {len(self.df_jcodes)}")
    if not self.df_jcodes.empty and "generic_category" in self.df_jcodes.columns:
        by_category = self.df_jcodes["generic_category"].value_counts()
        for cat, count in by_category.items():
            lines.append(f"  - {cat}: {count}")

    return "\n".join(lines)

TreatmentEpisode dataclass

A treatment episode for a patient.

Attributes:

Name Type Description
patient_id str

Patient identifier.

medication str

Medication name/category.

episode_num int

Episode number (1-indexed).

start_date Timestamp

Episode start date.

end_date Timestamp

Episode end date.

num_claims int

Number of claims in episode.

duration_days int

Duration in days.

Source code in alx_heor\medications\__init__.py
@dataclass
class TreatmentEpisode:
    """A treatment episode for a patient.

    Attributes
    ----------
    patient_id : str
        Patient identifier.
    medication : str
        Medication name/category.
    episode_num : int
        Episode number (1-indexed).
    start_date : pd.Timestamp
        Episode start date.
    end_date : pd.Timestamp
        Episode end date.
    num_claims : int
        Number of claims in episode.
    duration_days : int
        Duration in days.
    """

    patient_id: str
    medication: str
    episode_num: int
    start_date: pd.Timestamp
    end_date: pd.Timestamp
    num_claims: int
    duration_days: int

lookup_medications

lookup_medications(conn: RedshiftConnection, source: str, schema: str, generic_names: list[str], exclude_keywords: list[str] | None = None, include_jcodes_only: bool = True) -> MedicationLookupResult

Look up NDC codes and J-codes for medications by generic name.

This function searches the database's medication lookup tables (rx_lookup for NDC codes, pr_lookup for procedure/J-codes) to find all codes matching the specified generic drug names. This is essential because: - A single drug may have dozens of NDC codes (different manufacturers, strengths) - J-codes identify physician-administered drugs (biologics, infusions) - Manual code lookup is error-prone and time-consuming

The function returns both the full DataFrames (for review) and convenient lists (for SQL IN clauses).

Workflow:

  1. lookup_medications() <-- you are here
  2. get_treatment_claims() - Query claims with found codes
  3. assign_medication_category() - Label claims by drug
  4. identify_treatment_episodes() - Group into treatment periods

Parameters:

Name Type Description Default
conn RedshiftConnection

Active database connection. Must be connected before calling.

required
source str

Data source name: 'iqvia', 'optum', 'komodo'.

required
schema str

Database schema name (e.g., 'iqvia_pharmetrics_2024q3').

required
generic_names list[str]

List of generic medication names to search. Case-insensitive. Examples: ['eculizumab', 'ravulizumab'] for C5 inhibitors.

required
exclude_keywords list[str]

Keywords to exclude from results. Case-insensitive. Example: ['HYALU'] to exclude hyaluronidase co-formulations.

None
include_jcodes_only bool

If True, only include J-codes (not other HCPCS codes like C-codes). J-codes are the most reliable for drug identification.

True

Returns:

Type Description
MedicationLookupResult

Dataclass containing: - df_ndc: DataFrame of matching NDC codes with generic_name, strength - df_jcodes: DataFrame of matching J-codes with descriptions - ndc_list: List of NDC codes (for SQL IN clauses) - jcode_list: List of J-codes (for SQL IN clauses) - summary(): Method to print human-readable summary

See Also

get_treatment_claims : Next step - query claims with found codes cohort.MedicationCriteria : Use medication codes in cohort criteria

Notes
  • Always review df_ndc and df_jcodes to verify correct drugs were found
  • NDC codes change frequently - verify against current FDA database
  • Exclude hyaluronidase for biologics that have subcutaneous formulations
  • J-codes are more stable than NDC codes over time

Examples:

Look up C5 inhibitors for gMG treatment analysis:

>>> result = lookup_medications(
...     conn,
...     source='iqvia',
...     schema='iqvia_pharmetrics_2024q3',
...     generic_names=['eculizumab', 'ravulizumab', 'efgartigimod'],
... )
>>> print(result.summary())
>>> print(f"Found {len(result.ndc_list)} NDC codes and {len(result.jcode_list)} J-codes")

Exclude hyaluronidase co-formulations (for pure drug analysis):

>>> result = lookup_medications(
...     conn,
...     source='iqvia',
...     schema='iqvia_pharmetrics_2024q3',
...     generic_names=['eculizumab'],
...     exclude_keywords=['HYALU'],  # Exclude hyaluronidase formulations
... )

Review the found codes before using:

>>> result = lookup_medications(conn, source='iqvia', ...)
>>> print(result.df_ndc[['ndc', 'generic_name', 'strength']].head(10))
>>> print(result.df_jcodes[['procedure_cd', 'procedure_desc']])

Use codes in SQL query:

>>> ndc_str = ', '.join(f"'{c}'" for c in result.ndc_list)
>>> sql = f"SELECT * FROM claims WHERE ndc IN ({ndc_str})"
Source code in alx_heor\medications\__init__.py
def lookup_medications(
    conn: RedshiftConnection,
    source: str,
    schema: str,
    generic_names: list[str],
    exclude_keywords: list[str] | None = None,
    include_jcodes_only: bool = True,
) -> MedicationLookupResult:
    """Look up NDC codes and J-codes for medications by generic name.

    This function searches the database's medication lookup tables (rx_lookup
    for NDC codes, pr_lookup for procedure/J-codes) to find all codes matching
    the specified generic drug names. This is essential because:
    - A single drug may have dozens of NDC codes (different manufacturers, strengths)
    - J-codes identify physician-administered drugs (biologics, infusions)
    - Manual code lookup is error-prone and time-consuming

    The function returns both the full DataFrames (for review) and convenient
    lists (for SQL IN clauses).

    **Workflow:**

    1. **lookup_medications()** <-- you are here
    2. get_treatment_claims() - Query claims with found codes
    3. assign_medication_category() - Label claims by drug
    4. identify_treatment_episodes() - Group into treatment periods

    Parameters
    ----------
    conn : RedshiftConnection
        Active database connection. Must be connected before calling.
    source : str
        Data source name: 'iqvia', 'optum', 'komodo'.
    schema : str
        Database schema name (e.g., 'iqvia_pharmetrics_2024q3').
    generic_names : list[str]
        List of generic medication names to search. Case-insensitive.
        Examples: ['eculizumab', 'ravulizumab'] for C5 inhibitors.
    exclude_keywords : list[str], optional
        Keywords to exclude from results. Case-insensitive.
        Example: ['HYALU'] to exclude hyaluronidase co-formulations.
    include_jcodes_only : bool, default=True
        If True, only include J-codes (not other HCPCS codes like C-codes).
        J-codes are the most reliable for drug identification.

    Returns
    -------
    MedicationLookupResult
        Dataclass containing:
        - df_ndc: DataFrame of matching NDC codes with generic_name, strength
        - df_jcodes: DataFrame of matching J-codes with descriptions
        - ndc_list: List of NDC codes (for SQL IN clauses)
        - jcode_list: List of J-codes (for SQL IN clauses)
        - summary(): Method to print human-readable summary

    See Also
    --------
    get_treatment_claims : Next step - query claims with found codes
    cohort.MedicationCriteria : Use medication codes in cohort criteria

    Notes
    -----
    - Always review df_ndc and df_jcodes to verify correct drugs were found
    - NDC codes change frequently - verify against current FDA database
    - Exclude hyaluronidase for biologics that have subcutaneous formulations
    - J-codes are more stable than NDC codes over time

    Examples
    --------
    Look up C5 inhibitors for gMG treatment analysis:

    >>> result = lookup_medications(
    ...     conn,
    ...     source='iqvia',
    ...     schema='iqvia_pharmetrics_2024q3',
    ...     generic_names=['eculizumab', 'ravulizumab', 'efgartigimod'],
    ... )
    >>> print(result.summary())
    >>> print(f"Found {len(result.ndc_list)} NDC codes and {len(result.jcode_list)} J-codes")

    Exclude hyaluronidase co-formulations (for pure drug analysis):

    >>> result = lookup_medications(
    ...     conn,
    ...     source='iqvia',
    ...     schema='iqvia_pharmetrics_2024q3',
    ...     generic_names=['eculizumab'],
    ...     exclude_keywords=['HYALU'],  # Exclude hyaluronidase formulations
    ... )

    Review the found codes before using:

    >>> result = lookup_medications(conn, source='iqvia', ...)
    >>> print(result.df_ndc[['ndc', 'generic_name', 'strength']].head(10))
    >>> print(result.df_jcodes[['procedure_cd', 'procedure_desc']])

    Use codes in SQL query:

    >>> ndc_str = ', '.join(f"'{c}'" for c in result.ndc_list)
    >>> sql = f"SELECT * FROM claims WHERE ndc IN ({ndc_str})"
    """
    # Build regex pattern for SIMILAR TO
    pattern = "|".join(name.lower() for name in generic_names)

    # Get lookup table names from config
    config = get_source_config(source)
    rx_lookup_table = config.get("rx_lookup_table", "rx_lookup")
    pr_lookup_table = config.get("pr_lookup_table", "pr_lookup")

    # Query NDC lookup table
    sql_ndc = f"""
        SELECT ndc, generic_name, strength
        FROM {schema}.{rx_lookup_table}
        WHERE LOWER(generic_name) SIMILAR TO '%({pattern})%'
    """
    df_ndc = conn.query(sql_ndc)

    # Query procedure/J-code lookup table
    jcode_filter = "AND procedure LIKE 'J%'" if include_jcodes_only else ""
    sql_jcode = f"""
        SELECT procedure_cd, procedure, procedure_desc
        FROM {schema}.{pr_lookup_table}
        WHERE LOWER(procedure_desc) SIMILAR TO '%({pattern})%'
        {jcode_filter}
    """
    df_jcodes = conn.query(sql_jcode)

    # Assign generic category
    df_ndc = _assign_generic_category(df_ndc, generic_names, "generic_name")
    df_jcodes = _assign_generic_category(df_jcodes, generic_names, "procedure_desc")

    # Apply exclusions
    if exclude_keywords:
        exclude_pattern = "|".join(exclude_keywords)
        if not df_ndc.empty:
            df_ndc = df_ndc[
                ~df_ndc["generic_name"].str.contains(exclude_pattern, case=False, na=False)
            ]
        if not df_jcodes.empty:
            df_jcodes = df_jcodes[
                ~df_jcodes["procedure_desc"].str.contains(
                    exclude_pattern, case=False, na=False
                )
            ]

    # Build lists for SQL
    ndc_list = df_ndc["ndc"].tolist() if not df_ndc.empty else []
    jcode_list = df_jcodes["procedure_cd"].tolist() if not df_jcodes.empty else []

    return MedicationLookupResult(
        df_ndc=df_ndc,
        df_jcodes=df_jcodes,
        ndc_list=ndc_list,
        jcode_list=jcode_list,
    )

build_medication_sql_filter

build_medication_sql_filter(ndc_codes: list[str] | None = None, jcodes: list[str] | None = None, ndc_column: str = 'ndc', proc_column: str = 'proc1') -> str

Build SQL WHERE clause for medication filtering.

Parameters:

Name Type Description Default
ndc_codes list[str]

List of NDC codes to filter.

None
jcodes list[str]

List of J-codes to filter.

None
ndc_column str

Column name for NDC codes (default: 'ndc').

'ndc'
proc_column str

Column name for procedure codes (default: 'proc1').

'proc1'

Returns:

Type Description
str

SQL WHERE clause (without 'WHERE' keyword).

Example

sql_filter = build_medication_sql_filter( ... ndc_codes=['12345', '67890'], ... jcodes=['J1300', 'J9332'], ... ) sql = f"SELECT * FROM claims WHERE {sql_filter}"

Source code in alx_heor\medications\__init__.py
def build_medication_sql_filter(
    ndc_codes: list[str] | None = None,
    jcodes: list[str] | None = None,
    ndc_column: str = "ndc",
    proc_column: str = "proc1",
) -> str:
    """Build SQL WHERE clause for medication filtering.

    Parameters
    ----------
    ndc_codes : list[str], optional
        List of NDC codes to filter.
    jcodes : list[str], optional
        List of J-codes to filter.
    ndc_column : str
        Column name for NDC codes (default: 'ndc').
    proc_column : str
        Column name for procedure codes (default: 'proc1').

    Returns
    -------
    str
        SQL WHERE clause (without 'WHERE' keyword).

    Example
    -------
    >>> sql_filter = build_medication_sql_filter(
    ...     ndc_codes=['12345', '67890'],
    ...     jcodes=['J1300', 'J9332'],
    ... )
    >>> sql = f"SELECT * FROM claims WHERE {sql_filter}"
    """
    conditions = []

    if ndc_codes:
        ndc_str = ", ".join(f"'{code}'" for code in ndc_codes)
        conditions.append(f"{ndc_column} IN ({ndc_str})")

    if jcodes:
        jcode_str = ", ".join(f"'{code}'" for code in jcodes)
        conditions.append(f"{proc_column} IN ({jcode_str})")

    if not conditions:
        return "1=1"  # No filter

    return " OR ".join(conditions)

get_treatment_claims

get_treatment_claims(conn: RedshiftConnection, source: str, schema: str, patient_ids: list[str], start_year: int, end_year: int, ndc_codes: list[str] | None = None, jcodes: list[str] | None = None, columns: list[str] | None = None) -> pd.DataFrame

Get treatment claims for specified patients and medications.

Parameters:

Name Type Description Default
conn RedshiftConnection

Active database connection.

required
source str

Data source name: 'iqvia', 'optum', 'komodo'.

required
schema str

Database schema name.

required
patient_ids list[str]

List of patient IDs.

required
start_year int

Start year for claims tables.

required
end_year int

End year for claims tables.

required
ndc_codes list[str]

NDC codes to filter.

None
jcodes list[str]

J-codes to filter.

None
columns list[str]

Columns to select. If None, uses defaults.

None

Returns:

Type Description
DataFrame

Treatment claims for the specified patients and medications.

Example

df_rx = get_treatment_claims( ... conn, source='iqvia', schema='iqvia_pharmetrics_2024q3', ... patient_ids=['P001', 'P002'], ... start_year=2020, end_year=2024, ... ndc_codes=['12345'], jcodes=['J1300'], ... )

Source code in alx_heor\medications\__init__.py
def get_treatment_claims(
    conn: RedshiftConnection,
    source: str,
    schema: str,
    patient_ids: list[str],
    start_year: int,
    end_year: int,
    ndc_codes: list[str] | None = None,
    jcodes: list[str] | None = None,
    columns: list[str] | None = None,
) -> pd.DataFrame:
    """Get treatment claims for specified patients and medications.

    Parameters
    ----------
    conn : RedshiftConnection
        Active database connection.
    source : str
        Data source name: 'iqvia', 'optum', 'komodo'.
    schema : str
        Database schema name.
    patient_ids : list[str]
        List of patient IDs.
    start_year : int
        Start year for claims tables.
    end_year : int
        End year for claims tables.
    ndc_codes : list[str], optional
        NDC codes to filter.
    jcodes : list[str], optional
        J-codes to filter.
    columns : list[str], optional
        Columns to select. If None, uses defaults.

    Returns
    -------
    pd.DataFrame
        Treatment claims for the specified patients and medications.

    Example
    -------
    >>> df_rx = get_treatment_claims(
    ...     conn, source='iqvia', schema='iqvia_pharmetrics_2024q3',
    ...     patient_ids=['P001', 'P002'],
    ...     start_year=2020, end_year=2024,
    ...     ndc_codes=['12345'], jcodes=['J1300'],
    ... )
    """
    config = get_source_config(source)
    patient_id_col = config["columns"]["patient_id"]
    service_date_col = config["columns"]["service_date"]
    ndc_col = config["columns"]["ndc"]
    proc_col = config["columns"].get("procedure_code", "proc1")
    claims_pattern = config["claims_table_pattern"]

    # Default columns
    if columns is None:
        columns = [patient_id_col, service_date_col, ndc_col, proc_col]
        if "service_end_date" in config["columns"]:
            columns.append(config["columns"]["service_end_date"])

    # Build medication filter
    med_filter = build_medication_sql_filter(
        ndc_codes=ndc_codes,
        jcodes=jcodes,
        ndc_column=ndc_col,
        proc_column=proc_col,
    )

    # Build patient filter
    patient_str = ", ".join(f"'{pid}'" for pid in patient_ids)
    patient_filter = f"{patient_id_col} IN ({patient_str})"

    # Build UNION of claims tables
    cols_str = ", ".join(columns)
    union_parts = []
    for year in range(start_year, end_year + 1):
        table = claims_pattern.format(year=year)
        union_parts.append(
            f"SELECT {cols_str} FROM {schema}.{table} "
            f"WHERE {patient_filter} AND ({med_filter})"
        )

    sql = " UNION ALL ".join(union_parts)
    return conn.query(sql)

assign_medication_category

assign_medication_category(df_claims: DataFrame, medication_map: dict[str, str], ndc_column: str = 'ndc', proc_column: str = 'proc1', category_column: str = 'medication') -> pd.DataFrame

Assign medication category to claims based on NDC or procedure codes.

Parameters:

Name Type Description Default
df_claims DataFrame

Claims data with NDC and/or procedure columns.

required
medication_map dict[str, str]

Mapping of codes to medication names. Example: {'J1300': 'eculizumab', '12345678901': 'eculizumab'}

required
ndc_column str

Column name for NDC codes.

'ndc'
proc_column str

Column name for procedure codes.

'proc1'
category_column str

Name for the new category column.

'medication'

Returns:

Type Description
DataFrame

Claims with medication category column added.

Example

med_map = {'J1300': 'eculizumab', 'J9332': 'efgartigimod'} df = assign_medication_category(df_claims, med_map)

Source code in alx_heor\medications\__init__.py
def assign_medication_category(
    df_claims: pd.DataFrame,
    medication_map: dict[str, str],
    ndc_column: str = "ndc",
    proc_column: str = "proc1",
    category_column: str = "medication",
) -> pd.DataFrame:
    """Assign medication category to claims based on NDC or procedure codes.

    Parameters
    ----------
    df_claims : pd.DataFrame
        Claims data with NDC and/or procedure columns.
    medication_map : dict[str, str]
        Mapping of codes to medication names.
        Example: {'J1300': 'eculizumab', '12345678901': 'eculizumab'}
    ndc_column : str
        Column name for NDC codes.
    proc_column : str
        Column name for procedure codes.
    category_column : str
        Name for the new category column.

    Returns
    -------
    pd.DataFrame
        Claims with medication category column added.

    Example
    -------
    >>> med_map = {'J1300': 'eculizumab', 'J9332': 'efgartigimod'}
    >>> df = assign_medication_category(df_claims, med_map)
    """
    df = df_claims.copy()

    # Create category column
    df[category_column] = "Unknown"

    for code, medication in medication_map.items():
        # Check NDC column
        if ndc_column in df.columns:
            mask = df[ndc_column].astype(str) == str(code)
            df.loc[mask, category_column] = medication

        # Check procedure column
        if proc_column in df.columns:
            mask = df[proc_column].astype(str) == str(code)
            df.loc[mask, category_column] = medication

    return df

identify_treatment_episodes

identify_treatment_episodes(df_claims: DataFrame, patient_id_col: str = 'pat_id', date_col: str = 'from_dt', medication_col: str = 'medication', gap_days: int = 45) -> pd.DataFrame

Identify treatment episodes from claims data.

A new episode starts when there's a gap > gap_days between claims.

Parameters:

Name Type Description Default
df_claims DataFrame

Claims data with patient, date, and medication columns.

required
patient_id_col str

Patient ID column name.

'pat_id'
date_col str

Service date column name.

'from_dt'
medication_col str

Medication category column name.

'medication'
gap_days int

Maximum gap (days) within an episode. Default 45.

45

Returns:

Type Description
DataFrame

Episode summary with columns: - patient_id, medication, episode_num - start_date, end_date, num_claims, duration_days

Example

df_episodes = identify_treatment_episodes( ... df_rx, patient_id_col='pat_id', gap_days=45, ... ) df_first = df_episodes[df_episodes['episode_num'] == 1]

Source code in alx_heor\medications\__init__.py
def identify_treatment_episodes(
    df_claims: pd.DataFrame,
    patient_id_col: str = "pat_id",
    date_col: str = "from_dt",
    medication_col: str = "medication",
    gap_days: int = 45,
) -> pd.DataFrame:
    """Identify treatment episodes from claims data.

    A new episode starts when there's a gap > gap_days between claims.

    Parameters
    ----------
    df_claims : pd.DataFrame
        Claims data with patient, date, and medication columns.
    patient_id_col : str
        Patient ID column name.
    date_col : str
        Service date column name.
    medication_col : str
        Medication category column name.
    gap_days : int
        Maximum gap (days) within an episode. Default 45.

    Returns
    -------
    pd.DataFrame
        Episode summary with columns:
        - patient_id, medication, episode_num
        - start_date, end_date, num_claims, duration_days

    Example
    -------
    >>> df_episodes = identify_treatment_episodes(
    ...     df_rx, patient_id_col='pat_id', gap_days=45,
    ... )
    >>> df_first = df_episodes[df_episodes['episode_num'] == 1]
    """
    df = df_claims.copy()
    df[date_col] = pd.to_datetime(df[date_col])
    df = df.sort_values([patient_id_col, medication_col, date_col])

    # Calculate days since previous claim
    df["prev_date"] = df.groupby([patient_id_col, medication_col])[date_col].shift(1)
    df["days_since_prev"] = (df[date_col] - df["prev_date"]).dt.days

    # Mark episode starts (first claim or gap > threshold)
    df["new_episode"] = (df["days_since_prev"].isna()) | (df["days_since_prev"] > gap_days)

    # Assign episode numbers
    df["episode_num"] = df.groupby([patient_id_col, medication_col])["new_episode"].cumsum()

    # Aggregate to episode level
    episodes = (
        df.groupby([patient_id_col, medication_col, "episode_num"])
        .agg(
            start_date=(date_col, "min"),
            end_date=(date_col, "max"),
            num_claims=(date_col, "count"),
        )
        .reset_index()
    )

    episodes["duration_days"] = (episodes["end_date"] - episodes["start_date"]).dt.days + 1

    # Rename columns
    episodes = episodes.rename(columns={patient_id_col: "patient_id"})

    return episodes

get_first_treatment_date

get_first_treatment_date(df_claims: DataFrame, patient_id_col: str = 'pat_id', date_col: str = 'from_dt', medication_col: str | None = None) -> pd.DataFrame

Get first treatment date for each patient (optionally by medication).

Parameters:

Name Type Description Default
df_claims DataFrame

Claims data.

required
patient_id_col str

Patient ID column name.

'pat_id'
date_col str

Service date column name.

'from_dt'
medication_col str

If provided, get first date per patient per medication.

None

Returns:

Type Description
DataFrame

First treatment date per patient (and medication if specified).

Example

df_first = get_first_treatment_date(df_rx, patient_id_col='pat_id') df_cohort = df_cohort.merge(df_first, on='pat_id', how='left')

Source code in alx_heor\medications\__init__.py
def get_first_treatment_date(
    df_claims: pd.DataFrame,
    patient_id_col: str = "pat_id",
    date_col: str = "from_dt",
    medication_col: str | None = None,
) -> pd.DataFrame:
    """Get first treatment date for each patient (optionally by medication).

    Parameters
    ----------
    df_claims : pd.DataFrame
        Claims data.
    patient_id_col : str
        Patient ID column name.
    date_col : str
        Service date column name.
    medication_col : str, optional
        If provided, get first date per patient per medication.

    Returns
    -------
    pd.DataFrame
        First treatment date per patient (and medication if specified).

    Example
    -------
    >>> df_first = get_first_treatment_date(df_rx, patient_id_col='pat_id')
    >>> df_cohort = df_cohort.merge(df_first, on='pat_id', how='left')
    """
    df = df_claims.copy()
    df[date_col] = pd.to_datetime(df[date_col])

    group_cols = [patient_id_col]
    if medication_col:
        group_cols.append(medication_col)

    result = (
        df.groupby(group_cols)[date_col]
        .min()
        .reset_index()
        .rename(columns={date_col: "first_treatment_date"})
    )

    return result

calculate_pdc

calculate_pdc(df_claims: DataFrame, patient_id_col: str = 'pat_id', date_col: str = 'from_dt', days_supply_col: str | None = None, observation_window: int = 365, index_date_col: str | None = None) -> pd.DataFrame

Calculate Proportion of Days Covered (PDC) for each patient.

PDC is the standard medication adherence metric in pharmacoepidemiology. It measures what proportion of the observation period a patient had medication available (based on dispensing records and days supply).

PDC = (Days with medication available) / (Observation window)

Clinical Interpretation:

  • PDC ≥ 0.80 (80%): Generally considered "adherent"
  • PDC 0.50-0.79: Partial adherence
  • PDC < 0.50: Non-adherent

The 80% threshold is used because clinical outcomes typically improve significantly above this level for chronic conditions.

Calculation Method:

For each patient: 1. Define observation window (e.g., 365 days from first fill or index) 2. For each dispensing, mark days covered based on days_supply 3. Handle overlapping supplies (no double-counting) 4. PDC = unique covered days / observation window

Parameters:

Name Type Description Default
df_claims DataFrame

Treatment claims data with dates and optional days supply. Output from get_treatment_claims().

required
patient_id_col str

Patient ID column name.

'pat_id'
date_col str

Service date column name.

'from_dt'
days_supply_col str

Days supply column. If None or not in DataFrame, assumes 30-day supply per claim (common for biologics).

None
observation_window int

Number of days to observe (typically 365 for annual adherence).

365
index_date_col str

If provided, observation starts from this date column. Otherwise uses first treatment date as day 0.

None

Returns:

Type Description
DataFrame

One row per patient with columns: - patient_id: Patient identifier - pdc: Proportion of Days Covered (0.0 to 1.0) - covered_days: Number of days with medication available - observation_days: Length of observation window - observation_start: Start date of observation - observation_end: End date of observation

See Also

get_treatment_claims : Get treatment claims for PDC calculation identify_treatment_episodes : Alternative approach for episodic treatment

Notes
  • PDC handles overlapping supplies by not double-counting days
  • For injectable biologics, days_supply may not be captured - assume 30
  • PDC is typically calculated for chronic, maintenance medications
  • Early discontinuation will result in low PDC (as intended)

Examples:

Calculate annual PDC for gMG treatment:

>>> df_pdc = calculate_pdc(
...     df_rx,
...     patient_id_col='pat_id',
...     observation_window=365,
... )
>>> print(f"Mean PDC: {df_pdc['pdc'].mean():.2%}")
>>> print(f"Median PDC: {df_pdc['pdc'].median():.2%}")

Filter to adherent patients (PDC >= 80%):

>>> adherent = df_pdc[df_pdc['pdc'] >= 0.80]
>>> print(f"Adherent patients: {len(adherent):,} ({len(adherent)/len(df_pdc)*100:.1f}%)")

Stratify by adherence level:

>>> df_pdc['adherence_category'] = pd.cut(
...     df_pdc['pdc'],
...     bins=[0, 0.50, 0.80, 1.0],
...     labels=['Non-adherent (<50%)', 'Partial (50-79%)', 'Adherent (≥80%)'],
... )
>>> print(df_pdc['adherence_category'].value_counts())

Calculate from index date (not first fill):

>>> df_with_index = df_rx.merge(df_cohort[['pat_id', 'index_date']], on='pat_id')
>>> df_pdc = calculate_pdc(
...     df_with_index,
...     index_date_col='index_date',
...     observation_window=365,
... )
Source code in alx_heor\medications\__init__.py
def calculate_pdc(
    df_claims: pd.DataFrame,
    patient_id_col: str = "pat_id",
    date_col: str = "from_dt",
    days_supply_col: str | None = None,
    observation_window: int = 365,
    index_date_col: str | None = None,
) -> pd.DataFrame:
    """Calculate Proportion of Days Covered (PDC) for each patient.

    PDC is the standard medication adherence metric in pharmacoepidemiology.
    It measures what proportion of the observation period a patient had
    medication available (based on dispensing records and days supply).

    PDC = (Days with medication available) / (Observation window)

    **Clinical Interpretation:**

    - PDC ≥ 0.80 (80%): Generally considered "adherent"
    - PDC 0.50-0.79: Partial adherence
    - PDC < 0.50: Non-adherent

    The 80% threshold is used because clinical outcomes typically improve
    significantly above this level for chronic conditions.

    **Calculation Method:**

    For each patient:
    1. Define observation window (e.g., 365 days from first fill or index)
    2. For each dispensing, mark days covered based on days_supply
    3. Handle overlapping supplies (no double-counting)
    4. PDC = unique covered days / observation window

    Parameters
    ----------
    df_claims : pd.DataFrame
        Treatment claims data with dates and optional days supply.
        Output from get_treatment_claims().
    patient_id_col : str, default='pat_id'
        Patient ID column name.
    date_col : str, default='from_dt'
        Service date column name.
    days_supply_col : str, optional
        Days supply column. If None or not in DataFrame, assumes
        30-day supply per claim (common for biologics).
    observation_window : int, default=365
        Number of days to observe (typically 365 for annual adherence).
    index_date_col : str, optional
        If provided, observation starts from this date column.
        Otherwise uses first treatment date as day 0.

    Returns
    -------
    pd.DataFrame
        One row per patient with columns:
        - patient_id: Patient identifier
        - pdc: Proportion of Days Covered (0.0 to 1.0)
        - covered_days: Number of days with medication available
        - observation_days: Length of observation window
        - observation_start: Start date of observation
        - observation_end: End date of observation

    See Also
    --------
    get_treatment_claims : Get treatment claims for PDC calculation
    identify_treatment_episodes : Alternative approach for episodic treatment

    Notes
    -----
    - PDC handles overlapping supplies by not double-counting days
    - For injectable biologics, days_supply may not be captured - assume 30
    - PDC is typically calculated for chronic, maintenance medications
    - Early discontinuation will result in low PDC (as intended)

    Examples
    --------
    Calculate annual PDC for gMG treatment:

    >>> df_pdc = calculate_pdc(
    ...     df_rx,
    ...     patient_id_col='pat_id',
    ...     observation_window=365,
    ... )
    >>> print(f"Mean PDC: {df_pdc['pdc'].mean():.2%}")
    >>> print(f"Median PDC: {df_pdc['pdc'].median():.2%}")

    Filter to adherent patients (PDC >= 80%):

    >>> adherent = df_pdc[df_pdc['pdc'] >= 0.80]
    >>> print(f"Adherent patients: {len(adherent):,} ({len(adherent)/len(df_pdc)*100:.1f}%)")

    Stratify by adherence level:

    >>> df_pdc['adherence_category'] = pd.cut(
    ...     df_pdc['pdc'],
    ...     bins=[0, 0.50, 0.80, 1.0],
    ...     labels=['Non-adherent (<50%)', 'Partial (50-79%)', 'Adherent (≥80%)'],
    ... )
    >>> print(df_pdc['adherence_category'].value_counts())

    Calculate from index date (not first fill):

    >>> df_with_index = df_rx.merge(df_cohort[['pat_id', 'index_date']], on='pat_id')
    >>> df_pdc = calculate_pdc(
    ...     df_with_index,
    ...     index_date_col='index_date',
    ...     observation_window=365,
    ... )
    """
    df = df_claims.copy()
    df[date_col] = pd.to_datetime(df[date_col])

    # Default days supply
    if days_supply_col is None or days_supply_col not in df.columns:
        df["_days_supply"] = 30
        days_supply_col = "_days_supply"

    # Fill missing days supply with default
    df[days_supply_col] = df[days_supply_col].fillna(30).astype(int)

    # Calculate observation start per patient (vectorized)
    if index_date_col and index_date_col in df.columns:
        df["_obs_start"] = pd.to_datetime(df[index_date_col])
    else:
        df["_obs_start"] = df.groupby(patient_id_col)[date_col].transform("min")

    # Calculate day offset from observation start (vectorized)
    df["_day_offset"] = (df[date_col] - df["_obs_start"]).dt.days

    # Use vectorized approach per patient
    results = []

    for patient_id, group in df.groupby(patient_id_col):
        obs_start = group["_obs_start"].iloc[0]
        obs_end = obs_start + pd.Timedelta(days=observation_window - 1)

        # Vectorized: get arrays of day offsets and supplies
        offsets = group["_day_offset"].values
        supplies = group[days_supply_col].values

        # Build covered days using vectorized numpy operations
        covered = np.zeros(observation_window, dtype=bool)

        # Vectorized fill: for each claim, mark days as covered
        for offset, supply in zip(offsets, supplies):
            start_idx = max(0, int(offset))
            end_idx = min(observation_window, int(offset) + int(supply))
            if start_idx < end_idx:
                covered[start_idx:end_idx] = True

        covered_days = int(covered.sum())
        pdc = covered_days / observation_window

        results.append({
            "patient_id": patient_id,
            "pdc": round(pdc, 4),
            "covered_days": covered_days,
            "observation_days": observation_window,
            "observation_start": obs_start,
            "observation_end": obs_end,
        })

    return pd.DataFrame(results)

classify_payer_type

classify_payer_type(df_enrollment: DataFrame, patient_id_col: str = 'pat_id', payer_col: str = 'pay_type', index_date_col: str | None = None, payer_mapping: dict[str, list[str]] | None = None) -> pd.DataFrame

Classify patients by payer type from enrollment data.

Parameters:

Name Type Description Default
df_enrollment DataFrame

Enrollment data with payer type column.

required
patient_id_col str

Patient ID column name.

'pat_id'
payer_col str

Payer type column name.

'pay_type'
index_date_col str

If provided, uses payer at index date month. Otherwise uses most common payer.

None
payer_mapping dict

Custom mapping of payer categories to codes. Default: IQVIA mapping (C/S=Commercial, R/T/A=Medicare, M=Medicaid).

None

Returns:

Type Description
DataFrame

One row per patient with payer_type classification.

Example

df_payer = classify_payer_type(df_enrollment) df_cohort = df_cohort.merge(df_payer, on='pat_id')

Source code in alx_heor\medications\__init__.py
def classify_payer_type(
    df_enrollment: pd.DataFrame,
    patient_id_col: str = "pat_id",
    payer_col: str = "pay_type",
    index_date_col: str | None = None,
    payer_mapping: dict[str, list[str]] | None = None,
) -> pd.DataFrame:
    """Classify patients by payer type from enrollment data.

    Parameters
    ----------
    df_enrollment : pd.DataFrame
        Enrollment data with payer type column.
    patient_id_col : str
        Patient ID column name.
    payer_col : str
        Payer type column name.
    index_date_col : str, optional
        If provided, uses payer at index date month.
        Otherwise uses most common payer.
    payer_mapping : dict, optional
        Custom mapping of payer categories to codes.
        Default: IQVIA mapping (C/S=Commercial, R/T/A=Medicare, M=Medicaid).

    Returns
    -------
    pd.DataFrame
        One row per patient with payer_type classification.

    Example
    -------
    >>> df_payer = classify_payer_type(df_enrollment)
    >>> df_cohort = df_cohort.merge(df_payer, on='pat_id')
    """
    df = df_enrollment.copy()

    # Default IQVIA payer mapping
    if payer_mapping is None:
        payer_mapping = {
            "COMMERCIAL": ["C", "S"],
            "MEDICARE": ["R", "T", "A"],
            "MEDICAID": ["M"],
        }

    # If index date provided, filter to that month
    if index_date_col and index_date_col in df.columns and "month_id" in df.columns:
        df[index_date_col] = pd.to_datetime(df[index_date_col])
        df["_index_ym"] = df[index_date_col].dt.strftime("%Y%m")
        df = df[df["_index_ym"] == df["month_id"].astype(str)]

    # Get one payer per patient (first occurrence after sorting)
    df = df.sort_values([patient_id_col, payer_col])
    df_payer = df.drop_duplicates(subset=patient_id_col, keep="first").copy()

    # Apply mapping
    conditions = [
        df_payer[payer_col].isin(codes) for codes in payer_mapping.values()
    ]
    choices = list(payer_mapping.keys())
    df_payer["payer_type"] = np.select(conditions, choices, default="OTHER")

    return df_payer[[patient_id_col, payer_col, "payer_type"]]