Skip to content

Statistics API

Outlier Detection

fraud_detection.statistics.outliers

Statistical outlier detection for insurance fraud analysis.

This module provides multiple statistical methods to identify anomalous claim amounts that deviate significantly from expected patterns. Outlier detection is fundamental to fraud detection because fraudulent claims often involve:

  • Inflated charges: Billing significantly more than the typical rate for a procedure.
  • Unusual patterns: Providers consistently charging above or below market rates.
  • Temporal anomalies: Sudden unexplained spikes in billing amounts.

Two primary statistical methods are implemented:

  1. Z-score method: Measures how many standard deviations a value is from the mean. Best for normally distributed data.

  2. IQR method: Uses quartiles to define outlier boundaries. More robust to extreme values and non-normal distributions.

Classes

OutlierDetector

Detect statistical outliers in insurance claims data.

Provides multiple outlier detection methods optimized for fraud analysis, supporting both global analysis and group-based detection (e.g., by procedure code). All methods add boolean flag columns indicating outlier status.

Parameters

spark : SparkSession
    Active Spark session for distributed processing.
config : DetectionConfig
    Configuration object containing detection thresholds:

- ``outlier_zscore_threshold``: Number of standard deviations for Z-score method.
- ``outlier_iqr_multiplier``: IQR multiplier (typically 1.5 for outliers, 3.0 for extreme).
Examples

