100 Days of Polars - Day 009: Working with S3

How to read, scan, and analyze Parquet data on S3 directly from Polars — with lazy evaluation, anonymous access, and streaming
polars
data-engineering
s3
cloud
100-days-of-polars
Author

NomadC

Published

February 10, 2026

Introduction

One of Polars’ most powerful features is its native support for reading data directly from cloud object storage. No need to download files first — Polars can read and scan Parquet files on S3 with full support for lazy evaluation, predicate pushdown, and streaming. Today we’ll explore how to connect Polars to S3 and work with real-world datasets at scale.

Quick Reference

| Method | Function | When to Use |
|---|---|---|
| pl.read_parquet("s3://...") | Eager read | Small files, quick exploration |
| pl.scan_parquet("s3://...") | Lazy scan | Large files, complex queries |
| storage_options={"anon": True} | Anonymous access | Public datasets, no credentials needed |
| storage_options={"profile": "..."} | Named profile | Multiple AWS accounts |
| collect(streaming=True) | Streaming mode | Datasets larger than memory |

Prerequisites

Polars uses the object_store Rust crate under the hood, so S3 support works out of the box. Make sure you have Polars installed:

pip install polars

For authentication, credentials can be provided via:

  • AWS CLI config (aws configure)
  • Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  • ~/.aws/credentials file
  • IAM roles (when running on EC2/ECS/Lambda)
  • storage_options parameter in Polars

Reading a Single File (Eager Mode)

The simplest approach — load an entire file into memory:

import polars as pl

# Read a single Parquet file from S3
df = pl.read_parquet(
    "s3://daylight-openstreetmap/parquet/osm_features/release=v1.58/type=way/"
    "20241112_191814_00139_grr7u_fea4d477-4748-4e7d-9aed-90290d792f01"
)

print(f"Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")
print(f"Memory: {df.estimated_size() / 1024 / 1024:.2f} MB")
print(df.head())

Key points:

  • Works exactly like pl.read_parquet() with a local file
  • The entire file is downloaded and loaded into memory
  • Best for files that fit comfortably in RAM

Anonymous Access to Public Buckets

Many public datasets on S3 don’t require AWS credentials. Use storage_options to access them anonymously:

# Access public datasets without credentials
df = pl.read_parquet(
    "s3://daylight-openstreetmap/parquet/osm_features/release=v1.58/type=way/"
    "20241112_191814_00139_grr7u_fea4d477-4748-4e7d-9aed-90290d792f01",
    storage_options={"anon": True, "aws_region": "us-east-1"},
)

print(f"✓ Loaded {df.shape[0]:,} rows without any credentials!")

When to use storage_options:

| Option | Purpose |
|---|---|
| {"anon": True} | Explicitly anonymous access |
| {"aws_region": "us-east-1"} | Specify region for the bucket |
| {"profile": "work"} | Use a named AWS CLI profile |
| {"aws_access_key_id": "...", "aws_secret_access_key": "..."} | Inline credentials (avoid in production!) |

Filtering S3 Data Efficiently

Combine lazy scanning with filters to minimize data transfer:

# Lazy scan: nothing is read until .collect()
lf = pl.scan_parquet(
    "s3://daylight-openstreetmap/parquet/osm_features/**/*.parquet",
    storage_options={"anon": True, "aws_region": "us-east-1"},
)

# Only the 'id', 'type', 'tags', and 'geometry' columns are fetched,
# and the filter is pushed down into the scan to minimize data transfer
highways = (
    lf
    .filter(pl.col("tags").str.contains("highway"))
    .select(["id", "type", "tags", "geometry"])
    .limit(100)
    .collect()
)

print(f"Found {highways.shape[0]} highway features")
print(highways.head())

Aggregating Across Multiple Files

One of the best use cases for Polars + S3 is aggregating across a partitioned dataset:

# Statistics grouped by release version — scans across all files
release_stats = (
    lf
    .group_by("release")
    .agg([
        pl.len().alias("total_features"),
        pl.col("id").n_unique().alias("unique_ids"),
        pl.col("version").mean().alias("avg_version"),
    ])
    .sort("release")
    .collect()
)

print(release_stats)

Streaming Mode for Very Large Datasets

When even lazy evaluation isn’t enough — because the intermediate aggregation state doesn’t fit in memory — enable streaming:

# Streaming processes data in batches, keeping memory usage bounded
streaming_result = (
    lf
    .filter(pl.col("type") == "way")
    .group_by("release")
    .agg([
        pl.len().alias("count"),
        pl.col("version").max().alias("max_version"),
    ])
    .collect(streaming=True)  # ← Enable streaming engine
)

print(streaming_result)

When to use streaming:

  • Dataset doesn’t fit in memory
  • Aggregation across many files
  • You only need summary statistics, not the raw data

Joining S3 Datasets

You can join multiple S3 datasets together using lazy frames:

# Scan two separate datasets
features_lf = pl.scan_parquet(
    "s3://daylight-openstreetmap/parquet/osm_features/**/*.parquet"
)
elements_lf = pl.scan_parquet(
    "s3://daylight-openstreetmap/parquet/osm_elements/**/*.parquet"
)

