Introduction
Data integrity is the foundation of reliable data pipelines. Without proper validation and null handling, your analyses can produce misleading results or fail entirely. In Polars, you have powerful tools to ensure your data meets quality standards before processing.
Today we’ll cover:

- Null value handling: Detection, filling, and removal strategies
- Schema validation: Ensuring data types match expectations
- Data quality checks: Comprehensive validation patterns
Understanding Nulls in Polars
Polars uses null to represent missing values. This is distinct from NaN (not-a-number), which is a valid floating-point value rather than a missing one; Python’s None is converted to null when you construct a DataFrame.
import polars as pl
import numpy as np
# Create a DataFrame with various null-like values
df = pl.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", None, "David", "Eve"],
    "age": [25, None, 30, 35, None],
    "score": [85.5, 90.0, np.nan, 78.0, 92.5]
})
print(df)

shape: (5, 4)
┌─────┬────────┬──────┬───────┐
│ id ┆ name ┆ age ┆ score │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 ┆ f64 │
╞═════╪════════╪══════╪═══════╡
│ 1 ┆ Alice ┆ 25 ┆ 85.5 │
│ 2 ┆ Bob ┆ null ┆ 90.0 │
│ 3 ┆ null ┆ 30 ┆ NaN │
│ 4 ┆ David ┆ 35 ┆ 78.0 │
│ 5 ┆ Eve ┆ null ┆ 92.5 │
└─────┴────────┴──────┴───────┘
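Note that the NaN in score is stored as a float NaN, not as a null; a quick check against the DataFrame above makes the difference visible:

# null_count() only counts true nulls, so score reports 0
print(df.null_count())
# is_nan() is what flags the NaN in the float column
print(df.select(pl.col("score").is_nan().sum()))  # counts the one NaN in score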
Detecting Null Values
Basic Null Detection
# Check for nulls in each column
null_counts = df.null_count()
print(null_counts)
# Check if any nulls exist in the entire DataFrame
has_nulls = df.null_count().sum_horizontal().max() > 0
print(f"DataFrame has nulls: {has_nulls}")Column-Specific Null Checks
# Count nulls per column
for col in df.columns:
    null_count = df[col].is_null().sum()
    print(f"{col}: {null_count} nulls")
# Alternative: Get null percentage per column
null_pct = df.null_count() / df.height * 100
print(null_pct)

Finding Rows with Nulls
# Find rows with any null values
df_with_nulls = df.filter(
    pl.any_horizontal(pl.all().is_null())
)
print(df_with_nulls)
# Find rows where specific columns have nulls
df_name_or_age_null = df.filter(
    pl.col("name").is_null() | pl.col("age").is_null()
)
print(df_name_or_age_null)

Handling Null Values
Strategy 1: Drop Nulls
# Drop rows with ANY null values
df_clean = df.drop_nulls()
# Drop rows with nulls in specific columns
df_clean_names = df.drop_nulls(subset=["name"])
# Drop columns that contain any nulls
df_no_null_cols = df.select([
    col for col in df.columns
    if df[col].null_count() == 0
])

Strategy 2: Fill Nulls
# Fill with a constant value
df_filled = df.with_columns(
    pl.col("age").fill_null(0),
    pl.col("name").fill_null("Unknown")
)
# Fill with forward fill (previous value)
df_ffill = df.with_columns(
    pl.col("age").fill_null(strategy="forward")
)
# Fill with backward fill (next value)
df_bfill = df.with_columns(
    pl.col("age").fill_null(strategy="backward")
)
# Fill with mean (for numeric columns)
df_mean_filled = df.with_columns(
    pl.col("age").fill_null(pl.col("age").mean())
)
# Fill with median
df_median_filled = df.with_columns(
    pl.col("age").fill_null(pl.col("age").median())
)
# Fill with interpolation (linear)
df_interpolated = df.with_columns(
    pl.col("age").interpolate()
)
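If the same fill rule should apply to every column of a given type, you can select columns by dtype instead of naming them one by one. A minimal sketch, assuming you want zeros for every Float64 column and a placeholder for every string column:

# Apply one fill rule per dtype rather than per column
df_typed_fill = df.with_columns(
    pl.col(pl.Float64).fill_null(0.0),
    pl.col(pl.Utf8).fill_null("Unknown")
)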
Strategy 3: Conditional Filling

# Fill based on conditions
df_conditional = df.with_columns(
    pl.when(pl.col("age").is_null())
    .then(18)  # Default age if missing
    .otherwise(pl.col("age"))
    .alias("age_filled")
)
# Fill based on group statistics
df_grouped_fill = df.with_columns(
    pl.col("age").fill_null(
        pl.col("age").mean().over("department")  # If you have a department column
    )
)
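The group-statistics fill above assumes a department column that our sample df does not have, so it won’t run as written. A self-contained sketch with a small hypothetical DataFrame shows the pattern end to end:

# Hypothetical data: fill missing ages with each department's mean age
dept_df = pl.DataFrame({
    "department": ["sales", "sales", "eng", "eng"],
    "age": [30, None, 40, None]
})
dept_filled = dept_df.with_columns(
    pl.col("age").fill_null(pl.col("age").mean().over("department"))
)
print(dept_filled)  # the nulls become 30.0 and 40.0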
Handling NaN Values

# Detect NaN (different from null)
df_with_nan_check = df.with_columns(
    pl.col("score").is_nan().alias("is_nan")
)
# Convert NaN to null first, then handle it like any other null
df_no_nan = df.with_columns(
    pl.col("score").fill_nan(None)
)
# Or fill NaN with a value directly
df_filled_nan = df.with_columns(
    pl.when(pl.col("score").is_nan())
    .then(0.0)
    .otherwise(pl.col("score"))
    .alias("score")
)
# Equivalent shortcut across the whole DataFrame
df_filled_nan = df.fill_nan(0.0)

Schema Validation
Understanding Polars Schemas
# Check current schema
print(df.schema)
# Get detailed schema information
for name, dtype in df.schema.items():
    print(f"{name}: {dtype}")
Strict Schema Definition

from typing import Dict
# Define expected schema
expected_schema: Dict[str, pl.DataType] = {
    "id": pl.Int64,
    "name": pl.Utf8,
    "age": pl.Int64,
    "score": pl.Float64,
    "is_active": pl.Boolean  # not present in df, so the check below will flag it
}
def validate_schema(df: pl.DataFrame, expected: Dict[str, pl.DataType]) -> bool:
    """Validate that DataFrame matches expected schema."""
    actual_schema = df.schema
    # Check that all expected columns exist with the expected dtype
    for col, dtype in expected.items():
        if col not in actual_schema:
            print(f"Missing column: {col}")
            return False
        if actual_schema[col] != dtype:
            print(f"Type mismatch for {col}: expected {dtype}, got {actual_schema[col]}")
            return False
    return True
# Use the validator
is_valid = validate_schema(df, expected_schema)
print(f"Schema valid: {is_valid}")

Schema Enforcement
def enforce_schema(df: pl.DataFrame, schema: Dict[str, pl.DataType]) -> pl.DataFrame:
    """Cast columns to expected types, creating missing columns with nulls."""
    columns = []
    for col_name, dtype in schema.items():
        if col_name in df.columns:
            # Cast existing column to expected type
            columns.append(pl.col(col_name).cast(dtype))
        else:
            # Create column with nulls of expected type
            columns.append(pl.lit(None).cast(dtype).alias(col_name))
    return df.select(columns)
# Apply schema enforcement
df_enforced = enforce_schema(df, expected_schema)
print(df_enforced)
print(df_enforced.schema)

Type Coercion with Error Handling
# Safe casting that handles errors
def safe_cast(df: pl.DataFrame, column: str, dtype: pl.DataType,
              default=None) -> pl.DataFrame:
    """Attempt to cast column, filling errors with default value."""
    try:
        return df.with_columns(
            pl.col(column).cast(dtype, strict=False)
        )
    except Exception as e:
        print(f"Error casting {column}: {e}")
        if default is not None:
            return df.with_columns(
                pl.lit(default).cast(dtype).alias(column)
            )
        return df
# Example: Cast age to Int32 with fallback
df_casted = safe_cast(df, "age", pl.Int32, default=0)

Comprehensive Data Quality Checks
Uniqueness Constraints
# Check for duplicates
def check_uniqueness(df: pl.DataFrame, columns: list) -> bool:
    """Check if specified columns have unique values."""
    unique_count = df.select(columns).n_unique()
    total_count = df.height
    is_unique = unique_count == total_count
    if not is_unique:
        duplicates = total_count - unique_count
        print(f"Found {duplicates} duplicate rows in columns {columns}")
    return is_unique
# Check id column is unique
is_id_unique = check_uniqueness(df, ["id"])
# Find duplicates
duplicates = df.filter(
    pl.col("id").is_duplicated()
)
print(duplicates)
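Once duplicates are detected, removing them is a one-liner; a short sketch that keeps the first row for each id:

# Drop duplicate ids, keeping the first occurrence
df_deduped = df.unique(subset=["id"], keep="first")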
Pattern Validation (Strings)

# Validate email format
def validate_emails(df: pl.DataFrame, email_col: str) -> pl.DataFrame:
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return df.with_columns(
        pl.col(email_col)
        .str.contains(email_pattern)
        .alias(f"{email_col}_valid")
    )
# Create sample data with emails
email_df = pl.DataFrame({
    "id": [1, 2, 3],
    "email": ["alice@example.com", "bob@invalid", "charlie@company.org"]
})
email_validated = validate_emails(email_df, "email")
print(email_validated)
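The boolean flag makes it easy to act on the result, for example isolating the rows that failed the check:

# Rows whose email did not match the pattern
invalid_emails = email_validated.filter(~pl.col("email_valid"))
print(invalid_emails)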
Complete Data Validation Pipeline

class DataValidator:
    """Comprehensive data validation for Polars DataFrames."""

    def __init__(self, schema: Dict[str, pl.DataType]):
        self.schema = schema
        self.errors = []

    def validate(self, df: pl.DataFrame) -> tuple[bool, pl.DataFrame]:
        """Run all validations and return (is_valid, validated_df)."""
        self.errors = []
        # Step 1: Schema validation
        if not self._validate_schema(df):
            return False, df
        # Step 2: Enforce schema
        df = self._enforce_schema(df)
        # Step 3: Check null thresholds
        self._check_nulls(df)
        # Step 4: Check duplicates
        self._check_duplicates(df)
        is_valid = len(self.errors) == 0
        return is_valid, df

    def _validate_schema(self, df: pl.DataFrame) -> bool:
        for col, dtype in self.schema.items():
            if col not in df.columns:
                self.errors.append(f"Missing required column: {col}")
        return len(self.errors) == 0

    def _enforce_schema(self, df: pl.DataFrame) -> pl.DataFrame:
        columns = []
        for col, dtype in self.schema.items():
            if col in df.columns:
                columns.append(pl.col(col).cast(dtype, strict=False))
            else:
                columns.append(pl.lit(None).cast(dtype).alias(col))
        return df.select(columns)

    def _check_nulls(self, df: pl.DataFrame, threshold: float = 0.1):
        for col in df.columns:
            null_pct = df[col].is_null().sum() / df.height
            if null_pct > threshold:
                self.errors.append(f"{col}: {null_pct:.1%} nulls exceeds threshold")

    def _check_duplicates(self, df: pl.DataFrame):
        dups = df.height - df.n_unique()
        if dups > 0:
            self.errors.append(f"Found {dups} duplicate rows")

    def get_errors(self) -> list:
        return self.errors
# Usage example
validator = DataValidator(expected_schema)
is_valid, validated_df = validator.validate(df)
if not is_valid:
    print("Validation errors:")
    for error in validator.get_errors():
        print(f" - {error}")
else:
    print("Data validation passed!")
print(validated_df)

Best Practices for Data Integrity
1. Always Validate at Ingestion
def load_and_validate_data(path: str, validator: DataValidator) -> pl.DataFrame:
    """Load data with validation."""
    df = pl.read_csv(path)
    is_valid, df = validator.validate(df)
    if not is_valid:
        raise ValueError(f"Data validation failed: {validator.get_errors()}")
    return df
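A call site might look like the sketch below; the CSV path is a placeholder for illustration:

# Hypothetical usage: fail fast if the file doesn't meet expectations
validator = DataValidator(expected_schema)
clean_df = load_and_validate_data("customers.csv", validator)  # placeholder path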
Practice Exercise

Scenario: You’re building a data pipeline for customer information:
import polars as pl
# Sample data with quality issues
customers = pl.DataFrame({
    "customer_id": [1, 2, 3, 4, 4, 6],  # Duplicate ID (4)
    "name": ["Alice", None, "Charlie", "David", "Eve", "Frank"],
    "email": ["alice@example.com", "bob@invalid", None, "david@test.com",
              "eve@company.org", "frank@example.com"],
    "age": [25, 150, 30, -5, 35, None],  # Invalid ages (150, -5)
    "registration_date": ["2023-01-15", "invalid_date", "2023-03-20",
                          None, "2023-05-10", "2023-06-01"],
    "is_premium": [True, False, True, None, False, True]
})

Tasks:
- Define a proper schema for this data with appropriate types
- Detect and report all null values
- Fix the duplicate customer_id (keep the first occurrence)
- Validate and clean the age column (valid range: 18-100)
- Validate email format for non-null emails
- Parse registration_date as Date type, handling errors gracefully
- Generate a data quality report after cleaning
Bonus Challenge:
Create a validation pipeline that:

- Rejects rows with invalid emails
- Fills missing ages with the median age
- Drops rows with duplicate IDs (keep first)
- Converts registration_date to Date type or null if invalid
Click to see solutions
import polars as pl
# Task 1: Define schema
expected_schema = {
    "customer_id": pl.Int64,
    "name": pl.Utf8,
    "email": pl.Utf8,
    "age": pl.Int64,
    "registration_date": pl.Date,
    "is_premium": pl.Boolean
}
# Task 2: Detect nulls
print("Null counts:")
for col in customers.columns:
    null_count = customers[col].null_count()
    print(f" {col}: {null_count}")
# Task 3: Remove duplicates (keep first)
customers_clean = customers.unique(subset=["customer_id"], keep="first")
# Task 4: Validate and clean age
# First, see invalid ages
invalid_ages = customers_clean.filter(
    (pl.col("age") < 18) | (pl.col("age") > 100)
)
print(f"\nInvalid ages: {invalid_ages.height} rows")
# Clean ages: set invalid to null, then fill with median
valid_ages = customers_clean.with_columns(
    pl.when((pl.col("age") >= 18) & (pl.col("age") <= 100))
    .then(pl.col("age"))
    .otherwise(None)
    .alias("age")
)
median_age = valid_ages["age"].median()
customers_clean = valid_ages.with_columns(
    pl.col("age").fill_null(int(median_age))
)
# Task 5: Validate email format
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
customers_clean = customers_clean.with_columns(
    pl.when(pl.col("email").is_null())
    .then(None)
    .when(pl.col("email").str.contains(email_pattern))
    .then(pl.col("email"))
    .otherwise(None)
    .alias("email")
)
# Task 6: Parse dates
customers_clean = customers_clean.with_columns(
    pl.col("registration_date").str.to_date(format="%Y-%m-%d", strict=False)
)
# Task 7: Generate report
def generate_report(df):
    return {
        "shape": (df.height, df.width),
        "null_percentages": {
            col: df[col].null_count() / df.height
            for col in df.columns
        },
        "duplicates": df.height - df.n_unique()
    }
report = generate_report(customers_clean)
print("\nFinal Data Quality Report:")
print(f" Shape: {report['shape']}")
print(f" Duplicates: {report['duplicates']}")
print(" Null Percentages:")
for col, pct in report['null_percentages'].items():
    print(f" {col}: {pct:.1%}")
print("\nCleaned DataFrame:")
print(customers_clean)
# Bonus: Complete validation pipeline
def validate_customers(df: pl.DataFrame) -> pl.DataFrame:
    # 1. Remove duplicates
    df = df.unique(subset=["customer_id"], keep="first")
    # 2. Clean and fill ages
    df = df.with_columns(
        pl.when((pl.col("age") >= 18) & (pl.col("age") <= 100))
        .then(pl.col("age"))
        .otherwise(None)
        .alias("age")
    )
    median_age = int(df["age"].median())
    df = df.with_columns(pl.col("age").fill_null(median_age))
    # 3. Validate emails (set invalid to null)
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    df = df.with_columns(
        pl.when(pl.col("email").str.contains(email_pattern))
        .then(pl.col("email"))
        .otherwise(None)
        .alias("email")
    )
    # 4. Parse dates
    df = df.with_columns(
        pl.col("registration_date").str.to_date(format="%Y-%m-%d", strict=False)
    )
    # 5. Fill missing names
    df = df.with_columns(pl.col("name").fill_null("Unknown"))
    # 6. Fill missing premium status
    df = df.with_columns(pl.col("is_premium").fill_null(False))
    return df
final_df = validate_customers(customers)
print("\nBonus - Validated DataFrame:")
print(final_df)