detector = OutlierDetector(spark, config)
claims = detector.detect_zscore_outliers(claims, "charge_amount", "is_outlier")
claims = detector.detect_procedure_outliers(claims)
outliers = claims.filter(claims.is_outlier)

Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
class OutlierDetector:
    """
    Detect statistical outliers in insurance claims data.

    Provides multiple outlier detection methods optimized for fraud analysis,
    supporting both global analysis and group-based detection (e.g., by procedure
    code). All methods add boolean flag columns indicating outlier status.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session for distributed processing.
    config : DetectionConfig
        Configuration object containing detection thresholds:

        - ``outlier_zscore_threshold``: Number of standard deviations for Z-score method.
        - ``outlier_iqr_multiplier``: IQR multiplier (typically 1.5 for outliers, 3.0 for extreme).

    Examples
    --------
    >>> detector = OutlierDetector(spark, config)
    >>> claims = detector.detect_zscore_outliers(claims, "charge_amount", "is_outlier")
    >>> claims = detector.detect_procedure_outliers(claims)
    >>> outliers = claims.filter(claims.is_outlier)
    """

    def __init__(self, spark: SparkSession, config: DetectionConfig) -> None:
        self.spark = spark
        self.config = config

    def detect_zscore_outliers(
        self,
        df: DataFrame,
        column: str,
        output_column: str,
        group_by: list[str] | None = None,
    ) -> DataFrame:
        """
        Identify outliers using the Z-score (standard score) method.

        The Z-score measures how many standard deviations a value is from the
        mean. Values with absolute Z-scores exceeding the configured threshold
        are flagged as outliers.

        Z-score is most effective when data is approximately normally distributed.
        For skewed distributions (common with financial data), consider using the
        IQR method instead.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame containing the column to analyze.
        column : str
            Name of the numeric column to check for outliers.
        output_column : str
            Name for the boolean flag column to be added.
        group_by : list[str], optional
            Columns to partition by for group-wise statistics. For example,
            passing ``["procedure_code"]`` calculates separate means and
            standard deviations for each procedure, making the detection
            context-aware.

        Returns
        -------
        DataFrame
            Input DataFrame with added boolean column ``output_column`` where
            True indicates the value is an outlier.

        Notes
        -----
        The formula used is: ``z = (x - μ) / σ``

        A claim is flagged if ``|z| > threshold``. When standard deviation is
        zero (all values identical), Z-score is set to 0 to avoid division errors.
        Null values in ``column`` are never flagged: the flag column is always
        True/False, never null.
        """
        threshold = self.config.outlier_zscore_threshold

        if group_by:
            window = Window.partitionBy(group_by)
        else:
            # Constant partition key -> a single global partition, i.e. global
            # statistics. Note this funnels all rows through one task.
            window = Window.partitionBy(F.lit(1))

        mean_col = f"_{column}_mean"
        stddev_col = f"_{column}_stddev"
        zscore_col = f"_{column}_zscore"

        df = df.withColumn(mean_col, F.mean(column).over(window))
        df = df.withColumn(stddev_col, F.stddev(column).over(window))

        df = df.withColumn(
            zscore_col,
            F.when(
                F.col(stddev_col) > 0,
                (F.col(column) - F.col(mean_col)) / F.col(stddev_col),
            ).otherwise(F.lit(0.0)),
        )

        # Fix: for a null value the comparison evaluates to null, which
        # previously leaked nulls into the "boolean" flag column. Coalesce so
        # missing data is reported as "not an outlier" (False) instead.
        df = df.withColumn(
            output_column,
            F.coalesce(F.abs(F.col(zscore_col)) > threshold, F.lit(False)),
        )

        return df.drop(mean_col, stddev_col, zscore_col)

    def detect_iqr_outliers(
        self,
        df: DataFrame,
        column: str,
        output_column: str,
        group_by: list[str] | None = None,
    ) -> DataFrame:
        """
        Identify outliers using the Interquartile Range (IQR) method.

        IQR-based detection is more robust than Z-score for skewed distributions
        and is less sensitive to extreme outliers when calculating bounds. Values
        below ``Q1 - k*IQR`` or above ``Q3 + k*IQR`` are flagged as outliers.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame containing the column to analyze.
        column : str
            Name of the numeric column to check for outliers.
        output_column : str
            Name for the boolean flag column to be added.
        group_by : list[str], optional
            Columns to partition by for group-wise quartile calculation.

        Returns
        -------
        DataFrame
            Input DataFrame with added boolean column ``output_column`` where
            True indicates the value is an outlier.

        Raises
        ------
        ValueError
            If the DataFrame is empty and global quartiles cannot be computed.

        Notes
        -----
        IQR = Q3 - Q1 (interquartile range)

        - Lower bound: Q1 - (multiplier * IQR)
        - Upper bound: Q3 + (multiplier * IQR)

        Common multiplier values:

        - 1.5: Standard outliers (Tukey's method)
        - 3.0: Extreme outliers only

        Null values (and rows whose group has no computable quartiles) are
        never flagged: the flag column is always True/False, never null.
        """
        multiplier = self.config.outlier_iqr_multiplier

        if group_by:
            # Per-group quartiles joined back onto every row of the group.
            percentiles_df = df.groupBy(group_by).agg(
                F.percentile_approx(column, 0.25).alias("_q1"),
                F.percentile_approx(column, 0.75).alias("_q3"),
            )
            df = df.join(percentiles_df, group_by, "left")
        else:
            # Global quartiles computed once on the driver, attached as literals.
            select_first: Row | None = df.select(
                F.percentile_approx(column, 0.25),
                F.percentile_approx(column, 0.75),
            ).first()
            if not select_first:
                raise ValueError("No data to calculate percentiles")
            q1: float = select_first[0]
            q3: float = select_first[1]
            df = df.withColumn("_q1", F.lit(q1))
            df = df.withColumn("_q3", F.lit(q3))

        df = df.withColumn("_iqr", F.col("_q3") - F.col("_q1"))
        df = df.withColumn("_lower_bound", F.col("_q1") - (multiplier * F.col("_iqr")))
        df = df.withColumn("_upper_bound", F.col("_q3") + (multiplier * F.col("_iqr")))

        # Fix: null values or null bounds previously produced null flags;
        # coalesce keeps the output strictly boolean.
        df = df.withColumn(
            output_column,
            F.coalesce(
                (F.col(column) < F.col("_lower_bound"))
                | (F.col(column) > F.col("_upper_bound")),
                F.lit(False),
            ),
        )

        return df.drop("_q1", "_q3", "_iqr", "_lower_bound", "_upper_bound")

    def detect_procedure_outliers(
        self,
        df: DataFrame,
        charge_column: str = "charge_amount",
        procedure_column: str = "procedure_code",
    ) -> DataFrame:
        """
        Detect charge outliers within each procedure code.

        Charges for the same procedure should fall within a predictable range.
        A charge that is an outlier globally might be normal for a complex
        procedure, while a seemingly normal charge might be fraudulent for
        a simple procedure. This method contextualizes outlier detection by
        procedure type.

        Parameters
        ----------
        df : DataFrame
            Input claims DataFrame.
        charge_column : str, default "charge_amount"
            Column containing charge amounts.
        procedure_column : str, default "procedure_code"
            Column containing procedure codes for grouping.

        Returns
        -------
        DataFrame
            Claims with added column:

            - ``procedure_charge_outlier`` : bool - True if charge is an outlier for this procedure.
        """
        # Thin wrapper: group-wise Z-score with a fixed output column name.
        return self.detect_zscore_outliers(
            df,
            column=charge_column,
            output_column="procedure_charge_outlier",
            group_by=[procedure_column],
        )

    def detect_provider_outliers(
        self,
        df: DataFrame,
        charge_column: str = "charge_amount",
        provider_column: str = "provider_id",
        procedure_column: str = "procedure_code",
    ) -> DataFrame:
        """
        Identify providers with systematically unusual billing patterns.

        Compares each provider's average charges per procedure against market
        averages. Providers consistently billing significantly above or below
        market rates may be engaged in upcoding, unbundling, or other schemes.

        Parameters
        ----------
        df : DataFrame
            Input claims DataFrame.
        charge_column : str, default "charge_amount"
            Column containing charge amounts.
        provider_column : str, default "provider_id"
            Column containing provider identifiers.
        procedure_column : str, default "procedure_code"
            Column containing procedure codes.

        Returns
        -------
        DataFrame
            Claims with added columns:

            - ``provider_avg_charge`` : float - Provider's average charge for this procedure.
            - ``charge_deviation_ratio`` : float - Ratio of provider avg to market avg.
            - ``provider_billing_outlier`` : bool - True if ratio >2.0 or <0.5.
        """
        # Per-provider, per-procedure averages. provider_procedure_count is
        # aggregated for potential downstream use but not joined back below.
        provider_avg = df.groupBy(provider_column, procedure_column).agg(
            F.avg(charge_column).alias("provider_avg_charge"),
            F.count("*").alias("provider_procedure_count"),
        )

        # Market-wide baseline per procedure.
        market_avg = df.groupBy(procedure_column).agg(
            F.avg(charge_column).alias("market_avg_charge"),
            F.stddev(charge_column).alias("market_stddev_charge"),
        )

        combined = provider_avg.join(market_avg, procedure_column)

        # Ratio defaults to 1.0 (neutral) when the market average is zero
        # or non-positive, avoiding division by zero.
        combined = combined.withColumn(
            "charge_deviation_ratio",
            F.when(F.col("market_avg_charge") > 0, F.col("provider_avg_charge") / F.col("market_avg_charge")).otherwise(F.lit(1.0)),
        )

        combined = combined.withColumn(
            "provider_billing_outlier",
            (F.col("charge_deviation_ratio") > 2.0) | (F.col("charge_deviation_ratio") < 0.5),
        )

        df = df.join(
            combined.select(
                provider_column,
                procedure_column,
                "provider_avg_charge",
                "charge_deviation_ratio",
                "provider_billing_outlier",
            ),
            [provider_column, procedure_column],
            "left",
        )

        return df

    def detect_temporal_outliers(
        self,
        df: DataFrame,
        charge_column: str = "charge_amount",
        date_column: str = "service_date",
    ) -> DataFrame:
        """
        Detect sudden spikes in provider billing patterns over time.

        Identifies claims where the charge amount significantly exceeds the
        provider's recent historical average. Sudden unexplained billing
        increases may indicate the start of a fraud scheme.

        Uses a 4-week rolling average as the baseline and flags charges
        exceeding 3x this average.

        Parameters
        ----------
        df : DataFrame
            Input claims DataFrame.
        charge_column : str, default "charge_amount"
            Column containing charge amounts.
        date_column : str, default "service_date"
            Column containing service dates.

        Returns
        -------
        DataFrame
            Claims with added column:

            - ``temporal_spike_flag`` : bool - True if charge >3x rolling average.

        Notes
        -----
        Each service date is mapped to an absolute week index (weeks since
        the Unix epoch) and the baseline is the provider's average charge
        over the previous four calendar weeks, excluding the current week.
        Claims with no prior-4-week history for the provider have no
        baseline and are not flagged.
        """
        # Absolute week index so the window spans calendar weeks (not rows)
        # and increases monotonically across year boundaries.
        df = df.withColumn(
            "_week_index",
            F.floor(F.datediff(F.col(date_column), F.lit("1970-01-01")) / 7),
        )

        # Fix: rowsBetween(-4, -1) averaged the previous 4 *claims*, not the
        # previous 4 weeks as documented. rangeBetween over the week index
        # selects all claims whose week falls in [current-4, current-1].
        window = (
            Window.partitionBy("provider_id")
            .orderBy("_week_index")
            .rangeBetween(-4, -1)
        )

        df = df.withColumn("_rolling_avg", F.avg(charge_column).over(window))

        df = df.withColumn(
            "temporal_spike_flag",
            F.when(
                F.col("_rolling_avg").isNotNull() & (F.col("_rolling_avg") > 0),
                F.col(charge_column) > (3 * F.col("_rolling_avg")),
            ).otherwise(F.lit(False)),
        )

        return df.drop("_week_index", "_rolling_avg")