# Join them on 'id' — Polars handles the optimization
joined = (
    features_lf.limit(10_000)
    .join(
        elements_lf.limit(10_000),
        on="id",
        how="inner"
    )
    .select(["id", "type", "tags"])
    .collect()
)

print(f"Joined result: {joined.shape}")
print(joined.head())

Writing Results Back to S3

After analysis, you can write results back to S3:

# Write results to local Parquet
result_df.write_parquet("analysis_results.parquet")

# Write to S3 (requires write permissions)
# Option 1: Use AWS CLI
# aws s3 cp analysis_results.parquet s3://my-bucket/results/

# Option 2: Use boto3 in Python
import boto3

s3_client = boto3.client("s3")
s3_client.upload_file(
    "analysis_results.parquet",
    "my-bucket",
    "results/analysis_results.parquet"
)

Performance Tips

  1. Always prefer scan_parquet over read_parquet for S3 — it enables predicate and projection pushdown
  2. Use glob patterns (**/*.parquet) to scan partitioned datasets
  3. Enable streaming for aggregations across datasets larger than memory
  4. Specify storage_options with the correct region to avoid cross-region data transfer
  5. Download locally first for iterative exploration on the same file:
# Download once, read many times
aws s3 cp s3://daylight-openstreetmap/parquet/osm_features/release=v1.58/type=way/FILENAME \
    ./osm_sample.parquet --no-sign-request

# Then load locally (much faster for repeated reads)
df = pl.read_parquet("./osm_sample.parquet")

Practice Exercise

You want to analyze public NYC taxi trip data stored on S3.

import polars as pl

# NYC Taxi dataset (public, partitioned by month)
# s3://nyc-tlc/trip data/yellow_tripdata_2023-*.parquet
# For practice, let's assume you have a local sample
taxi = pl.DataFrame({
    "pickup_datetime": [
        "2026-01-10 08:15:00", "2026-01-10 09:30:00",
        "2026-01-10 12:00:00", "2026-01-10 18:45:00",
        "2026-01-11 07:00:00", "2026-01-11 14:30:00",
    ],
    "dropoff_datetime": [
        "2026-01-10 08:45:00", "2026-01-10 10:00:00",
        "2026-01-10 12:30:00", "2026-01-10 19:15:00",
        "2026-01-11 07:25:00", "2026-01-11 15:00:00",
    ],
    "trip_distance": [3.2, 8.5, 2.1, 12.0, 5.5, 4.8],
    "fare_amount": [15.0, 32.5, 10.0, 45.0, 22.0, 18.5],
    "tip_amount": [3.0, 6.5, 2.0, 9.0, 4.4, 3.7],
    "payment_type": ["Credit", "Credit", "Cash", "Credit", "Credit", "Cash"],
    "passenger_count": [1, 2, 1, 3, 1, 2],
}).with_columns(
    pl.col("pickup_datetime").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"),
    pl.col("dropoff_datetime").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"),
)

Tasks:

  1. Simulate lazy S3 read: Convert the DataFrame to a LazyFrame (simulating scan_parquet) and calculate average fare per payment type.

  2. Trip duration: Calculate the trip duration in minutes for each trip using the pickup and dropoff times.

  3. Cost per mile: Add a column cost_per_mile (fare / distance) and find the most expensive trip per mile.

  4. Peak hours: Extract the pickup hour and find which hour has the highest average fare.

  5. Daily summary: Group by date and calculate: total trips, total revenue (fare + tip), and average trip distance.

Click to see solutions
# Task 1: Lazy evaluation simulation
avg_fare_by_payment = (
    taxi.lazy()
    .group_by("payment_type")
    .agg(pl.col("fare_amount").mean().alias("avg_fare"))
    .collect()
)
print("Avg fare by payment type:")
print(avg_fare_by_payment)

# Task 2: Trip duration in minutes
with_duration = taxi.with_columns(
    ((pl.col("dropoff_datetime") - pl.col("pickup_datetime"))
     .dt.total_minutes())
    .alias("duration_minutes")
)
print("\nTrip durations:")
print(with_duration.select(["pickup_datetime", "dropoff_datetime", "duration_minutes"]))

# Task 3: Cost per mile
with_cost = taxi.with_columns(
    (pl.col("fare_amount") / pl.col("trip_distance")).alias("cost_per_mile")
)
most_expensive = with_cost.sort("cost_per_mile", descending=True).head(1)
print("\nMost expensive trip per mile:")
print(most_expensive)

# Task 4: Peak hours
peak_hours = (
    taxi.with_columns(
        pl.col("pickup_datetime").dt.hour().alias("pickup_hour")
    )
    .group_by("pickup_hour")
    .agg(pl.col("fare_amount").mean().alias("avg_fare"))
    .sort("avg_fare", descending=True)
)
print("\nPeak hours by avg fare:")
print(peak_hours)

# Task 5: Daily summary
daily_summary = (
    taxi.with_columns(
        pl.col("pickup_datetime").dt.date().alias("date"),
        (pl.col("fare_amount") + pl.col("tip_amount")).alias("total_revenue"),
    )
    .group_by("date")
    .agg([
        pl.len().alias("total_trips"),
        pl.col("total_revenue").sum().alias("total_revenue"),
        pl.col("trip_distance").mean().alias("avg_distance"),
    ])
    .sort("date")
)
print("\nDaily summary:")
print(daily_summary)

Resources