100 Days of Polars - Day 006: Data Integrity - Handling Nulls and Schema Validation

Learn to maintain data integrity in Polars through null handling techniques and schema validation strategies
polars
data-engineering
100-days-of-polars
Author

NomadC

Published

February 2, 2026

Introduction

Data integrity is the foundation of reliable data pipelines. Without proper validation and null handling, your analyses can produce misleading results or fail entirely. In Polars, you have powerful tools to ensure your data meets quality standards before processing.

Today we’ll cover:

- Null value handling: detection, filling, and removal strategies
- Schema validation: ensuring data types match expectations
- Data quality checks: comprehensive validation patterns

Understanding Nulls in Polars

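Polars uses null to represent missing values. A null is distinct from NaN, which is a regular floating-point value meaning “not a number”. Python’s None, on the other hand, is stored as null when you build a DataFrame, as the example below shows.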

import polars as pl
import numpy as np

# Create a DataFrame with various null-like values
# (the printed output below shows which of them end up as null vs. NaN)
df = pl.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", None, "David", "Eve"],  # Python None -> null
    "age": [25, None, 30, 35, None],  # two missing ages -> null
    "score": [85.5, 90.0, np.nan, 78.0, 92.5]  # np.nan stays a float NaN, not null
})

print(df)
shape: (5, 4)
┌─────┬────────┬──────┬───────┐
│ id  ┆ name   ┆ age  ┆ score │
│ --- ┆ ---    ┆ ---  ┆ ---   │
│ i64 ┆ str    ┆ i64  ┆ f64   │
╞═════╪════════╪══════╪═══════╡
│ 1   ┆ Alice  ┆ 25   ┆ 85.5  │
│ 2   ┆ Bob    ┆ null ┆ 90.0  │
│ 3   ┆ null   ┆ 30   ┆ NaN   │
│ 4   ┆ David  ┆ 35   ┆ 78.0  │
│ 5   ┆ Eve    ┆ null ┆ 92.5  │
└─────┴────────┴──────┴───────┘

Detecting Null Values

Basic Null Detection

# Per-column null tally (a one-row DataFrame)
null_counts = df.null_count()
print(null_counts)

# Add the per-column tallies across the single row; any positive total
# means the DataFrame contains at least one null.
total_nulls = null_counts.sum_horizontal().max()
has_nulls = total_nulls > 0
print(f"DataFrame has nulls: {has_nulls}")

Column-Specific Null Checks

# Report the number of nulls in every column, one line per column
for col in df.columns:
    print(f"{col}: {df.get_column(col).is_null().sum()} nulls")

# Same information expressed as a percentage of the row count
row_total = df.height
null_pct = df.null_count() / row_total * 100
print(null_pct)

Finding Rows with Nulls

# Rows in which at least one column is null
row_has_any_null = pl.any_horizontal(pl.all().is_null())
df_with_nulls = df.filter(row_has_any_null)
print(df_with_nulls)

# Rows in which "name" or "age" (or both) is null
name_or_age_missing = pl.col("name").is_null() | pl.col("age").is_null()
df_name_or_age_null = df.filter(name_or_age_missing)
print(df_name_or_age_null)

Handling Null Values

Strategy 1: Drop Nulls

# Keep only rows that are complete in every column
df_clean = df.drop_nulls()

# Keep only rows that have a name; nulls elsewhere are tolerated
df_clean_names = df.drop_nulls(subset=["name"])

# Keep only the columns that contain no nulls at all
null_free_columns = [name for name in df.columns if df[name].null_count() == 0]
df_no_null_cols = df.select(null_free_columns)

Strategy 2: Fill Nulls

# Fill with a constant value
# (missing ages become 0, missing names become "Unknown")
df_filled = df.with_columns(
    pl.col("age").fill_null(0),
    pl.col("name").fill_null("Unknown")
)

# Fill with forward fill (previous value)
# NOTE(review): a null in the very first row has no previous value and
# presumably stays null - confirm.
df_ffill = df.with_columns(
    pl.col("age").fill_null(strategy="forward")
)

# Fill with backward fill (next value)
# NOTE(review): the last age in the sample data is null and has no next
# value, so it presumably stays null - confirm.
df_bfill = df.with_columns(
    pl.col("age").fill_null(strategy="backward")
)

# Fill with mean (for numeric columns)
# NOTE(review): "age" is an integer column while a mean is generally
# fractional, so the filled column's dtype may change - confirm.
df_mean_filled = df.with_columns(
    pl.col("age").fill_null(pl.col("age").mean())
)

# Fill with median
df_median_filled = df.with_columns(
    pl.col("age").fill_null(pl.col("age").median())
)

# Fill with interpolation (linear)
# NOTE(review): interpolation needs a known value on both sides, so the
# trailing null in the sample data presumably stays null - confirm.
df_interpolated = df.with_columns(
    pl.col("age").interpolate()
)

Strategy 3: Conditional Filling

# Fill based on conditions
df_conditional = df.with_columns(
    pl.when(pl.col("age").is_null())
    .then(18)  # Default age if missing
    .otherwise(pl.col("age"))
    .alias("age_filled")
)

# Fill based on group statistics.
# The sample df has no "department" column, so referencing it directly would
# fail with a column-not-found error. Attach an illustrative grouping column
# first (one label per row), then fill each missing age with the mean age of
# that row's department.
df_with_department = df.with_columns(
    pl.Series("department", ["eng", "eng", "ops", "ops", "eng"])
)
df_grouped_fill = df_with_department.with_columns(
    pl.col("age").fill_null(
        pl.col("age").mean().over("department")
    )
)

Handling NaN Values

# Detect NaN (different from null)
df_with_nan_check = df.with_columns(
    pl.col("score").is_nan().alias("is_nan")
)

# Replace NaN with null first, then handle.
# fill_nan(None) is the dedicated NaN -> null conversion; it does not rely on
# matching NaN as a lookup key the way a replace() mapping would.
df_no_nan = df.with_columns(
    pl.col("score").fill_nan(None)
)

# Or fill NaN directly.
# The alias is required: a when/then/otherwise expression takes its output
# name from the first `then` branch, and a bare literal such as 0.0 is named
# "literal". Without the alias the result would be added as a new "literal"
# column instead of replacing "score".
df_filled_nan = df.with_columns(
    pl.when(pl.col("score").is_nan())
    .then(0.0)
    .otherwise(pl.col("score"))
    .alias("score")
)

# Shortcut: apply the same NaN fill across the whole DataFrame at once
# (this rebinds df_filled_nan).
df_filled_nan = df.fill_nan(0.0)

Schema Validation

Understanding Polars Schemas

# Show the full column-name -> dtype mapping
print(df.schema)

# Print one "name: dtype" line per column
for column_name, column_dtype in zip(df.columns, df.dtypes):
    print(f"{column_name}: {column_dtype}")

Strict Schema Definition

from typing import Dict

# Define expected schema
# Maps each required column name to the Polars dtype it must have.
# Note that "is_active" is not a column of the sample df created earlier.
expected_schema: Dict[str, pl.DataType] = {
    "id": pl.Int64,
    "name": pl.Utf8,
    "age": pl.Int64,
    "score": pl.Float64,
    "is_active": pl.Boolean
}

def validate_schema(df: pl.DataFrame, expected: Dict[str, pl.DataType]) -> bool:
    """Return True when every expected column exists with the expected dtype.

    Checking stops at the first problem: a description is printed and False
    is returned. Columns of ``df`` that are not listed in ``expected`` are
    ignored.
    """
    observed = df.schema

    for column, wanted in expected.items():
        # Presence is checked before the dtype comparison.
        if column not in observed:
            print(f"Missing column: {column}")
            return False

        found = observed[column]
        if found != wanted:
            print(f"Type mismatch for {column}: expected {wanted}, got {found}")
            return False

    return True

# Use the validator
# expected_schema lists "is_active", which the sample df does not have, so
# this check is expected to report a missing column and return False.
is_valid = validate_schema(df, expected_schema)
print(f"Schema valid: {is_valid}")

Schema Enforcement

def enforce_schema(df: pl.DataFrame, schema: Dict[str, pl.DataType]) -> pl.DataFrame:
    """Return ``df`` reshaped to ``schema``.

    Columns named in ``schema`` are cast to their target dtype; columns that
    ``df`` lacks are created as null literals of that dtype. The result
    contains exactly the schema's columns, in the schema's order.
    """
    available = df.columns
    projection = [
        pl.col(name).cast(target)
        if name in available
        else pl.lit(None).cast(target).alias(name)
        for name, target in schema.items()
    ]
    return df.select(projection)

# Apply schema enforcement
# The sample df lacks "is_active", so enforce_schema adds it as a null
# Boolean column; the other columns are cast to the dtypes in expected_schema.
df_enforced = enforce_schema(df, expected_schema)
print(df_enforced)
print(df_enforced.schema)

Type Coercion with Error Handling

# Safe casting that handles errors
def safe_cast(df: pl.DataFrame, column: str, dtype: pl.DataType,
              default=None) -> pl.DataFrame:
    """Cast ``column`` to ``dtype`` without failing on unconvertible values.

    Args:
        df: Frame containing the column to convert.
        column: Name of the column to cast.
        dtype: Target Polars dtype.
        default: Replacement for values that cannot be converted. When None,
            such values are left as null.

    Returns:
        A new DataFrame with ``column`` cast to ``dtype``. If the cast cannot
        be executed at all, the error is printed and either the whole column
        is replaced by ``default`` (when given) or ``df`` is returned as is.
    """
    try:
        source = pl.col(column)
        converted = source.cast(dtype, strict=False)
        if default is not None:
            # strict=False turns unconvertible values into null instead of
            # raising, so the default has to be applied explicitly. Only
            # values that were non-null before the cast are replaced;
            # pre-existing nulls stay null.
            cast_failed = source.is_not_null() & converted.is_null()
            converted = (
                pl.when(cast_failed)
                .then(pl.lit(default).cast(dtype))
                .otherwise(converted)
                .alias(column)
            )
        return df.with_columns(converted)
    except Exception as e:
        # Deliberately broad: this helper is best-effort and must not raise
        # for problems with the cast itself.
        print(f"Error casting {column}: {e}")
        if default is not None:
            return df.with_columns(
                pl.lit(default).cast(dtype).alias(column)
            )
        return df

# Example: Cast age to Int32 with fallback
# The result is bound to a new name; df itself is not reassigned.
df_casted = safe_cast(df, "age", pl.Int32, default=0)

Comprehensive Data Quality Checks

Uniqueness Constraints

# Check for duplicates
def check_uniqueness(df: pl.DataFrame, columns: list) -> bool:
    """Report whether the combination of ``columns`` is unique across rows.

    Prints the number of surplus rows when duplicates are present.
    """
    distinct_rows = df.select(columns).n_unique()
    all_rows = df.height

    if distinct_rows == all_rows:
        return True

    surplus = all_rows - distinct_rows
    print(f"Found {surplus} duplicate rows in columns {columns}")
    return False

# Check id column is unique
# (check_uniqueness prints a message only when duplicates are found)
is_id_unique = check_uniqueness(df, ["id"])

# Find duplicates
# Keeps the rows flagged by is_duplicated() on "id"; the sample ids 1-5 are
# all different, so this frame is presumably empty.
duplicates = df.filter(
    pl.col("id").is_duplicated()
)
print(duplicates)

Pattern Validation (Strings)

# Validate email format
def validate_emails(df: pl.DataFrame, email_col: str) -> pl.DataFrame:
    """Add a ``<email_col>_valid`` column flagging values that match the email regex."""
    # local-part @ domain . TLD of at least two letters, anchored at both ends
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

    matches_pattern = pl.col(email_col).str.contains(email_pattern)
    flag_name = f"{email_col}_valid"
    return df.with_columns(matches_pattern.alias(flag_name))

# Create sample data with emails
# "bob@invalid" has no ".tld" part after the domain, so it does not satisfy
# the pattern used by validate_emails; the other two addresses do.
email_df = pl.DataFrame({
    "id": [1, 2, 3],
    "email": ["alice@example.com", "bob@invalid", "charlie@company.org"]
})

email_validated = validate_emails(email_df, "email")
print(email_validated)

Complete Data Validation Pipeline

class DataValidator:
    """Comprehensive data validation for Polars DataFrames.

    A validation run checks, in order, that all schema columns are present,
    casts the columns to the schema dtypes, compares each column's null ratio
    with a threshold, and looks for fully duplicated rows. Every problem found
    is appended to ``self.errors`` as a human-readable message.
    """

    def __init__(self, schema: Dict[str, pl.DataType], null_threshold: float = 0.1):
        """Store the expected schema and the tolerated null ratio.

        Args:
            schema: Mapping of required column name to expected dtype.
            null_threshold: Largest fraction of nulls (0.0-1.0) a column may
                contain before an error is recorded.
        """
        self.schema = schema
        self.null_threshold = null_threshold
        self.errors = []

    def validate(self, df: pl.DataFrame) -> tuple[bool, pl.DataFrame]:
        """Run all validations and return (is_valid, validated_df).

        When required columns are missing, the remaining steps are skipped
        and the input frame is returned unchanged.
        """
        self.errors = []

        # Step 1: Schema validation
        if not self._validate_schema(df):
            return False, df

        # Step 2: Enforce schema
        df = self._enforce_schema(df)

        # Step 3: Check null thresholds
        self._check_nulls(df, self.null_threshold)

        # Step 4: Check duplicates
        self._check_duplicates(df)

        return not self.errors, df

    def _validate_schema(self, df: pl.DataFrame) -> bool:
        """Record an error for every schema column missing from ``df``."""
        present = df.columns
        for col in self.schema:
            if col not in present:
                self.errors.append(f"Missing required column: {col}")
        return not self.errors

    def _enforce_schema(self, df: pl.DataFrame) -> pl.DataFrame:
        """Select the schema columns, cast non-strictly to the schema dtypes.

        Columns absent from ``df`` are created as null literals of the
        expected dtype.
        """
        columns = []
        for col, dtype in self.schema.items():
            if col in df.columns:
                columns.append(pl.col(col).cast(dtype, strict=False))
            else:
                columns.append(pl.lit(None).cast(dtype).alias(col))
        return df.select(columns)

    def _check_nulls(self, df: pl.DataFrame, threshold: float = 0.1):
        """Record an error for each column whose null ratio exceeds ``threshold``."""
        total_rows = df.height
        if total_rows == 0:
            # An empty frame has no null ratio; dividing would raise
            # ZeroDivisionError.
            return
        for col in df.columns:
            null_pct = df[col].null_count() / total_rows
            if null_pct > threshold:
                self.errors.append(f"{col}: {null_pct:.1%} nulls exceeds threshold")

    def _check_duplicates(self, df: pl.DataFrame):
        """Record an error when ``df`` has fewer unique rows than rows."""
        dups = df.height - df.n_unique()
        if dups > 0:
            self.errors.append(f"Found {dups} duplicate rows")

    def get_errors(self) -> list:
        """Return the messages collected by the most recent ``validate`` call."""
        return self.errors

# Usage example
# expected_schema requires "is_active", which the sample df does not contain,
# so this run is expected to end up in the "Validation errors" branch.
validator = DataValidator(expected_schema)
is_valid, validated_df = validator.validate(df)

if not is_valid:
    print("Validation errors:")
    for error in validator.get_errors():
        print(f"  - {error}")
else:
    print("Data validation passed!")
    print(validated_df)

Best Practices for Data Integrity

1. Always Validate at Ingestion

def load_and_validate_data(path: str, validator: DataValidator) -> pl.DataFrame:
    """Read the CSV at ``path`` and return it only if ``validator`` accepts it.

    Raises:
        ValueError: When validation fails; the message lists the validator's
            collected errors.
    """
    raw = pl.read_csv(path)
    passed, checked = validator.validate(raw)

    if passed:
        return checked

    raise ValueError(f"Data validation failed: {validator.get_errors()}")

Practice Exercise

Scenario: You’re building a data pipeline for customer information:

import polars as pl

# Sample data with quality issues
# Besides the issues flagged inline, note that "bob@invalid" has no top-level
# domain, "invalid_date" is not a YYYY-MM-DD date, and several columns
# contain None.
customers = pl.DataFrame({
    "customer_id": [1, 2, 3, 4, 4, 6],  # Duplicate ID (4)
    "name": ["Alice", None, "Charlie", "David", "Eve", "Frank"],
    "email": ["alice@example.com", "bob@invalid", None, "david@test.com", 
              "eve@company.org", "frank@example.com"],
    "age": [25, 150, 30, -5, 35, None],  # Invalid ages (150, -5)
    "registration_date": ["2023-01-15", "invalid_date", "2023-03-20",
                          None, "2023-05-10", "2023-06-01"],
    "is_premium": [True, False, True, None, False, True]
})

Tasks:

  1. Define a proper schema for this data with appropriate types
  2. Detect and report all null values
  3. Fix the duplicate customer_id (keep the first occurrence)
  4. Validate and clean the age column (valid range: 18-100)
  5. Validate email format for non-null emails
  6. Parse registration_date as Date type, handling errors gracefully
  7. Generate a data quality report after cleaning

Bonus Challenge:

Create a validation pipeline that:

- Rejects rows with invalid emails
- Fills missing ages with the median age
- Drops rows with duplicate IDs (keep first)
- Converts registration_date to Date type or null if invalid

Click to see solutions
import polars as pl
from datetime import datetime

# Task 1: Target dtype for every customer column
expected_schema = dict(
    customer_id=pl.Int64,
    name=pl.Utf8,
    email=pl.Utf8,
    age=pl.Int64,
    registration_date=pl.Date,
    is_premium=pl.Boolean,
)

# Task 2: Report how many nulls each column holds
print("Null counts:")
for col in customers.columns:
    print(f"  {col}: {customers[col].null_count()}")

# Task 3: Remove duplicates (keep first)
# maintain_order=True keeps the rows in their original order; without it the
# row order of the result is not guaranteed.
customers_clean = customers.unique(
    subset=["customer_id"], keep="first", maintain_order=True
)

# Task 4: Validate and clean age
# First, see invalid ages (null ages do not match either comparison)
invalid_ages = customers_clean.filter(
    (pl.col("age") < 18) | (pl.col("age") > 100)
)
print(f"\nInvalid ages: {invalid_ages.height} rows")

# Clean ages: set invalid to null, then fill with median
valid_ages = customers_clean.with_columns(
    pl.when((pl.col("age") >= 18) & (pl.col("age") <= 100))
    .then(pl.col("age"))
    .otherwise(None)
    .alias("age")
)
median_age = valid_ages["age"].median()
if median_age is None:
    # No valid age is left to compute a median from; int(None) would raise,
    # so keep the nulls instead of imputing.
    customers_clean = valid_ages
else:
    # The median can be fractional; truncate it to fit the integer column.
    customers_clean = valid_ages.with_columns(
        pl.col("age").fill_null(int(median_age))
    )

# Task 5: Keep well-formed emails, null out everything else
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
email_is_missing = pl.col("email").is_null()
email_is_well_formed = pl.col("email").str.contains(email_pattern)
cleaned_email = (
    pl.when(email_is_missing)
    .then(None)
    .when(email_is_well_formed)
    .then(pl.col("email"))
    .otherwise(None)
    .alias("email")
)
customers_clean = customers_clean.with_columns(cleaned_email)

# Task 6: Parse YYYY-MM-DD strings into dates (non-strict parsing)
parsed_registration_date = pl.col("registration_date").str.to_date(
    format="%Y-%m-%d", strict=False
)
customers_clean = customers_clean.with_columns(parsed_registration_date)

# Task 7: Generate report
def generate_report(df):
    """Summarize the quality of ``df`` as a plain dictionary.

    Returns:
        A dict with three entries:
        - "shape": (row count, column count)
        - "null_percentages": column name -> fraction of nulls (0.0-1.0)
        - "duplicates": number of rows beyond the unique ones
    """
    n_rows = df.height
    return {
        "shape": (n_rows, df.width),
        "null_percentages": {
            # An empty frame has no nulls; report 0.0 rather than dividing
            # by zero.
            col: (df[col].null_count() / n_rows if n_rows else 0.0)
            for col in df.columns
        },
        "duplicates": n_rows - df.n_unique(),
    }

# Build the report for the cleaned frame and print it section by section
report = generate_report(customers_clean)
print("\nFinal Data Quality Report:")
print(f"  Shape: {report['shape']}")
print(f"  Duplicates: {report['duplicates']}")
print("  Null Percentages:")
# The stored values are fractions; the .1% format renders them as percentages
for col, pct in report['null_percentages'].items():
    print(f"    {col}: {pct:.1%}")

print("\nCleaned DataFrame:")
print(customers_clean)

# Bonus: Complete validation pipeline
def validate_customers(df: pl.DataFrame) -> pl.DataFrame:
    """Clean a raw customer frame and return the result.

    Steps: drop repeated ``customer_id`` rows (first occurrence wins), null
    out ages outside 18-100 and fill missing ages with the median of the
    valid ones, null out emails that do not match the email pattern, parse
    ``registration_date`` as a date, and fill missing ``name`` and
    ``is_premium`` values with "Unknown" and False.
    """
    # 1. Remove duplicates.
    # maintain_order=True keeps the original row order; without it the row
    # order of the result is not guaranteed.
    df = df.unique(subset=["customer_id"], keep="first", maintain_order=True)

    # 2. Clean and fill ages
    df = df.with_columns(
        pl.when((pl.col("age") >= 18) & (pl.col("age") <= 100))
        .then(pl.col("age"))
        .otherwise(None)
        .alias("age")
    )
    median_age = df["age"].median()
    if median_age is not None:
        # The median can be fractional; truncate it to fit the integer column.
        df = df.with_columns(pl.col("age").fill_null(int(median_age)))
    # else: no valid age exists to impute from (int(None) would raise),
    # so the ages stay null.

    # 3. Validate emails (set invalid to null)
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    df = df.with_columns(
        pl.when(pl.col("email").str.contains(email_pattern))
        .then(pl.col("email"))
        .otherwise(None)
        .alias("email")
    )

    # 4. Parse dates (non-strict parsing)
    df = df.with_columns(
        pl.col("registration_date").str.to_date(format="%Y-%m-%d", strict=False)
    )

    # 5-6. Fill missing names and premium status
    df = df.with_columns(
        pl.col("name").fill_null("Unknown"),
        pl.col("is_premium").fill_null(False),
    )

    return df

# Run the bonus pipeline on the raw customers frame (not on customers_clean)
final_df = validate_customers(customers)
print("\nBonus - Validated DataFrame:")
print(final_df)

Resources