Functions
detect_iqr_outliers(df, column, output_column, group_by=None)

Identify outliers using the Interquartile Range (IQR) method.

IQR-based detection is more robust than Z-score for skewed distributions and is less sensitive to extreme outliers when calculating bounds. Values below Q1 - k*IQR or above Q3 + k*IQR are flagged as outliers.

Parameters

df : DataFrame Input DataFrame containing the column to analyze. column : str Name of the numeric column to check for outliers. output_column : str Name for the boolean flag column to be added. group_by : list[str], optional Columns to partition by for group-wise quartile calculation.

Returns

DataFrame Input DataFrame with added boolean column output_column where True indicates the value is an outlier.

Notes

IQR = Q3 - Q1 (interquartile range)

  • Lower bound: Q1 - (multiplier * IQR)
  • Upper bound: Q3 + (multiplier * IQR)

Common multiplier values:

  • 1.5: Standard outliers (Tukey's method)
  • 3.0: Extreme outliers only
Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
def detect_iqr_outliers(
    self,
    df: DataFrame,
    column: str,
    output_column: str,
    group_by: list[str] | None = None,
) -> DataFrame:
    """
    Flag values falling outside Tukey-style IQR fences.

    A robust alternative to the Z-score method for skewed distributions:
    a value is an outlier when it lies below ``Q1 - k*IQR`` or above
    ``Q3 + k*IQR``, where ``k`` is ``config.outlier_iqr_multiplier``.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame containing the column to analyze.
    column : str
        Name of the numeric column to check for outliers.
    output_column : str
        Name for the boolean flag column to be added.
    group_by : list[str], optional
        Columns to partition by for group-wise quartile calculation.

    Returns
    -------
    DataFrame
        Input DataFrame with an added boolean ``output_column`` where True
        indicates the value is an outlier.

    Raises
    ------
    ValueError
        If the DataFrame is empty and global quartiles cannot be computed.

    Notes
    -----
    IQR = Q3 - Q1. Common multipliers: 1.5 (standard outliers, Tukey's
    method) and 3.0 (extreme outliers only).
    """
    k = self.config.outlier_iqr_multiplier

    if group_by:
        # Quartiles per group, joined back onto every row of that group.
        per_group = df.groupBy(group_by).agg(
            F.percentile_approx(column, 0.25).alias("_q1"),
            F.percentile_approx(column, 0.75).alias("_q3"),
        )
        df = df.join(per_group, group_by, "left")
    else:
        # Global quartiles, computed once and attached as literal columns.
        first_row: Row | None = df.select(
            F.percentile_approx(column, 0.25),
            F.percentile_approx(column, 0.75),
        ).first()
        if not first_row:
            raise ValueError("No data to calculate percentiles")
        df = df.withColumn("_q1", F.lit(first_row[0]))
        df = df.withColumn("_q3", F.lit(first_row[1]))

    df = (
        df.withColumn("_iqr", F.col("_q3") - F.col("_q1"))
        .withColumn("_lower_bound", F.col("_q1") - (k * F.col("_iqr")))
        .withColumn("_upper_bound", F.col("_q3") + (k * F.col("_iqr")))
    )

    below_fence = F.col(column) < F.col("_lower_bound")
    above_fence = F.col(column) > F.col("_upper_bound")
    df = df.withColumn(output_column, below_fence | above_fence)

    return df.drop("_q1", "_q3", "_iqr", "_lower_bound", "_upper_bound")
detect_procedure_outliers(df, charge_column='charge_amount', procedure_column='procedure_code')

Detect charge outliers within each procedure code.

Charges for the same procedure should fall within a predictable range. A charge that is an outlier globally might be normal for a complex procedure, while a seemingly normal charge might be fraudulent for a simple procedure. This method contextualizes outlier detection by procedure type.

Parameters

df : DataFrame Input claims DataFrame. charge_column : str, default "charge_amount" Column containing charge amounts. procedure_column : str, default "procedure_code" Column containing procedure codes for grouping.

Returns

DataFrame Claims with added column:

- ``procedure_charge_outlier`` : bool - True if charge is an outlier for this procedure.
Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
def detect_procedure_outliers(
    self,
    df: DataFrame,
    charge_column: str = "charge_amount",
    procedure_column: str = "procedure_code",
) -> DataFrame:
    """
    Flag charges that are outliers relative to their own procedure code.

    Charges for a given procedure should fall within a predictable range:
    an amount that is globally unremarkable can still be anomalous for a
    simple procedure, and vice versa. This method makes the Z-score check
    procedure-aware by grouping on the procedure column.

    Parameters
    ----------
    df : DataFrame
        Input claims DataFrame.
    charge_column : str, default "charge_amount"
        Column containing charge amounts.
    procedure_column : str, default "procedure_code"
        Column containing procedure codes for grouping.

    Returns
    -------
    DataFrame
        Claims with added column:

        - ``procedure_charge_outlier`` : bool - True if charge is an outlier for this procedure.
    """
    # Delegate to the group-wise Z-score detector with a fixed flag name.
    return self.detect_zscore_outliers(
        df,
        column=charge_column,
        output_column="procedure_charge_outlier",
        group_by=[procedure_column],
    )
detect_provider_outliers(df, charge_column='charge_amount', provider_column='provider_id', procedure_column='procedure_code')

Identify providers with systematically unusual billing patterns.

Compares each provider's average charges per procedure against market averages. Providers consistently billing significantly above or below market rates may be engaged in upcoding, unbundling, or other schemes.

Parameters

df : DataFrame Input claims DataFrame. charge_column : str, default "charge_amount" Column containing charge amounts. provider_column : str, default "provider_id" Column containing provider identifiers. procedure_column : str, default "procedure_code" Column containing procedure codes.

Returns

DataFrame Claims with added columns:

- ``provider_avg_charge`` : float - Provider's average charge for this procedure.
- ``charge_deviation_ratio`` : float - Ratio of provider avg to market avg.
- ``provider_billing_outlier`` : bool - True if ratio >2.0 or <0.5.
Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
def detect_provider_outliers(
    self,
    df: DataFrame,
    charge_column: str = "charge_amount",
    provider_column: str = "provider_id",
    procedure_column: str = "procedure_code",
) -> DataFrame:
    """
    Identify providers with systematically unusual billing patterns.

    Each provider's mean charge per procedure is compared against the
    market-wide mean for that procedure. Providers consistently billing far
    above or below market rates may be engaged in upcoding, unbundling, or
    other schemes.

    Parameters
    ----------
    df : DataFrame
        Input claims DataFrame.
    charge_column : str, default "charge_amount"
        Column containing charge amounts.
    provider_column : str, default "provider_id"
        Column containing provider identifiers.
    procedure_column : str, default "procedure_code"
        Column containing procedure codes.

    Returns
    -------
    DataFrame
        Claims with added columns:

        - ``provider_avg_charge`` : float - Provider's average charge for this procedure.
        - ``charge_deviation_ratio`` : float - Ratio of provider avg to market avg.
        - ``provider_billing_outlier`` : bool - True if ratio >2.0 or <0.5.
    """
    # Per-provider, per-procedure billing profile.
    by_provider = df.groupBy(provider_column, procedure_column).agg(
        F.avg(charge_column).alias("provider_avg_charge"),
        F.count("*").alias("provider_procedure_count"),
    )

    # Market baseline for each procedure.
    by_market = df.groupBy(procedure_column).agg(
        F.avg(charge_column).alias("market_avg_charge"),
        F.stddev(charge_column).alias("market_stddev_charge"),
    )

    # Ratio is neutral (1.0) when the market average is not positive,
    # avoiding a division by zero.
    deviation = F.when(
        F.col("market_avg_charge") > 0,
        F.col("provider_avg_charge") / F.col("market_avg_charge"),
    ).otherwise(F.lit(1.0))

    stats = (
        by_provider.join(by_market, procedure_column)
        .withColumn("charge_deviation_ratio", deviation)
        .withColumn(
            "provider_billing_outlier",
            (F.col("charge_deviation_ratio") > 2.0)
            | (F.col("charge_deviation_ratio") < 0.5),
        )
    )

    wanted = [
        provider_column,
        procedure_column,
        "provider_avg_charge",
        "charge_deviation_ratio",
        "provider_billing_outlier",
    ]
    return df.join(
        stats.select(*wanted),
        [provider_column, procedure_column],
        "left",
    )
detect_temporal_outliers(df, charge_column='charge_amount', date_column='service_date')

Detect sudden spikes in provider billing patterns over time.

Identifies claims where the charge amount significantly exceeds the provider's recent historical average. Sudden unexplained billing increases may indicate the start of a fraud scheme.

Uses a 4-week rolling average as the baseline and flags charges exceeding 3x this average.

Parameters

df : DataFrame Input claims DataFrame. charge_column : str, default "charge_amount" Column containing charge amounts. date_column : str, default "service_date" Column containing service dates.

Returns

DataFrame Claims with added column:

- ``temporal_spike_flag`` : bool - True if charge >3x rolling average.
Notes

The rolling window looks at the previous 4 weeks of data for each provider. Claims in the first 4 weeks of a provider's history will not have a baseline for comparison and will not be flagged.

Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
def detect_temporal_outliers(
    self,
    df: DataFrame,
    charge_column: str = "charge_amount",
    date_column: str = "service_date",
) -> DataFrame:
    """
    Detect sudden spikes in provider billing patterns over time.

    Identifies claims where the charge amount significantly exceeds the
    provider's recent historical average. Sudden unexplained billing
    increases may indicate the start of a fraud scheme.

    Uses a 4-week rolling average as the baseline and flags charges
    exceeding 3x this average.

    Parameters
    ----------
    df : DataFrame
        Input claims DataFrame.
    charge_column : str, default "charge_amount"
        Column containing charge amounts.
    date_column : str, default "service_date"
        Column containing service dates.

    Returns
    -------
    DataFrame
        Claims with added column:

        - ``temporal_spike_flag`` : bool - True if charge >3x rolling average.

    Notes
    -----
    Each service date is mapped to an absolute week index (weeks since the
    Unix epoch) and the baseline is the provider's average charge over the
    previous four calendar weeks, excluding the current week. Claims with
    no prior-4-week history for the provider have no baseline and are not
    flagged.
    """
    # Absolute week index so the window spans calendar weeks (not rows)
    # and increases monotonically across year boundaries.
    df = df.withColumn(
        "_week_index",
        F.floor(F.datediff(F.col(date_column), F.lit("1970-01-01")) / 7),
    )

    # Fix: rowsBetween(-4, -1) averaged the previous 4 *claims*, not the
    # previous 4 weeks as documented. rangeBetween over the week index
    # selects all claims whose week falls in [current-4, current-1].
    window = (
        Window.partitionBy("provider_id")
        .orderBy("_week_index")
        .rangeBetween(-4, -1)
    )

    df = df.withColumn("_rolling_avg", F.avg(charge_column).over(window))

    df = df.withColumn(
        "temporal_spike_flag",
        F.when(
            F.col("_rolling_avg").isNotNull() & (F.col("_rolling_avg") > 0),
            F.col(charge_column) > (3 * F.col("_rolling_avg")),
        ).otherwise(F.lit(False)),
    )

    return df.drop("_week_index", "_rolling_avg")
detect_zscore_outliers(df, column, output_column, group_by=None)

Identify outliers using the Z-score (standard score) method.

The Z-score measures how many standard deviations a value is from the mean. Values with absolute Z-scores exceeding the configured threshold are flagged as outliers.

Z-score is most effective when data is approximately normally distributed. For skewed distributions (common with financial data), consider using the IQR method instead.

Parameters

df : DataFrame Input DataFrame containing the column to analyze. column : str Name of the numeric column to check for outliers. output_column : str Name for the boolean flag column to be added. group_by : list[str], optional Columns to partition by for group-wise statistics. For example, passing ["procedure_code"] calculates separate means and standard deviations for each procedure, making the detection context-aware.

Returns

DataFrame Input DataFrame with added boolean column output_column where True indicates the value is an outlier.

Notes

The formula used is: z = (x - μ) / σ

A claim is flagged if |z| > threshold. When standard deviation is zero (all values identical), Z-score is set to 0 to avoid division errors.

Source code in packages/fraud_detection/src/fraud_detection/statistics/outliers.py
def detect_zscore_outliers(
    self,
    df: DataFrame,
    column: str,
    output_column: str,
    group_by: list[str] | None = None,
) -> DataFrame:
    """
    Identify outliers using the Z-score (standard score) method.

    The Z-score measures how many standard deviations a value is from the
    mean. Values with absolute Z-scores exceeding the configured threshold
    are flagged as outliers.

    Z-score is most effective when data is approximately normally distributed.
    For skewed distributions (common with financial data), consider using the
    IQR method instead.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame containing the column to analyze.
    column : str
        Name of the numeric column to check for outliers.
    output_column : str
        Name for the boolean flag column to be added.
    group_by : list[str], optional
        Columns to partition by for group-wise statistics. For example,
        passing ``["procedure_code"]`` calculates separate means and
        standard deviations for each procedure, making the detection
        context-aware.

    Returns
    -------
    DataFrame
        Input DataFrame with added boolean column ``output_column`` where
        True indicates the value is an outlier.

    Notes
    -----
    The formula used is: ``z = (x - μ) / σ``

    A claim is flagged if ``|z| > threshold``. When standard deviation is
    zero (all values identical), Z-score is set to 0 to avoid division errors.
    Null values in ``column`` are never flagged: the flag column is always
    True/False, never null.
    """
    threshold = self.config.outlier_zscore_threshold

    if group_by:
        window = Window.partitionBy(group_by)
    else:
        # Constant partition key -> one global partition (global statistics).
        window = Window.partitionBy(F.lit(1))

    mean_col = f"_{column}_mean"
    stddev_col = f"_{column}_stddev"
    zscore_col = f"_{column}_zscore"

    df = df.withColumn(mean_col, F.mean(column).over(window))
    df = df.withColumn(stddev_col, F.stddev(column).over(window))

    df = df.withColumn(
        zscore_col,
        F.when(
            F.col(stddev_col) > 0,
            (F.col(column) - F.col(mean_col)) / F.col(stddev_col),
        ).otherwise(F.lit(0.0)),
    )

    # Fix: for a null value the comparison evaluates to null, which
    # previously leaked nulls into the "boolean" flag column. Coalesce so
    # missing data is reported as "not an outlier" (False) instead.
    df = df.withColumn(
        output_column,
        F.coalesce(F.abs(F.col(zscore_col)) > threshold, F.lit(False)),
    )

    return df.drop(mean_col, stddev_col, zscore_col)

Benford's Law Analysis

fraud_detection.statistics.benfords

Benford's Law analysis for insurance fraud detection.

Benford's Law (also known as the First-Digit Law) is a mathematical observation that in many naturally occurring datasets, the leading digit is more likely to be small. Specifically:

  • Digit 1 appears as the leading digit ~30.1% of the time
  • Digit 9 appears as the leading digit ~4.6% of the time

This counterintuitive distribution applies to data spanning multiple orders of magnitude, such as financial transactions, population figures, and physical constants.

Fraud Detection Application

Financial fraud often violates Benford's Law because:

  1. Fabricated numbers: Fraudsters tend to create numbers with more uniform digit distributions, or subconsciously favor certain digits.

  2. Round number bias: Fraudulent amounts often cluster around round numbers (e.g., $100, $500, $1000), distorting the natural distribution.

  3. Threshold avoidance: Fraudsters may manipulate amounts to stay below review thresholds, creating artificial spikes at certain values.

A significant deviation from Benford's expected distribution in claim amounts is a strong indicator that warrants further investigation.

Classes

BenfordsLawAnalyzer

Analyze claim data for conformity to Benford's Law.

Compares the observed first-digit distribution of numeric data against the expected Benford's distribution. Significant deviations may indicate fabricated or manipulated data.

Parameters

spark : SparkSession Active Spark session for distributed processing.

Examples

analyzer = BenfordsLawAnalyzer(spark)
claims = analyzer.analyze(claims, "charge_amount", group_by="provider_id")
anomalies = claims.filter(claims.benfords_anomaly)

Generate detailed report for visualization

report = analyzer.get_distribution_report(claims, "charge_amount") report.show()

Notes

Benford's Law works best with data that:

  • Spans multiple orders of magnitude
  • Is not artificially constrained (e.g., not limited to a narrow range)
  • Has a sufficient sample size (typically 100+ values)

For small datasets or data with limited range, results may be unreliable.

Source code in packages/fraud_detection/src/fraud_detection/statistics/benfords.py
class BenfordsLawAnalyzer:
    """
    Analyze claim data for conformity to Benford's Law.

    Compares the observed first-digit distribution of numeric data against
    the expected Benford's distribution. Significant deviations may indicate
    fabricated or manipulated data.

    Parameters
    ----------
    spark : SparkSession
        Active Spark session for distributed processing.

    Examples
    --------
    >>> analyzer = BenfordsLawAnalyzer(spark)
    >>> claims = analyzer.analyze(claims, "charge_amount", group_by="provider_id")
    >>> anomalies = claims.filter(claims.benfords_anomaly)

    >>> # Generate detailed report for visualization
    >>> report = analyzer.get_distribution_report(claims, "charge_amount")
    >>> report.show()

    Notes
    -----
    Benford's Law works best with data that:

    - Spans multiple orders of magnitude
    - Is not artificially constrained (e.g., not limited to a narrow range)
    - Has a sufficient sample size (typically 100+ values)

    For small datasets or data with limited range, results may be unreliable.
    """

    BENFORDS_EXPECTED = {
        1: 0.301,
        2: 0.176,
        3: 0.125,
        4: 0.097,
        5: 0.079,
        6: 0.067,
        7: 0.058,
        8: 0.051,
        9: 0.046,
    }
    """Expected first-digit frequencies according to Benford's Law.

    Derived from the formula: P(d) = log10(1 + 1/d) for d = 1, 2, ..., 9
    """

    def __init__(self, spark: SparkSession) -> None:
        # Session is kept only to materialize the small expected-frequency
        # lookup table used in the joins below.
        self.spark = spark

    def analyze(
        self,
        df: DataFrame,
        column: str,
        group_by: str | None = None,
        threshold: float = 0.15,
    ) -> DataFrame:
        """
        Analyze a numeric column for Benford's Law conformity.

        Extracts the first digit from each value, calculates the observed
        distribution, and compares it to the expected Benford's distribution.
        Values (or groups) deviating from the expected distribution by more
        than the threshold are flagged.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame containing the column to analyze.
        column : str
            Name of the numeric column to analyze (e.g., "charge_amount").
        group_by : str, optional
            Column to group by for per-group analysis (e.g., "provider_id").
            When provided, calculates separate distributions for each group,
            allowing detection of providers with anomalous digit patterns.
        threshold : float, default 0.15
            Maximum allowed deviation from expected frequency before flagging.
            A value of 0.15 means a digit appearing 45% of the time when
            expected to appear 30% would be flagged (0.45 - 0.30 = 0.15).

        Returns
        -------
        DataFrame
            Input DataFrame with added column:

            - ``benfords_anomaly`` : bool - True if the value is flagged,
              suggesting possible fabrication.

        Notes
        -----
        The two modes flag differently: global mode flags only digits that
        are *over*-represented, while group mode flags every row of a group
        whose maximum *absolute* deviation (over- or under-representation)
        exceeds the threshold.
        """
        # First character of the absolute value's string form. Values that
        # render as "0.x" yield 0 and are nulled out below, so sub-unit
        # amounts are excluded; very small/large doubles may render in
        # scientific notation, where the first character is the leading
        # significant digit — representation-dependent, TODO confirm.
        df = df.withColumn(
            "_first_digit",
            F.substring(F.abs(F.col(column)).cast("string"), 1, 1).cast("int"),
        )

        # Null out anything outside 1-9 so the aggregations below can simply
        # filter on isNotNull().
        df = df.withColumn(
            "_first_digit",
            F.when((F.col("_first_digit") > 0) & (F.col("_first_digit") <= 9), F.col("_first_digit")),
        )

        if group_by:
            df = self._analyze_by_group(df, group_by, threshold)
        else:
            df = self._analyze_global(df, threshold)

        df = df.drop("_first_digit")

        return df

    def _analyze_by_group(self, df: DataFrame, group_by: str, threshold: float) -> DataFrame:
        """
        Perform Benford's Law analysis separately for each group.

        Calculates the digit distribution within each group (e.g., per provider)
        and flags groups whose distribution deviates significantly from expected.
        This is more sensitive than global analysis for detecting individual
        bad actors within an otherwise compliant dataset.

        A group is flagged when the maximum absolute deviation of any digit's
        observed frequency from its expected frequency exceeds ``threshold``;
        all rows of a flagged group receive ``benfords_anomaly = True``.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame with ``_first_digit`` column already added.
        group_by : str
            Column to partition the analysis by.
        threshold : float
            Maximum deviation threshold for flagging.

        Returns
        -------
        DataFrame
            DataFrame with ``benfords_anomaly`` column added.
        """
        # Per-group counts of each leading digit (nulls excluded).
        digit_counts = df.filter(F.col("_first_digit").isNotNull()).groupBy(group_by, "_first_digit").count()

        totals = digit_counts.groupBy(group_by).agg(F.sum("count").alias("_total"))

        digit_counts = digit_counts.join(totals, group_by)

        digit_counts = digit_counts.withColumn("_observed_freq", F.col("count") / F.col("_total"))

        expected_df = self.spark.createDataFrame(  # type: ignore[arg-type]
            list(self.BENFORDS_EXPECTED.items()),
            ["_first_digit", "_expected_freq"],
        )

        digit_counts = digit_counts.join(expected_df, "_first_digit")

        # Absolute gap — both over- and under-representation count here.
        digit_counts = digit_counts.withColumn(
            "_deviation",
            F.abs(F.col("_observed_freq") - F.col("_expected_freq")),
        )

        # Worst single-digit deviation per group drives the flag.
        group_deviations = digit_counts.groupBy(group_by).agg(
            F.max("_deviation").alias("_max_benford_deviation"),
        )

        group_deviations = group_deviations.withColumn(
            "benfords_anomaly_group",
            F.col("_max_benford_deviation") > threshold,
        )

        df = df.join(
            group_deviations.select(group_by, "_max_benford_deviation", "benfords_anomaly_group"),
            group_by,
            "left",
        )

        # Left join leaves null for groups absent from the aggregation
        # (e.g., groups whose every value had no valid first digit);
        # coalesce those to False so the flag column is never null.
        df = df.withColumn(
            "benfords_anomaly",
            F.coalesce(F.col("benfords_anomaly_group"), F.lit(False)),
        )

        df = df.drop("_max_benford_deviation", "benfords_anomaly_group")

        return df

    def _analyze_global(self, df: DataFrame, threshold: float) -> DataFrame:
        """
        Perform Benford's Law analysis across the entire dataset.

        Calculates a single global digit distribution and identifies which
        digits are over-represented. Individual values with those first digits
        are flagged.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame with ``_first_digit`` column already added.
        threshold : float
            Maximum deviation threshold for flagging a digit as anomalous.

        Returns
        -------
        DataFrame
            DataFrame with ``benfords_anomaly`` column added.
        """
        digit_counts = df.filter(F.col("_first_digit").isNotNull()).groupBy("_first_digit").count()

        # first() is None only when digit_counts is empty (no valid digits).
        total_first = digit_counts.agg(F.sum("count")).first()
        if not total_first:
            raise ValueError("No valid data to analyze")
        total = total_first[0]

        digit_counts = digit_counts.withColumn("_observed_freq", F.col("count") / F.lit(total))

        expected_df = self.spark.createDataFrame(  # type: ignore[arg-type]
            list(self.BENFORDS_EXPECTED.items()),
            ["_first_digit", "_expected_freq"],
        )

        digit_counts = digit_counts.join(expected_df, "_first_digit")

        digit_counts = digit_counts.withColumn(
            "_deviation",
            F.abs(F.col("_observed_freq") - F.col("_expected_freq")),
        )

        # Only over-represented digits are flagged globally; the digit set is
        # tiny (at most 9 rows), so collect() is cheap.
        over_represented = (
            digit_counts.filter((F.col("_observed_freq") > F.col("_expected_freq")) & (F.col("_deviation") > threshold))
            .select("_first_digit")
            .collect()
        )

        flagged_digits = [row["_first_digit"] for row in over_represented]

        # isin() propagates null for rows whose first digit is null, which
        # would leave nulls in the flag column; coalesce to False to keep
        # the semantics consistent with the group path.
        df = df.withColumn(
            "benfords_anomaly",
            F.coalesce(F.col("_first_digit").isin(flagged_digits), F.lit(False))
            if flagged_digits
            else F.lit(False),
        )

        return df

    def get_distribution_report(self, df: DataFrame, column: str, group_by: str | None = None) -> DataFrame:
        """
        Generate a detailed Benford's Law distribution report.

        Creates a summary table comparing observed vs. expected digit frequencies,
        useful for visualization, auditing, and deeper analysis of potential
        anomalies.

        Parameters
        ----------
        df : DataFrame
            Input DataFrame containing the column to analyze.
        column : str
            Name of the numeric column to analyze.
        group_by : str, optional
            Column to group by for per-group reports.

        Returns
        -------
        DataFrame
            Report with columns:

            - ``first_digit`` : int - The leading digit (1-9).
            - ``count`` : int - Number of occurrences.
            - ``total`` : int - Total values in group/dataset.
            - ``observed_frequency`` : float - Actual proportion.
            - ``expected_frequency`` : float - Benford's expected proportion.
            - ``deviation`` : float - Difference (observed - expected).
            - ``deviation_percentage`` : float - Deviation as % of expected.

            If ``group_by`` is provided, includes that column as well.

        Raises
        ------
        ValueError
            If no valid first digits can be extracted (ungrouped mode only).

        Examples
        --------
        >>> report = analyzer.get_distribution_report(claims, "charge_amount")
        >>> report.show()
        +-----------+-----+-----+------------------+------------------+---------+--------------------+
        |first_digit|count|total|observed_frequency|expected_frequency|deviation|deviation_percentage|
        +-----------+-----+-----+------------------+------------------+---------+--------------------+
        |          1| 3021|10000|            0.3021|             0.301|   0.0011|                0.37|
        |          2| 1755|10000|            0.1755|             0.176|  -0.0005|               -0.28|
        ...
        """
        analysis_df = df.withColumn(
            "first_digit",
            F.substring(F.abs(F.col(column)).cast("string"), 1, 1).cast("int"),
        ).filter((F.col("first_digit") > 0) & (F.col("first_digit") <= 9))

        if group_by:
            digit_counts = analysis_df.groupBy(group_by, "first_digit").count()
            totals = digit_counts.groupBy(group_by).agg(F.sum("count").alias("total"))
            digit_counts = digit_counts.join(totals, group_by)
        else:
            digit_counts = analysis_df.groupBy("first_digit").count()
            total_first = digit_counts.agg(F.sum("count")).first()
            if not total_first:
                raise ValueError("No valid data to analyze")
            total = total_first[0]
            digit_counts = digit_counts.withColumn("total", F.lit(total))

        digit_counts = digit_counts.withColumn(
            "observed_frequency",
            F.round(F.col("count") / F.col("total"), 4),
        )

        expected_df = self.spark.createDataFrame(  # type: ignore[arg-type]
            [(d, round(f, 4)) for d, f in self.BENFORDS_EXPECTED.items()],
            ["first_digit", "expected_frequency"],
        )

        report = digit_counts.join(expected_df, "first_digit")

        report = report.withColumn(
            "deviation",
            F.round(F.col("observed_frequency") - F.col("expected_frequency"), 4),
        )

        report = report.withColumn(
            "deviation_percentage",
            F.round(F.col("deviation") / F.col("expected_frequency") * 100, 2),
        )

        return report.orderBy(*([group_by] if group_by else []), "first_digit")
Attributes
BENFORDS_EXPECTED = {1: 0.301, 2: 0.176, 3: 0.125, 4: 0.097, 5: 0.079, 6: 0.067, 7: 0.058, 8: 0.051, 9: 0.046} class-attribute instance-attribute

Expected first-digit frequencies according to Benford's Law.

Derived from the formula: P(d) = log10(1 + 1/d) for d = 1, 2, ..., 9

Functions
analyze(df, column, group_by=None, threshold=0.15)

Analyze a numeric column for Benford's Law conformity.

Extracts the first digit from each value, calculates the observed distribution, and compares it to the expected Benford's distribution. Values whose first digit appears more frequently than expected (by more than the threshold) are flagged.

Parameters

df : DataFrame Input DataFrame containing the column to analyze. column : str Name of the numeric column to analyze (e.g., "charge_amount"). group_by : str, optional Column to group by for per-group analysis (e.g., "provider_id"). When provided, calculates separate distributions for each group, allowing detection of providers with anomalous digit patterns. threshold : float, default 0.15 Maximum allowed deviation from expected frequency before flagging. A value of 0.15 means a digit appearing 45% of the time when expected to appear 30% would be flagged (0.45 - 0.30 = 0.15).

Returns

DataFrame Input DataFrame with added column:

- ``benfords_anomaly`` : bool - True if the value's first digit
  is over-represented in the dataset, suggesting possible fabrication.
Source code in packages/fraud_detection/src/fraud_detection/statistics/benfords.py
def analyze(
    self,
    df: DataFrame,
    column: str,
    group_by: str | None = None,
    threshold: float = 0.15,
) -> DataFrame:
    """
    Check a numeric column for conformity to Benford's Law.

    The leading digit of each value is extracted and its observed frequency
    compared against the expected Benford's distribution; values whose first
    digit is over-represented by more than the threshold are flagged.

    Parameters
    ----------
    df : DataFrame
        DataFrame holding the column of interest.
    column : str
        Numeric column to test (e.g., "charge_amount").
    group_by : str, optional
        When given (e.g., "provider_id"), a separate distribution is computed
        for each group, so individually anomalous groups can be detected.
    threshold : float, default 0.15
        Largest tolerated gap between observed and expected frequency.
        With 0.15, a digit seen 45% of the time against an expected 30%
        is flagged (0.45 - 0.30 = 0.15).

    Returns
    -------
    DataFrame
        The input DataFrame with one column appended:

        - ``benfords_anomaly`` : bool - True when the value's leading digit
          is over-represented, suggesting possible fabrication.
    """
    # Leading character of |value| as an int; only digits 1-9 are kept,
    # everything else becomes null and is ignored downstream.
    leading = F.substring(F.abs(F.col(column)).cast("string"), 1, 1).cast("int")
    df = df.withColumn("_first_digit", F.when(leading.between(1, 9), leading))

    analyzed = (
        self._analyze_by_group(df, group_by, threshold)
        if group_by
        else self._analyze_global(df, threshold)
    )

    # The helper column is internal; strip it before handing back.
    return analyzed.drop("_first_digit")
get_distribution_report(df, column, group_by=None)

Generate a detailed Benford's Law distribution report.

Creates a summary table comparing observed vs. expected digit frequencies, useful for visualization, auditing, and deeper analysis of potential anomalies.

Parameters

df : DataFrame Input DataFrame containing the column to analyze. column : str Name of the numeric column to analyze. group_by : str, optional Column to group by for per-group reports.

Returns

DataFrame Report with columns:

- ``first_digit`` : int - The leading digit (1-9).
- ``count`` : int - Number of occurrences.
- ``total`` : int - Total values in group/dataset.
- ``observed_frequency`` : float - Actual proportion.
- ``expected_frequency`` : float - Benford's expected proportion.
- ``deviation`` : float - Difference (observed - expected).
- ``deviation_percentage`` : float - Deviation as % of expected.

If ``group_by`` is provided, includes that column as well.
Examples

report = analyzer.get_distribution_report(claims, "charge_amount") report.show() +-----------+-----+-----+------------------+------------------+---------+--------------------+ |first_digit|count|total|observed_frequency|expected_frequency|deviation|deviation_percentage| +-----------+-----+-----+------------------+------------------+---------+--------------------+ | 1| 3021|10000| 0.3021| 0.301| 0.0011| 0.37| | 2| 1755|10000| 0.1755| 0.176| -0.0005| -0.28| ...

Source code in packages/fraud_detection/src/fraud_detection/statistics/benfords.py
def get_distribution_report(self, df: DataFrame, column: str, group_by: str | None = None) -> DataFrame:
    """
    Build a digit-by-digit Benford's Law comparison table.

    Produces a summary of observed vs. expected leading-digit frequencies,
    suitable for visualization, auditing, and closer inspection of suspect
    distributions.

    Parameters
    ----------
    df : DataFrame
        Input DataFrame containing the column to analyze.
    column : str
        Name of the numeric column to analyze.
    group_by : str, optional
        Column to group by for per-group reports.

    Returns
    -------
    DataFrame
        Report with columns:

        - ``first_digit`` : int - The leading digit (1-9).
        - ``count`` : int - Number of occurrences.
        - ``total`` : int - Total values in group/dataset.
        - ``observed_frequency`` : float - Actual proportion.
        - ``expected_frequency`` : float - Benford's expected proportion.
        - ``deviation`` : float - Difference (observed - expected).
        - ``deviation_percentage`` : float - Deviation as % of expected.

        If ``group_by`` is provided, includes that column as well.

    Examples
    --------
    >>> report = analyzer.get_distribution_report(claims, "charge_amount")
    >>> report.show()
    +-----------+-----+-----+------------------+------------------+---------+--------------------+
    |first_digit|count|total|observed_frequency|expected_frequency|deviation|deviation_percentage|
    +-----------+-----+-----+------------------+------------------+---------+--------------------+
    |          1| 3021|10000|            0.3021|             0.301|   0.0011|                0.37|
    |          2| 1755|10000|            0.1755|             0.176|  -0.0005|               -0.28|
    ...
    """
    # Leading digit of |value|; keep only rows where it falls in 1-9.
    digit_expr = F.substring(F.abs(F.col(column)).cast("string"), 1, 1).cast("int")
    valid = df.withColumn("first_digit", digit_expr).filter(
        (F.col("first_digit") > 0) & (F.col("first_digit") <= 9)
    )

    if group_by:
        counts = valid.groupBy(group_by, "first_digit").count()
        # Attach per-group totals so frequencies are computed within group.
        counts = counts.join(
            counts.groupBy(group_by).agg(F.sum("count").alias("total")),
            group_by,
        )
    else:
        counts = valid.groupBy("first_digit").count()
        head = counts.agg(F.sum("count")).first()
        if not head:
            raise ValueError("No valid data to analyze")
        counts = counts.withColumn("total", F.lit(head[0]))

    # Tiny (9-row) reference table of Benford's expected frequencies.
    expected = self.spark.createDataFrame(  # type: ignore[arg-type]
        [(digit, round(freq, 4)) for digit, freq in self.BENFORDS_EXPECTED.items()],
        ["first_digit", "expected_frequency"],
    )

    report = (
        counts.withColumn("observed_frequency", F.round(F.col("count") / F.col("total"), 4))
        .join(expected, "first_digit")
        .withColumn(
            "deviation",
            F.round(F.col("observed_frequency") - F.col("expected_frequency"), 4),
        )
        .withColumn(
            "deviation_percentage",
            F.round(F.col("deviation") / F.col("expected_frequency") * 100, 2),
        )
    )

    order_cols = [group_by, "first_digit"] if group_by else ["first_digit"]
    return report.orderBy(*order_cols)