Rules API

Billing Pattern Rules

fraud_detection.rules.billing_patterns

Billing pattern analysis for insurance fraud detection.

This module implements rule-based detection of suspicious billing patterns that commonly indicate fraudulent activity. It analyzes temporal patterns, claim frequencies, and amount characteristics to identify anomalies such as:

  • Providers billing an implausible number of procedures per day
  • Patients receiving an unusual number of services in a short period
  • Suspicious weekend billing for non-emergency services
  • Claims with suspiciously round dollar amounts
  • Procedure unbundling (billing separately for bundled services)

Classes

BillingPatternRules

Detect suspicious billing patterns indicative of fraud.

This class implements a collection of rule-based checks that identify billing anomalies commonly associated with fraudulent claims. Each method adds one or more flag columns to the input DataFrame indicating whether the claim exhibits the suspicious pattern.

Parameters

spark : SparkSession
    Active Spark session for distributed processing.
config : DetectionConfig
    Configuration object containing detection thresholds:

- ``max_daily_procedures_per_provider``: Maximum procedures a provider
  can reasonably bill in one day.
- ``max_claims_per_patient_per_day``: Maximum claims expected for a
  single patient per day.

Examples

rules = BillingPatternRules(spark, config)
claims = rules.check_daily_procedure_limits(claims)
claims = rules.check_round_amounts(claims)
suspicious = claims.filter(claims.daily_procedure_limit_exceeded)

Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
class BillingPatternRules:
    """
    Detect suspicious billing patterns indicative of fraud.

    This class implements a collection of rule-based checks that identify
    billing anomalies commonly associated with fraudulent claims. Each method
    adds one or more flag columns to the input DataFrame indicating whether
    the claim exhibits the suspicious pattern.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session for distributed processing.
    config : DetectionConfig
        Configuration object containing detection thresholds:

        - ``max_daily_procedures_per_provider``: Maximum procedures a provider
          can reasonably bill in one day.
        - ``max_claims_per_patient_per_day``: Maximum claims expected for a
          single patient per day.

    Examples
    --------
    >>> rules = BillingPatternRules(spark, config)
    >>> claims = rules.check_daily_procedure_limits(claims)
    >>> claims = rules.check_round_amounts(claims)
    >>> suspicious = claims.filter(claims.daily_procedure_limit_exceeded)
    """

    def __init__(self, spark: SparkSession, config: DetectionConfig) -> None:
        self.spark = spark
        self.config = config

    def check_daily_procedure_limits(self, claims: DataFrame) -> DataFrame:
        """
        Flag providers exceeding daily procedure limits.

        Identifies providers billing an implausibly high number of procedures
        on a single day, which may indicate phantom billing (billing for services
        not rendered) or upcoding schemes.

        A provider billing 100+ procedures per day is physically impossible for
        most service types and warrants investigation.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``provider_id`` and ``service_date`` columns.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``daily_procedure_count`` : int - Total procedures by this provider on this date.
            - ``daily_procedure_limit_exceeded`` : bool - True if count exceeds configured limit.
        """
        window = Window.partitionBy("provider_id", "service_date")

        claims = claims.withColumn(
            "daily_procedure_count",
            F.count("*").over(window),
        )

        claims = claims.withColumn(
            "daily_procedure_limit_exceeded",
            F.col("daily_procedure_count") > F.lit(self.config.max_daily_procedures_per_provider),
        )

        return claims

    def check_patient_claim_frequency(self, claims: DataFrame) -> DataFrame:
        """
        Flag patients with abnormally high daily claim frequency.

        Identifies patients receiving an unusual number of services on the same
        day, which may indicate claim splitting (dividing one service into multiple
        claims for higher reimbursement) or duplicate billing.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``patient_id`` and ``service_date`` columns.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``patient_daily_claims`` : int - Number of claims for this patient on this date.
            - ``patient_frequency_exceeded`` : bool - True if count exceeds configured limit.
        """
        window = Window.partitionBy("patient_id", "service_date")

        claims = claims.withColumn(
            "patient_daily_claims",
            F.count("*").over(window),
        )

        claims = claims.withColumn(
            "patient_frequency_exceeded",
            F.col("patient_daily_claims") > F.lit(self.config.max_claims_per_patient_per_day),
        )

        return claims

    def check_weekend_billing(self, claims: DataFrame) -> DataFrame:
        """
        Flag suspicious weekend billing patterns.

        Identifies providers with unusually high weekend billing volumes. Most
        medical practices are closed on weekends, so high weekend billing for
        routine (non-emergency) procedures may indicate fraudulent backdating
        of claims or fabricated services.

        A provider is flagged if they have weekend claims AND their overall
        weekend billing ratio exceeds 30% of total claims.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``provider_id`` and ``service_date`` columns.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``day_of_week`` : int - Day of week (1=Sunday, 7=Saturday).
            - ``is_weekend`` : bool - True if service date falls on weekend.
            - ``provider_weekend_ratio`` : float - Proportion of provider's claims on weekends.
            - ``weekend_billing_flag`` : bool - True if weekend claim from high-weekend provider.
        """
        claims = claims.withColumn(
            "day_of_week",
            F.dayofweek("service_date"),
        )

        claims = claims.withColumn(
            "is_weekend",
            F.col("day_of_week").isin([1, 7]),
        )

        window = Window.partitionBy("provider_id")

        claims = claims.withColumn(
            "provider_weekend_ratio",
            F.avg(F.col("is_weekend").cast("double")).over(window),
        )

        claims = claims.withColumn(
            "weekend_billing_flag",
            (F.col("is_weekend")) & (F.col("provider_weekend_ratio") > 0.30),
        )

        return claims

    def check_round_amounts(self, claims: DataFrame) -> DataFrame:
        """
        Flag claims with suspiciously round charge amounts.

        Legitimate medical charges typically result in non-round amounts due to
        fee schedules, adjustments, and itemized billing. A high proportion of
        perfectly round amounts (e.g., $100, $500, $1000) may indicate estimated
        or fabricated charges rather than actual services rendered.

        Providers are flagged if they have round-hundred charges AND more than
        20% of their claims have round amounts.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``provider_id`` and ``charge_amount`` columns.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``is_round_hundred`` : bool - True if amount is divisible by 100.
            - ``is_round_fifty`` : bool - True if amount is divisible by 50.
            - ``provider_round_ratio`` : float - Proportion of provider's claims with round amounts.
            - ``round_amount_flag`` : bool - True if round amount from high-round-ratio provider.
        """
        claims = claims.withColumn(
            "is_round_hundred",
            (F.col("charge_amount") % 100 == 0) & (F.col("charge_amount") > 0),
        )

        claims = claims.withColumn(
            "is_round_fifty",
            (F.col("charge_amount") % 50 == 0) & (F.col("charge_amount") > 0),
        )

        window = Window.partitionBy("provider_id")

        claims = claims.withColumn(
            "provider_round_ratio",
            F.avg(F.col("is_round_hundred").cast("double")).over(window),
        )

        claims = claims.withColumn(
            "round_amount_flag",
            (F.col("is_round_hundred")) & (F.col("provider_round_ratio") > 0.20),
        )

        return claims

    def check_procedure_unbundling(self, claims: DataFrame, bundled_procedures: DataFrame) -> DataFrame:
        """
        Detect procedure unbundling fraud.

        Unbundling occurs when a provider bills separately for procedures that
        should be billed together as a single comprehensive service at a lower
        combined rate. This is a common fraud scheme to increase reimbursement.

        For example, a complete blood panel should be billed as one procedure,
        not as individual tests for each blood component.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``patient_id``, ``provider_id``, ``service_date``,
            and ``procedure_code`` columns.
        bundled_procedures : DataFrame
            Reference table defining procedure bundles with columns:

            - ``bundled_code`` : str - The correct bundled procedure code.
            - ``unbundled_code_1`` : str - First component code when unbundled.
            - ``unbundled_code_2`` : str - Second component code when unbundled.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``procedures_same_day`` : array<str> - All procedure codes for this
              patient/provider/date combination.
            - ``unbundling_flag`` : bool - True if unbundled procedure pair detected.
        """
        window = Window.partitionBy("patient_id", "service_date", "provider_id")

        claims = claims.withColumn(
            "procedures_same_day",
            F.collect_set("procedure_code").over(window),
        )

        claims = claims.join(
            bundled_procedures,
            F.array_contains(F.col("procedures_same_day"), F.col("unbundled_code_1"))
            & F.array_contains(F.col("procedures_same_day"), F.col("unbundled_code_2")),
            "left",
        )

        claims = claims.withColumn(
            "unbundling_flag",
            F.col("bundled_code").isNotNull(),
        )

        return claims
Functions
check_daily_procedure_limits(claims)

Flag providers exceeding daily procedure limits.

Identifies providers billing an implausibly high number of procedures on a single day, which may indicate phantom billing (billing for services not rendered) or upcoding schemes.

A provider billing 100+ procedures per day is physically impossible for most service types and warrants investigation.

Parameters

claims : DataFrame
    Input claims with provider_id and service_date columns.

Returns

DataFrame
    Claims with added columns:

- ``daily_procedure_count`` : int - Total procedures by this provider on this date.
- ``daily_procedure_limit_exceeded`` : bool - True if count exceeds configured limit.
Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
def check_daily_procedure_limits(self, claims: DataFrame) -> DataFrame:
    """
    Flag providers exceeding daily procedure limits.

    Identifies providers billing an implausibly high number of procedures
    on a single day, which may indicate phantom billing (billing for services
    not rendered) or upcoding schemes.

    A provider billing 100+ procedures per day is physically impossible for
    most service types and warrants investigation.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``provider_id`` and ``service_date`` columns.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``daily_procedure_count`` : int - Total procedures by this provider on this date.
        - ``daily_procedure_limit_exceeded`` : bool - True if count exceeds configured limit.
    """
    window = Window.partitionBy("provider_id", "service_date")

    claims = claims.withColumn(
        "daily_procedure_count",
        F.count("*").over(window),
    )

    claims = claims.withColumn(
        "daily_procedure_limit_exceeded",
        F.col("daily_procedure_count") > F.lit(self.config.max_daily_procedures_per_provider),
    )

    return claims
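The window count above is equivalent to grouping claims by (provider_id, service_date) and broadcasting each group's size back onto every row. A minimal pure-Python sketch of that logic, with hypothetical claim tuples and a stand-in threshold (neither is part of the source):

```python
from collections import Counter

# Hypothetical claims: (claim_id, provider_id, service_date)
claims = [
    ("c1", "prov_a", "2024-03-01"),
    ("c2", "prov_a", "2024-03-01"),
    ("c3", "prov_b", "2024-03-01"),
]
MAX_DAILY = 1  # stand-in for config.max_daily_procedures_per_provider

# Count procedures per (provider, date), mirroring the Spark window count
daily_counts = Counter((p, d) for _, p, d in claims)

# Attach the count and the exceeded flag to every claim row,
# as withColumn does over the partitioned window
flagged = [
    (cid, daily_counts[(p, d)], daily_counts[(p, d)] > MAX_DAILY)
    for cid, p, d in claims
]
# flagged → [("c1", 2, True), ("c2", 2, True), ("c3", 1, False)]
```

Because the count is a window function rather than a groupBy aggregation, every claim row survives with its flag attached, which is what lets downstream filters select individual suspicious claims.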
check_patient_claim_frequency(claims)

Flag patients with abnormally high daily claim frequency.

Identifies patients receiving an unusual number of services on the same day, which may indicate claim splitting (dividing one service into multiple claims for higher reimbursement) or duplicate billing.

Parameters

claims : DataFrame
    Input claims with patient_id and service_date columns.

Returns

DataFrame
    Claims with added columns:

- ``patient_daily_claims`` : int - Number of claims for this patient on this date.
- ``patient_frequency_exceeded`` : bool - True if count exceeds configured limit.
Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
def check_patient_claim_frequency(self, claims: DataFrame) -> DataFrame:
    """
    Flag patients with abnormally high daily claim frequency.

    Identifies patients receiving an unusual number of services on the same
    day, which may indicate claim splitting (dividing one service into multiple
    claims for higher reimbursement) or duplicate billing.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``patient_id`` and ``service_date`` columns.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``patient_daily_claims`` : int - Number of claims for this patient on this date.
        - ``patient_frequency_exceeded`` : bool - True if count exceeds configured limit.
    """
    window = Window.partitionBy("patient_id", "service_date")

    claims = claims.withColumn(
        "patient_daily_claims",
        F.count("*").over(window),
    )

    claims = claims.withColumn(
        "patient_frequency_exceeded",
        F.col("patient_daily_claims") > F.lit(self.config.max_claims_per_patient_per_day),
    )

    return claims
check_procedure_unbundling(claims, bundled_procedures)

Detect procedure unbundling fraud.

Unbundling occurs when a provider bills separately for procedures that should be billed together as a single comprehensive service at a lower combined rate. This is a common fraud scheme to increase reimbursement.

For example, a complete blood panel should be billed as one procedure, not as individual tests for each blood component.

Parameters

claims : DataFrame
    Input claims with patient_id, provider_id, service_date, and procedure_code columns.
bundled_procedures : DataFrame
    Reference table defining procedure bundles with columns:

- ``bundled_code`` : str - The correct bundled procedure code.
- ``unbundled_code_1`` : str - First component code when unbundled.
- ``unbundled_code_2`` : str - Second component code when unbundled.

Returns

DataFrame
    Claims with added columns:

- ``procedures_same_day`` : array<str> - All procedure codes for this
  patient/provider/date combination.
- ``unbundling_flag`` : bool - True if unbundled procedure pair detected.
Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
def check_procedure_unbundling(self, claims: DataFrame, bundled_procedures: DataFrame) -> DataFrame:
    """
    Detect procedure unbundling fraud.

    Unbundling occurs when a provider bills separately for procedures that
    should be billed together as a single comprehensive service at a lower
    combined rate. This is a common fraud scheme to increase reimbursement.

    For example, a complete blood panel should be billed as one procedure,
    not as individual tests for each blood component.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``patient_id``, ``provider_id``, ``service_date``,
        and ``procedure_code`` columns.
    bundled_procedures : DataFrame
        Reference table defining procedure bundles with columns:

        - ``bundled_code`` : str - The correct bundled procedure code.
        - ``unbundled_code_1`` : str - First component code when unbundled.
        - ``unbundled_code_2`` : str - Second component code when unbundled.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``procedures_same_day`` : array<str> - All procedure codes for this
          patient/provider/date combination.
        - ``unbundling_flag`` : bool - True if unbundled procedure pair detected.
    """
    window = Window.partitionBy("patient_id", "service_date", "provider_id")

    claims = claims.withColumn(
        "procedures_same_day",
        F.collect_set("procedure_code").over(window),
    )

    claims = claims.join(
        bundled_procedures,
        F.array_contains(F.col("procedures_same_day"), F.col("unbundled_code_1"))
        & F.array_contains(F.col("procedures_same_day"), F.col("unbundled_code_2")),
        "left",
    )

    claims = claims.withColumn(
        "unbundling_flag",
        F.col("bundled_code").isNotNull(),
    )

    return claims
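The join condition boils down to a membership test: a bundle rule fires when both of its component codes appear in a patient's same-day procedure set. A pure-Python sketch of that test (the procedure codes and bundle rule here are illustrative, not from the source):

```python
# Same-day procedure codes for one (patient, service_date, provider) group,
# mirroring the collect_set window in check_procedure_unbundling
procedures_same_day = {"84295", "82947", "85025"}

# One hypothetical bundle rule: these two components should have been
# billed together under bundled_code
bundle = {
    "bundled_code": "80053",
    "unbundled_code_1": "84295",
    "unbundled_code_2": "82947",
}

# Matches the array_contains join condition: both components present same day
unbundling_flag = (
    bundle["unbundled_code_1"] in procedures_same_day
    and bundle["unbundled_code_2"] in procedures_same_day
)
# unbundling_flag → True
```

In the Spark version the left join leaves bundled_code null for claims that match no rule, so `isNotNull()` on that column is what converts the join result into the boolean flag.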
check_round_amounts(claims)

Flag claims with suspiciously round charge amounts.

Legitimate medical charges typically result in non-round amounts due to fee schedules, adjustments, and itemized billing. A high proportion of perfectly round amounts (e.g., $100, $500, $1000) may indicate estimated or fabricated charges rather than actual services rendered.

Providers are flagged if they have round-hundred charges AND more than 20% of their claims have round amounts.

Parameters

claims : DataFrame
    Input claims with provider_id and charge_amount columns.

Returns

DataFrame
    Claims with added columns:

- ``is_round_hundred`` : bool - True if amount is divisible by 100.
- ``is_round_fifty`` : bool - True if amount is divisible by 50.
- ``provider_round_ratio`` : float - Proportion of provider's claims with round amounts.
- ``round_amount_flag`` : bool - True if round amount from high-round-ratio provider.
Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
def check_round_amounts(self, claims: DataFrame) -> DataFrame:
    """
    Flag claims with suspiciously round charge amounts.

    Legitimate medical charges typically result in non-round amounts due to
    fee schedules, adjustments, and itemized billing. A high proportion of
    perfectly round amounts (e.g., $100, $500, $1000) may indicate estimated
    or fabricated charges rather than actual services rendered.

    Providers are flagged if they have round-hundred charges AND more than
    20% of their claims have round amounts.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``provider_id`` and ``charge_amount`` columns.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``is_round_hundred`` : bool - True if amount is divisible by 100.
        - ``is_round_fifty`` : bool - True if amount is divisible by 50.
        - ``provider_round_ratio`` : float - Proportion of provider's claims with round amounts.
        - ``round_amount_flag`` : bool - True if round amount from high-round-ratio provider.
    """
    claims = claims.withColumn(
        "is_round_hundred",
        (F.col("charge_amount") % 100 == 0) & (F.col("charge_amount") > 0),
    )

    claims = claims.withColumn(
        "is_round_fifty",
        (F.col("charge_amount") % 50 == 0) & (F.col("charge_amount") > 0),
    )

    window = Window.partitionBy("provider_id")

    claims = claims.withColumn(
        "provider_round_ratio",
        F.avg(F.col("is_round_hundred").cast("double")).over(window),
    )

    claims = claims.withColumn(
        "round_amount_flag",
        (F.col("is_round_hundred")) & (F.col("provider_round_ratio") > 0.20),
    )

    return claims
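The source applies `%` directly to charge_amount; if amounts arrive as binary floats, modulo can surprise (e.g. `2.3 % 0.1` is not `0.0` in Python). A defensive pure-Python sketch of the same two divisibility checks working in integer cents — the cents representation is an assumption, not something the source uses:

```python
def round_amount_flags(charge_cents: int) -> tuple[bool, bool]:
    """Return (is_round_hundred, is_round_fifty) for a charge in integer cents.

    Integer arithmetic sidesteps floating-point modulo edge cases while
    preserving the check's semantics: positive and evenly divisible.
    """
    positive = charge_cents > 0
    return (
        positive and charge_cents % 10_000 == 0,  # divisible by $100
        positive and charge_cents % 5_000 == 0,   # divisible by $50
    )

# $100.00 is round to both; $150.00 only to fifty; $123.45 to neither
# round_amount_flags(10_000) → (True, True)
# round_amount_flags(15_000) → (False, True)
# round_amount_flags(12_345) → (False, False)
```

Note that every round-hundred amount is also a round-fifty amount, so `is_round_fifty` is the broader (noisier) signal; the flag logic in the source keys off `is_round_hundred` only.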
check_weekend_billing(claims)

Flag suspicious weekend billing patterns.

Identifies providers with unusually high weekend billing volumes. Most medical practices are closed on weekends, so high weekend billing for routine (non-emergency) procedures may indicate fraudulent backdating of claims or fabricated services.

A provider is flagged if they have weekend claims AND their overall weekend billing ratio exceeds 30% of total claims.

Parameters

claims : DataFrame
    Input claims with provider_id and service_date columns.

Returns

DataFrame
    Claims with added columns:

- ``day_of_week`` : int - Day of week (1=Sunday, 7=Saturday).
- ``is_weekend`` : bool - True if service date falls on weekend.
- ``provider_weekend_ratio`` : float - Proportion of provider's claims on weekends.
- ``weekend_billing_flag`` : bool - True if weekend claim from high-weekend provider.
Source code in packages/fraud_detection/src/fraud_detection/rules/billing_patterns.py
def check_weekend_billing(self, claims: DataFrame) -> DataFrame:
    """
    Flag suspicious weekend billing patterns.

    Identifies providers with unusually high weekend billing volumes. Most
    medical practices are closed on weekends, so high weekend billing for
    routine (non-emergency) procedures may indicate fraudulent backdating
    of claims or fabricated services.

    A provider is flagged if they have weekend claims AND their overall
    weekend billing ratio exceeds 30% of total claims.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``provider_id`` and ``service_date`` columns.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``day_of_week`` : int - Day of week (1=Sunday, 7=Saturday).
        - ``is_weekend`` : bool - True if service date falls on weekend.
        - ``provider_weekend_ratio`` : float - Proportion of provider's claims on weekends.
        - ``weekend_billing_flag`` : bool - True if weekend claim from high-weekend provider.
    """
    claims = claims.withColumn(
        "day_of_week",
        F.dayofweek("service_date"),
    )

    claims = claims.withColumn(
        "is_weekend",
        F.col("day_of_week").isin([1, 7]),
    )

    window = Window.partitionBy("provider_id")

    claims = claims.withColumn(
        "provider_weekend_ratio",
        F.avg(F.col("is_weekend").cast("double")).over(window),
    )

    claims = claims.withColumn(
        "weekend_billing_flag",
        (F.col("is_weekend")) & (F.col("provider_weekend_ratio") > 0.30),
    )

    return claims
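One easy slip when porting or validating this check: Spark's `F.dayofweek` numbers days 1=Sunday through 7=Saturday, while Python's `date.weekday()`/`isoweekday()` start from Monday. A small sketch reproducing the Spark convention and the `isin([1, 7])` weekend test:

```python
from datetime import date

def spark_dayofweek(d: date) -> int:
    # Spark's F.dayofweek: 1 = Sunday ... 7 = Saturday.
    # Python's date.isoweekday(): 1 = Monday ... 7 = Sunday.
    return d.isoweekday() % 7 + 1

def is_weekend(d: date) -> bool:
    # Mirrors F.col("day_of_week").isin([1, 7])
    return spark_dayofweek(d) in (1, 7)

# 2024-03-02 is a Saturday, 2024-03-03 a Sunday, 2024-03-04 a Monday
# is_weekend(date(2024, 3, 2)) → True
# is_weekend(date(2024, 3, 4)) → False
```

Getting this mapping wrong would silently shift the weekend flag onto Friday/Saturday or Sunday/Monday, skewing `provider_weekend_ratio` for every provider.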

Geographic Rules

fraud_detection.rules.geographic

Geographic anomaly detection for insurance fraud analysis.

This module identifies suspicious geographic patterns in claims data that may indicate fraudulent activity. Geographic analysis is particularly effective at detecting:

  • Identity theft: Claims submitted using stolen patient information, where the victim lives far from the billing provider.
  • Phantom billing: Services billed for patients who could not have reasonably traveled to the provider location.
  • Fraud rings: Organized schemes where patients are recruited from specific geographic areas to submit false claims.
  • Impossible travel: Patients appearing at multiple distant locations on the same day, indicating identity misuse.

Classes

GeographicRules

Detect geographic anomalies indicative of insurance fraud.

Analyzes spatial relationships between patients and providers to identify claims that are geographically implausible. Supports both precise coordinate-based distance calculations and state-level heuristics when coordinates are unavailable.

Parameters

spark : SparkSession
    Active Spark session for distributed processing.
config : DetectionConfig
    Configuration object containing:

- ``max_provider_patient_distance_miles``: Maximum reasonable distance
  between patient and provider before flagging.
Examples

geo_rules = GeographicRules(spark, config)
claims = geo_rules.check_state_mismatch(claims)
claims = geo_rules.check_provider_patient_distance(claims)
out_of_area = claims.filter(claims.distance_exceeded)
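The distance check in the source code below builds on a haversine helper expressed as Spark column arithmetic. For spot-checking individual coordinate pairs outside Spark, a pure-Python equivalent of that expression (same formula, same 3,959-mile mean Earth radius):

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_MILES = 3959.0  # same mean radius as the Spark helper

def haversine_miles(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in miles between two (lat, lon) points in degrees."""
    lat1_r, lat2_r = radians(lat1), radians(lat2)
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    # Haversine formula: a is the squared half-chord length between the points
    a = sin(dlat / 2) ** 2 + cos(lat1_r) * cos(lat2_r) * sin(dlon / 2) ** 2
    return EARTH_RADIUS_MILES * 2 * asin(sqrt(a))

# New York City to Los Angeles is roughly 2,450 miles great-circle,
# a handy sanity check for either implementation:
nyc_to_la = haversine_miles(40.7128, -74.0060, 34.0522, -118.2437)
```

Identical inputs here and in `_haversine_distance` should agree to floating-point precision, which makes this a convenient unit-test oracle for the Spark column version.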

Source code in packages/fraud_detection/src/fraud_detection/rules/geographic.py
class GeographicRules:
    """
    Detect geographic anomalies indicative of insurance fraud.

    Analyzes spatial relationships between patients and providers to identify
    claims that are geographically implausible. Supports both precise coordinate-based
    distance calculations and state-level heuristics when coordinates are unavailable.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session for distributed processing.
    config : DetectionConfig
        Configuration object containing:

        - ``max_provider_patient_distance_miles``: Maximum reasonable distance
          between patient and provider before flagging.

    Examples
    --------
    >>> geo_rules = GeographicRules(spark, config)
    >>> claims = geo_rules.check_state_mismatch(claims)
    >>> claims = geo_rules.check_provider_patient_distance(claims)
    >>> out_of_area = claims.filter(claims.distance_exceeded)
    """

    def __init__(self, spark: SparkSession, config: DetectionConfig) -> None:
        self.spark = spark
        self.config = config

    def check_provider_patient_distance(self, claims: DataFrame) -> DataFrame:
        """
        Flag claims where patient and provider locations are unusually distant.

        Patients typically receive care from providers within a reasonable travel
        distance. Claims involving distant providers may indicate identity theft
        (someone using a stolen identity far from the victim's home) or phantom
        billing (billing for services never rendered).

        If latitude/longitude coordinates are available, calculates precise
        haversine (great-circle) distance. Otherwise, falls back to state-level
        mismatch detection.

        Parameters
        ----------
        claims : DataFrame
            Input claims. For distance calculation, requires columns:
            ``patient_lat``, ``patient_lon``, ``provider_lat``, ``provider_lon``.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``distance_miles`` : float - Calculated distance (if coordinates available).
            - ``distance_exceeded`` : bool - True if distance exceeds configured maximum.
        """
        has_coordinates = all(col in claims.columns for col in ["patient_lat", "patient_lon", "provider_lat", "provider_lon"])

        if has_coordinates:
            claims = claims.withColumn(
                "distance_miles",
                self._haversine_distance(
                    F.col("patient_lat"),
                    F.col("patient_lon"),
                    F.col("provider_lat"),
                    F.col("provider_lon"),
                ),
            )

            claims = claims.withColumn(
                "distance_exceeded",
                F.col("distance_miles") > F.lit(self.config.max_provider_patient_distance_miles),
            )
        else:
            claims = claims.withColumn("distance_exceeded", F.lit(False))

        return claims

    def _haversine_distance(self, lat1: F.Column, lon1: F.Column, lat2: F.Column, lon2: F.Column) -> F.Column:
        """
        Calculate great-circle distance between two points using the haversine formula.

        The haversine formula determines the shortest distance over the earth's
        surface between two points specified by latitude and longitude, accounting
        for the earth's spherical shape.

        Parameters
        ----------
        lat1 : Column
            Latitude of first point in degrees.
        lon1 : Column
            Longitude of first point in degrees.
        lat2 : Column
            Latitude of second point in degrees.
        lon2 : Column
            Longitude of second point in degrees.

        Returns
        -------
        Column
            Distance in miles between the two points.

        Notes
        -----
        Uses Earth's mean radius of 3,959 miles. Accuracy is typically within
        0.5% for most practical distances.
        """
        earth_radius_miles = 3959.0

        lat1_rad = F.radians(lat1)
        lat2_rad = F.radians(lat2)
        delta_lat = F.radians(lat2 - lat1)
        delta_lon = F.radians(lon2 - lon1)

        a = F.sin(delta_lat / 2) ** 2 + F.cos(lat1_rad) * F.cos(lat2_rad) * F.sin(delta_lon / 2) ** 2
        c = 2 * F.asin(F.sqrt(a))

        return earth_radius_miles * c

    def check_state_mismatch(self, claims: DataFrame) -> DataFrame:
        """
        Flag claims where patient and provider are in different states.

        While cross-state healthcare is legitimate in border areas or for
        specialized care, out-of-state claims are statistically more likely
        to be fraudulent and warrant additional scrutiny, especially when
        combined with other risk indicators.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``patient_state`` and ``provider_state`` columns.

        Returns
        -------
        DataFrame
            Claims with added column:

            - ``state_mismatch`` : bool - True if patient and provider states differ.
        """
        if "patient_state" not in claims.columns or "provider_state" not in claims.columns:
            claims = claims.withColumn("state_mismatch", F.lit(False))
            return claims

        claims = claims.withColumn(
            "state_mismatch",
            (F.col("patient_state") != F.col("provider_state")) & F.col("patient_state").isNotNull() & F.col("provider_state").isNotNull(),
        )

        return claims

    def check_geographic_clustering(self, claims: DataFrame) -> DataFrame:
        """
        Detect suspicious geographic concentration of a provider's patients.

        Legitimate providers typically serve patients from a natural geographic
        distribution reflecting their location and specialty. A provider with
        a high volume of patients concentrated in a single distant state may
        indicate an organized fraud ring recruiting patients from a specific area.

        Flags providers with 100+ patients where all patients come from a single
        state different from the provider's state.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``provider_id``, ``patient_id``, ``patient_state``,
            and ``provider_state`` columns.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``unique_patient_states`` : int - Number of distinct states patients come from.
            - ``total_patients`` : int - Total patient count for this provider.
            - ``geographic_clustering_flag`` : bool - True if suspicious clustering detected.
        """
        window = Window.partitionBy("provider_id")

        claims = claims.withColumn(
            "unique_patient_states",
            F.size(F.collect_set("patient_state").over(window)),
        )

        claims = claims.withColumn(
            "total_patients",
            F.count("patient_id").over(window),
        )

        claims = claims.withColumn(
            "geographic_clustering_flag",
            (F.col("total_patients") > 100) & (F.col("unique_patient_states") == 1) & (F.col("patient_state") != F.col("provider_state")),
        )

        return claims

    def check_impossible_travel(self, claims: DataFrame) -> DataFrame:
        """
        Detect physically impossible travel patterns for patients.

        Identifies patients who have claims from multiple distant provider
        locations on the same day that would require impossible travel speeds.
        This is a strong indicator of identity theft (multiple people using
        the same patient identity) or systematic billing fraud.

        Parameters
        ----------
        claims : DataFrame
            Input claims with ``patient_id``, ``service_date``, and optionally
            ``provider_lat``, ``provider_lon`` for precise detection.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``provider_locations`` : array<struct> - All provider coordinates visited same day.
            - ``num_providers_same_day`` : int - Count of distinct provider locations.
            - ``impossible_travel_flag`` : bool - True if patient visited >3 providers same day.

        Notes
        -----
        Current implementation uses a simple threshold of >3 provider locations
        per day as a heuristic. A more sophisticated approach would calculate
        pairwise distances and required travel speeds, but this requires a UDF
        for efficient computation.
        """
        required_cols = ["patient_lat", "patient_lon", "provider_lat", "provider_lon"]
        if not all(col in claims.columns for col in required_cols):
            claims = claims.withColumn("impossible_travel_flag", F.lit(False))
            return claims

        window = Window.partitionBy("patient_id", "service_date")

        claims = claims.withColumn(
            "provider_locations",
            # collect_set keeps distinct coordinates, so repeat visits to one provider count once.
            F.collect_set(F.struct(F.col("provider_lat").alias("lat"), F.col("provider_lon").alias("lon"))).over(window),
        )

        claims = claims.withColumn(
            "num_providers_same_day",
            F.size("provider_locations"),
        )

        claims = claims.withColumn(
            "impossible_travel_flag",
            F.col("num_providers_same_day") > 3,
        )

        return claims
Functions
check_geographic_clustering(claims)

Detect suspicious geographic concentration of a provider's patients.

Legitimate providers typically serve patients from a natural geographic distribution reflecting their location and specialty. A provider with a high volume of patients concentrated in a single distant state may indicate an organized fraud ring recruiting patients from a specific area.

Flags providers with 100+ patients where all patients come from a single state different from the provider's state.

Parameters

claims : DataFrame
    Input claims with ``provider_id``, ``patient_id``, ``patient_state``, and ``provider_state`` columns.

Returns

DataFrame
    Claims with added columns:

- ``unique_patient_states`` : int - Number of distinct states patients come from.
- ``total_patients`` : int - Total patient count for this provider.
- ``geographic_clustering_flag`` : bool - True if suspicious clustering detected.
Source code in packages/fraud_detection/src/fraud_detection/rules/geographic.py
def check_geographic_clustering(self, claims: DataFrame) -> DataFrame:
    """
    Detect suspicious geographic concentration of a provider's patients.

    Legitimate providers typically serve patients from a natural geographic
    distribution reflecting their location and specialty. A provider with
    a high volume of patients concentrated in a single distant state may
    indicate an organized fraud ring recruiting patients from a specific area.

    Flags providers with 100+ patients where all patients come from a single
    state different from the provider's state.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``provider_id``, ``patient_id``, ``patient_state``,
        and ``provider_state`` columns.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``unique_patient_states`` : int - Number of distinct states patients come from.
        - ``total_patients`` : int - Total patient count for this provider.
        - ``geographic_clustering_flag`` : bool - True if suspicious clustering detected.
    """
    window = Window.partitionBy("provider_id")

    claims = claims.withColumn(
        "unique_patient_states",
        F.size(F.collect_set("patient_state").over(window)),
    )

    claims = claims.withColumn(
        "total_patients",
        # Distinct patients, not raw claim rows, so repeat visits don't inflate the count.
        F.size(F.collect_set("patient_id").over(window)),
    )

    claims = claims.withColumn(
        "geographic_clustering_flag",
        (F.col("total_patients") > 100) & (F.col("unique_patient_states") == 1) & (F.col("patient_state") != F.col("provider_state")),
    )

    return claims
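The windowed rule above can be illustrated outside Spark. A plain-Python sketch for a single provider's claim rows (`geographic_clustering_flag` is a hypothetical helper, not part of the library):

```python
def geographic_clustering_flag(claims):
    """claims: dicts with patient_id, patient_state, provider_state for ONE provider.

    Mirrors the rule above: 100+ distinct patients, all from a single
    state that differs from the provider's state.
    """
    states = {c["patient_state"] for c in claims}
    patients = {c["patient_id"] for c in claims}
    return (
        len(patients) > 100
        and len(states) == 1
        and all(c["patient_state"] != c["provider_state"] for c in claims)
    )
```

In the Spark version the same set sizes are computed per `provider_id` via window aggregates, so every claim row carries its provider-level counts.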
check_impossible_travel(claims)

Detect physically impossible travel patterns for patients.

Identifies patients who have claims from multiple distant provider locations on the same day that would require impossible travel speeds. This is a strong indicator of identity theft (multiple people using the same patient identity) or systematic billing fraud.

Parameters

claims : DataFrame
    Input claims with ``patient_id``, ``service_date``, and optionally ``provider_lat``, ``provider_lon`` for precise detection.

Returns

DataFrame
    Claims with added columns:

- ``provider_locations`` : array<struct> - All provider coordinates visited same day.
- ``num_providers_same_day`` : int - Count of distinct provider locations.
- ``impossible_travel_flag`` : bool - True if patient visited >3 providers same day.
Notes

Current implementation uses a simple threshold of >3 provider locations per day as a heuristic. A more sophisticated approach would calculate pairwise distances and required travel speeds, but this requires a UDF for efficient computation.

Source code in packages/fraud_detection/src/fraud_detection/rules/geographic.py
def check_impossible_travel(self, claims: DataFrame) -> DataFrame:
    """
    Detect physically impossible travel patterns for patients.

    Identifies patients who have claims from multiple distant provider
    locations on the same day that would require impossible travel speeds.
    This is a strong indicator of identity theft (multiple people using
    the same patient identity) or systematic billing fraud.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``patient_id``, ``service_date``, and optionally
        ``provider_lat``, ``provider_lon`` for precise detection.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``provider_locations`` : array<struct> - All provider coordinates visited same day.
        - ``num_providers_same_day`` : int - Count of distinct provider locations.
        - ``impossible_travel_flag`` : bool - True if patient visited >3 providers same day.

    Notes
    -----
    Current implementation uses a simple threshold of >3 provider locations
    per day as a heuristic. A more sophisticated approach would calculate
    pairwise distances and required travel speeds, but this requires a UDF
    for efficient computation.
    """
    required_cols = ["patient_lat", "patient_lon", "provider_lat", "provider_lon"]
    if not all(col in claims.columns for col in required_cols):
        claims = claims.withColumn("impossible_travel_flag", F.lit(False))
        return claims

    window = Window.partitionBy("patient_id", "service_date")

    claims = claims.withColumn(
        "provider_locations",
        # collect_set keeps distinct coordinates, so repeat visits to one provider count once.
        F.collect_set(F.struct(F.col("provider_lat").alias("lat"), F.col("provider_lon").alias("lon"))).over(window),
    )

    claims = claims.withColumn(
        "num_providers_same_day",
        F.size("provider_locations"),
    )

    claims = claims.withColumn(
        "impossible_travel_flag",
        F.col("num_providers_same_day") > 3,
    )

    return claims
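The Notes above mention a pairwise travel-speed refinement. A plain-Python sketch of that idea (the threshold, hours budget, and helper names are illustrative assumptions, not library code):

```python
import math
from itertools import combinations

MAX_PLAUSIBLE_MPH = 70.0  # assumed upper bound for same-day ground travel

def _haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin(math.radians(lat2 - lat1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * 3958.8 * math.asin(math.sqrt(a))

def impossible_travel(visits, hours_available=12.0):
    """visits: (lat, lon) provider locations one patient hit on the same day."""
    for a, b in combinations(visits, 2):
        # If any pair of same-day locations requires an implausible speed, flag it.
        if _haversine_miles(*a, *b) / hours_available > MAX_PLAUSIBLE_MPH:
            return True
    return False
```

As the Notes say, running this at scale in Spark would typically mean a (pandas) UDF applied to the per-day location arrays.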
check_provider_patient_distance(claims)

Flag claims where patient and provider locations are unusually distant.

Patients typically receive care from providers within a reasonable travel distance. Claims involving distant providers may indicate identity theft (someone using a stolen identity far from the victim's home) or phantom billing (billing for services never rendered).

If latitude/longitude coordinates are available, calculates precise haversine (great-circle) distance. Otherwise, falls back to state-level mismatch detection.

Parameters

claims : DataFrame
    Input claims. For distance calculation, requires columns: ``patient_lat``, ``patient_lon``, ``provider_lat``, ``provider_lon``.

Returns

DataFrame
    Claims with added columns:

- ``distance_miles`` : float - Calculated distance (if coordinates available).
- ``distance_exceeded`` : bool - True if distance exceeds configured maximum.
Source code in packages/fraud_detection/src/fraud_detection/rules/geographic.py
def check_provider_patient_distance(self, claims: DataFrame) -> DataFrame:
    """
    Flag claims where patient and provider locations are unusually distant.

    Patients typically receive care from providers within a reasonable travel
    distance. Claims involving distant providers may indicate identity theft
    (someone using a stolen identity far from the victim's home) or phantom
    billing (billing for services never rendered).

    If latitude/longitude coordinates are available, calculates precise
    haversine (great-circle) distance. Otherwise, falls back to state-level
    mismatch detection.

    Parameters
    ----------
    claims : DataFrame
        Input claims. For distance calculation, requires columns:
        ``patient_lat``, ``patient_lon``, ``provider_lat``, ``provider_lon``.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``distance_miles`` : float - Calculated distance (if coordinates available).
        - ``distance_exceeded`` : bool - True if distance exceeds configured maximum.
    """
    has_coordinates = all(col in claims.columns for col in ["patient_lat", "patient_lon", "provider_lat", "provider_lon"])

    if has_coordinates:
        claims = claims.withColumn(
            "distance_miles",
            self._haversine_distance(
                F.col("patient_lat"),
                F.col("patient_lon"),
                F.col("provider_lat"),
                F.col("provider_lon"),
            ),
        )

        claims = claims.withColumn(
            "distance_exceeded",
            F.col("distance_miles") > F.lit(self.config.max_provider_patient_distance_miles),
        )
    else:
        claims = claims.withColumn("distance_exceeded", F.lit(False))

    return claims
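`_haversine_distance` itself is not shown in this excerpt; the standard great-circle formula it presumably implements looks like this in plain Python (a sketch, not the library's actual helper):

```python
import math

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius

def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance between two (lat, lon) points, in miles."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))
```

In a Spark column expression the same arithmetic can be written with built-in functions (`F.radians`, `F.sin`, `F.cos`, `F.asin`, `F.sqrt`), avoiding a Python UDF.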
check_state_mismatch(claims)

Flag claims where patient and provider are in different states.

While cross-state healthcare is legitimate in border areas or for specialized care, out-of-state claims are statistically more likely to be fraudulent and warrant additional scrutiny, especially when combined with other risk indicators.

Parameters

claims : DataFrame
    Input claims with ``patient_state`` and ``provider_state`` columns.

Returns

DataFrame
    Claims with added column:

- ``state_mismatch`` : bool - True if patient and provider states differ.
Source code in packages/fraud_detection/src/fraud_detection/rules/geographic.py
def check_state_mismatch(self, claims: DataFrame) -> DataFrame:
    """
    Flag claims where patient and provider are in different states.

    While cross-state healthcare is legitimate in border areas or for
    specialized care, out-of-state claims are statistically more likely
    to be fraudulent and warrant additional scrutiny, especially when
    combined with other risk indicators.

    Parameters
    ----------
    claims : DataFrame
        Input claims with ``patient_state`` and ``provider_state`` columns.

    Returns
    -------
    DataFrame
        Claims with added column:

        - ``state_mismatch`` : bool - True if patient and provider states differ.
    """
    if "patient_state" not in claims.columns or "provider_state" not in claims.columns:
        claims = claims.withColumn("state_mismatch", F.lit(False))
        return claims

    claims = claims.withColumn(
        "state_mismatch",
        (F.col("patient_state") != F.col("provider_state")) & F.col("patient_state").isNotNull() & F.col("provider_state").isNotNull(),
    )

    return claims

Duplicate Detection

fraud_detection.rules.duplicates

Duplicate claim detection for insurance fraud analysis.

This module identifies both exact and near-duplicate claims, which are common indicators of fraudulent billing practices such as double-billing or claim resubmission with minor modifications to avoid detection.

Classes

DuplicateDetector

Detect duplicate and near-duplicate insurance claims.

This detector identifies two types of duplicates:

  1. Exact duplicates: Claims with identical key fields (patient, provider, procedure, date, and amount). These often indicate accidental or intentional double-billing.

  2. Near-duplicates: Claims that are highly similar but not identical, potentially indicating resubmission with minor changes to evade detection. Uses configurable similarity thresholds and time windows.

Parameters

spark : SparkSession
    Active Spark session for distributed processing.
config : DetectionConfig
    Configuration object containing detection thresholds:

    - ``duplicate_similarity_threshold``: Minimum similarity score (0-1) for near-duplicate detection.
    - ``duplicate_time_window_days``: Maximum days between service dates for claims to be considered potential near-duplicates.

Examples

detector = DuplicateDetector(spark, config)
flagged_claims = detector.detect(claims_df)
duplicates = flagged_claims.filter(flagged_claims.is_duplicate == True)

Source code in packages/fraud_detection/src/fraud_detection/rules/duplicates.py
class DuplicateDetector:
    """
    Detect duplicate and near-duplicate insurance claims.

    This detector identifies two types of duplicates:

    1. **Exact duplicates**: Claims with identical key fields (patient, provider,
       procedure, date, and amount). These often indicate accidental or intentional
       double-billing.

    2. **Near-duplicates**: Claims that are highly similar but not identical,
       potentially indicating resubmission with minor changes to evade detection.
       Uses configurable similarity thresholds and time windows.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session for distributed processing.
    config : DetectionConfig
        Configuration object containing detection thresholds:
        - ``duplicate_similarity_threshold``: Minimum similarity score (0-1) for
          near-duplicate detection.
        - ``duplicate_time_window_days``: Maximum days between service dates for
          claims to be considered potential near-duplicates.

    Examples
    --------
    >>> detector = DuplicateDetector(spark, config)
    >>> flagged_claims = detector.detect(claims_df)
    >>> duplicates = flagged_claims.filter(flagged_claims.is_duplicate == True)
    """

    def __init__(self, spark: SparkSession, config: DetectionConfig) -> None:
        self.spark = spark
        self.config = config

    def detect(self, claims: DataFrame) -> DataFrame:
        """
        Run full duplicate detection pipeline on claims data.

        Sequentially applies exact and near-duplicate detection, then combines
        results into unified duplicate flags. A claim is marked as duplicate if
        it matches either criterion.

        Parameters
        ----------
        claims : DataFrame
            Input claims with required columns: ``claim_id``, ``patient_id``,
            ``provider_id``, ``procedure_code``, ``service_date``, ``charge_amount``.

        Returns
        -------
        DataFrame
            Original claims with additional columns:

            - ``is_duplicate`` : bool - True if claim is any type of duplicate.
            - ``duplicate_of`` : str - claim_id of the original claim this duplicates.
            - ``is_exact_duplicate`` : bool - True if exact field match.
            - ``is_near_duplicate`` : bool - True if similarity-based match.
        """
        claims = self._detect_exact_duplicates(claims)
        claims = self._detect_near_duplicates(claims)

        claims = claims.withColumn(
            "is_duplicate",
            F.col("is_exact_duplicate") | F.col("is_near_duplicate"),
        )

        claims = claims.withColumn(
            "duplicate_of",
            F.coalesce(
                F.col("exact_duplicate_of"),
                F.col("near_duplicate_of"),
            ),
        )

        return claims

    def _detect_exact_duplicates(self, claims: DataFrame) -> DataFrame:
        """
        Identify claims with identical key fields.

        Creates a composite key from patient, provider, procedure, date, and amount,
        then uses window functions to find and rank duplicates within each key group.
        The first claim (by claim_id order) is considered the original; subsequent
        claims are flagged as duplicates.

        Parameters
        ----------
        claims : DataFrame
            Input claims DataFrame.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``is_exact_duplicate`` : bool - True for duplicates (not the first occurrence).
            - ``exact_duplicate_of`` : str - claim_id of the first claim in the duplicate group.
            - ``duplicate_key`` : str - Composite key used for matching.
            - ``duplicate_rank`` : int - Position within duplicate group (1 = original).
        """
        key_fields = [
            "patient_id",
            "provider_id",
            "procedure_code",
            "service_date",
            "charge_amount",
        ]

        claims = claims.withColumn(
            "duplicate_key",
            F.concat_ws("|", *[F.col(c).cast("string") for c in key_fields]),
        )

        window = Window.partitionBy("duplicate_key").orderBy("claim_id")

        claims = claims.withColumn(
            "duplicate_rank",
            F.row_number().over(window),
        )

        claims = claims.withColumn(
            "first_claim_in_group",
            F.first("claim_id").over(window),
        )

        claims = claims.withColumn(
            "is_exact_duplicate",
            F.col("duplicate_rank") > 1,
        )

        claims = claims.withColumn(
            "exact_duplicate_of",
            F.when(
                F.col("is_exact_duplicate"),
                F.col("first_claim_in_group"),
            ).otherwise(F.lit(None)),
        )

        return claims

    def _detect_near_duplicates(self, claims: DataFrame) -> DataFrame:
        """
        Identify highly similar claims that may indicate modified resubmissions.

        Performs a self-join to compare claims from the same patient-provider pair
        within a configurable time window. Calculates a weighted similarity score
        based on procedure code match (60%) and charge amount similarity (40%).
        Claims exceeding the similarity threshold are flagged as near-duplicates.

        This catches fraud patterns where claims are resubmitted with minor changes
        (e.g., slightly different amounts or dates) to avoid exact-match detection.

        Parameters
        ----------
        claims : DataFrame
            Input claims, must already have ``is_exact_duplicate`` column from
            prior exact duplicate detection (exact duplicates are excluded from
            near-duplicate analysis to avoid double-flagging).

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``is_near_duplicate`` : bool - True if similarity score exceeds threshold.
            - ``near_duplicate_of`` : str - claim_id of the most similar earlier claim.

        Notes
        -----
        Similarity calculation:

        - **Procedure match** (weight 0.6): Binary - 1.0 if codes match, 0.0 otherwise.
        - **Charge similarity** (weight 0.4): ``1 - |amount1 - amount2| / max(amount1, amount2)``

        Only the highest-similarity match is retained per claim to avoid cascading
        duplicate chains.
        """
        time_window_days = self.config.duplicate_time_window_days

        claims_left = claims.alias("left")
        claims_right = claims.alias("right")

        join_condition = (
            (F.col("left.patient_id") == F.col("right.patient_id"))
            & (F.col("left.provider_id") == F.col("right.provider_id"))
            & (F.col("left.claim_id") != F.col("right.claim_id"))
            & (F.col("left.claim_id") > F.col("right.claim_id"))
            & (F.abs(F.datediff(F.col("left.service_date"), F.col("right.service_date"))) <= time_window_days)
            & (~F.col("left.is_exact_duplicate"))
        )

        potential_duplicates = claims_left.join(claims_right, join_condition, "left")

        potential_duplicates = potential_duplicates.withColumn(
            "procedure_match",
            (F.col("left.procedure_code") == F.col("right.procedure_code")).cast("double"),
        )

        potential_duplicates = potential_duplicates.withColumn(
            "charge_similarity",
            F.lit(1.0)
            - F.abs(F.col("left.charge_amount") - F.col("right.charge_amount"))
            / F.greatest(F.col("left.charge_amount"), F.col("right.charge_amount")),
        )

        potential_duplicates = potential_duplicates.withColumn(
            "similarity_score",
            (F.col("procedure_match") * 0.6 + F.col("charge_similarity") * 0.4),
        )

        threshold = self.config.duplicate_similarity_threshold

        near_duplicates = potential_duplicates.filter(F.col("similarity_score") >= threshold).select(
            F.col("left.claim_id").alias("claim_id"),
            F.col("right.claim_id").alias("near_duplicate_of"),
            F.col("similarity_score"),
        )

        window = Window.partitionBy("claim_id").orderBy(F.desc("similarity_score"))

        near_duplicates = near_duplicates.withColumn("rank", F.row_number().over(window)).filter(F.col("rank") == 1).drop("rank", "similarity_score")

        claims = claims.join(near_duplicates, "claim_id", "left")

        claims = claims.withColumn(
            "is_near_duplicate",
            F.col("near_duplicate_of").isNotNull(),
        )

        return claims
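The weighted similarity described in the `_detect_near_duplicates` notes reduces to a few lines of arithmetic. A plain-Python sketch with the same 0.6/0.4 weights (`similarity_score` is an illustrative name, not a library function):

```python
def similarity_score(proc_a, proc_b, amount_a, amount_b):
    """Near-duplicate similarity: procedure match weighted 0.6, charge similarity 0.4."""
    procedure_match = 1.0 if proc_a == proc_b else 0.0
    charge_similarity = 1.0 - abs(amount_a - amount_b) / max(amount_a, amount_b)
    return 0.6 * procedure_match + 0.4 * charge_similarity

# Same procedure, $100 vs $95: 0.6 * 1.0 + 0.4 * 0.95 = 0.98
```

Note that charge similarity alone tops out at 0.4, so any configured threshold above 0.4 effectively requires a procedure-code match.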
Functions
detect(claims)

Run full duplicate detection pipeline on claims data.

Sequentially applies exact and near-duplicate detection, then combines results into unified duplicate flags. A claim is marked as duplicate if it matches either criterion.

Parameters

claims : DataFrame
    Input claims with required columns: ``claim_id``, ``patient_id``, ``provider_id``, ``procedure_code``, ``service_date``, ``charge_amount``.

Returns

DataFrame
    Original claims with additional columns:

- ``is_duplicate`` : bool - True if claim is any type of duplicate.
- ``duplicate_of`` : str - claim_id of the original claim this duplicates.
- ``is_exact_duplicate`` : bool - True if exact field match.
- ``is_near_duplicate`` : bool - True if similarity-based match.
Source code in packages/fraud_detection/src/fraud_detection/rules/duplicates.py
def detect(self, claims: DataFrame) -> DataFrame:
    """
    Run full duplicate detection pipeline on claims data.

    Sequentially applies exact and near-duplicate detection, then combines
    results into unified duplicate flags. A claim is marked as duplicate if
    it matches either criterion.

    Parameters
    ----------
    claims : DataFrame
        Input claims with required columns: ``claim_id``, ``patient_id``,
        ``provider_id``, ``procedure_code``, ``service_date``, ``charge_amount``.

    Returns
    -------
    DataFrame
        Original claims with additional columns:

        - ``is_duplicate`` : bool - True if claim is any type of duplicate.
        - ``duplicate_of`` : str - claim_id of the original claim this duplicates.
        - ``is_exact_duplicate`` : bool - True if exact field match.
        - ``is_near_duplicate`` : bool - True if similarity-based match.
    """
    claims = self._detect_exact_duplicates(claims)
    claims = self._detect_near_duplicates(claims)

    claims = claims.withColumn(
        "is_duplicate",
        F.col("is_exact_duplicate") | F.col("is_near_duplicate"),
    )

    claims = claims.withColumn(
        "duplicate_of",
        F.coalesce(
            F.col("exact_duplicate_of"),
            F.col("near_duplicate_of"),
        ),
    )

    return claims
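For intuition, the exact-duplicate pass that `detect` runs first can be mimicked in plain Python: group on the composite key, order by `claim_id`, and flag everything after the first row (`flag_exact_duplicates` is an illustrative helper, not library code):

```python
from collections import defaultdict

KEY_FIELDS = ("patient_id", "provider_id", "procedure_code", "service_date", "charge_amount")

def flag_exact_duplicates(claims):
    """claims: list of dicts containing claim_id plus the five key fields."""
    groups = defaultdict(list)
    for c in claims:
        # Composite key, analogous to concat_ws("|", ...) in the Spark version.
        groups["|".join(str(c[f]) for f in KEY_FIELDS)].append(c)
    for group in groups.values():
        group.sort(key=lambda c: c["claim_id"])  # first claim_id is the "original"
        for rank, c in enumerate(group, start=1):
            c["is_exact_duplicate"] = rank > 1
            c["exact_duplicate_of"] = group[0]["claim_id"] if rank > 1 else None
    return claims
```

The Spark implementation expresses the same grouping with `Window.partitionBy("duplicate_key").orderBy("claim_id")` plus `row_number` and `first`, so the work distributes across the cluster.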