Day 38 · Hard · Lakehouse · Iceberg/Delta/Hudi · Metadata · Batch+Stream

Data Lake & Lakehouse: Growing a Database on Object Storage
Open Table Formats, Metadata, Unified Batch/Stream

Scenario + Requirements

Design a PB-scale analytics platform: ingest 100TB/day of behavioral logs plus CDC changes from upstream OLTP databases, feeding three consumer classes at once — BI dashboards (needing minute-level freshness), ML features/training (full history, replayable to any point in time), and analyst ad-hoc SQL (scan tens of TB, return in seconds).

The old way runs a data lake (a pile of Parquet on S3) plus a warehouse (Snowflake/Redshift) side by side: the lake is cheap and holds raw data but has no transactions and rewrites an entire partition to change one row; the warehouse queries fast but is expensive, and data must be ETL'd over, so freshness always lags by hours. Two systems, two copies, double the cost, and they drift apart.

Four topics: lake vs warehouse vs lakehouse, how open table formats do ACID on S3, how the metadata layer makes queries fast and time-travel possible, and how one table serves both batch and stream.

High-level Architecture (Lakehouse layering)

graph TD
    SRC["Data sources<br/>OLTP CDC · logs · event streams"]
    ING["Ingest layer<br/>Spark / Flink / Kafka Connect"]
    subgraph LH["Lakehouse storage (object store S3/GCS)"]
        META["Table-format metadata<br/>Iceberg / Delta / Hudi"]
        DATA["Data files<br/>Parquet / ORC columnar"]
    end
    CAT["Catalog<br/>table → current snapshot pointer"]
    ENG["Query/compute engines<br/>Spark · Trino · Flink · DuckDB"]
    BI["BI dashboards"]
    ML["ML training/features"]
    AD["Ad-hoc SQL"]
    SRC --> ING --> META
    META -.points to.-> DATA
    CAT -.atomic swap of current snapshot.-> META
    ENG -->|read metadata→prune→read only relevant files| META
    ENG --> BI & ML & AD
    classDef src fill:#1a2530,stroke:#64c8ff,color:#e8eef5
    classDef store fill:#1a1a30,stroke:#ffb450,color:#e8eef5
    classDef eng fill:#0e2030,stroke:#5eead4,color:#e8eef5
    class SRC,ING src
    class META,DATA,CAT store
    class ENG,BI,ML,AD eng

Key insight: storage is still open columnar files on object storage, but a layer of transactional metadata on top lets many engines operate on it like a database.

Key Technical Points

1. Lake vs Warehouse vs Lakehouse: splitting the database into "files + metadata"

Principle: a classic warehouse is schema-on-write, storage-compute-coupled and closed — model first, proprietary format, fast but expensive and locked in. A data lake is schema-on-read — raw files dumped on S3, cheap, open, ML-friendly, but with no transactions, no indexes, no schema-evolution guarantees; changing one row rewrites a whole partition directory, and concurrent writes clobber each other. The lakehouse keeps the lake's open columnar files (Parquet) and adds a layer of transactional metadata on top, so one copy of data gets the lake's cost/openness and the warehouse's ACID/performance. Databricks systematically argued this in the CIDR 2021 Lakehouse paper.

| | Warehouse | Data Lake | Lakehouse |
|---|---|---|---|
| Storage | proprietary, coupled | object store + open files | object store + open files |
| Transactions | ✅ ACID | ❌ none | ✅ ACID (via metadata) |
| Schema | enforced on write | interpreted on read | evolution + optional enforcement |
| Cost | high (always-on) | low | low (on-demand compute) |
| ML/openness | weak (export) | strong | strong (multi-engine direct read) |

2. Open table formats: three schools for doing ACID on S3

Principle: object storage guarantees only single-object PUT atomicity — no cross-file transactions, no atomic rename (S3 rename = copy+delete). The table-format trick: all data files are append-only, immutable; every write produces new files, then atomically swaps a single "current snapshot" pointer — a reader sees either the entire old snapshot or the entire new one, never a partial state. That's snapshot isolation. The big three (Iceberg / Delta / Hudi) share this idea; they differ in metadata organization and write-optimization focus.
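A minimal sketch of that idea, assuming a toy in-memory `ToyTable` (hypothetical, not any format's API): data files are write-once, a commit is a single pointer assignment, and a reader that pinned a snapshot keeps seeing exactly that snapshot's file list while a writer stages and publishes new files.

```python
# Toy model (hypothetical): immutable data files + one "current snapshot"
# pointer give readers snapshot isolation.
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    files: frozenset  # paths of immutable data files visible in this snapshot


class ToyTable:
    """`store` mimics an object store; `current` is the single atomic pointer."""

    def __init__(self):
        self.store = {}                        # path -> rows, never modified after PUT
        self.current = Snapshot(0, frozenset())

    def write_file(self, path, rows):
        if path in self.store:
            raise ValueError("data files are immutable; write a new path instead")
        self.store[path] = tuple(rows)         # single-object PUT

    def publish(self, new_snapshot):
        self.current = new_snapshot            # the only atomic step: swap the pointer

    def scan(self, snapshot):
        # A reader resolves rows only through the snapshot it pinned.
        return sorted(r for p in snapshot.files for r in self.store[p])


t = ToyTable()
t.write_file("f1.parquet", [1, 2])
t.publish(Snapshot(1, frozenset({"f1.parquet"})))

pinned = t.current                             # a long-running query pins snapshot 1
t.write_file("f2.parquet", [3])                # staged file: not referenced, so invisible
print(t.scan(t.current))                       # [1, 2]
t.publish(Snapshot(2, pinned.files | {"f2.parquet"}))
print(t.scan(pinned), t.scan(t.current))       # [1, 2] [1, 2, 3]
```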

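# Optimistic concurrency control (OCC) commit, common to all three (pseudocode:
# `catalog`, `Snapshot.add` and `conflicts` stand in for format-specific logic)
class ConflictError(Exception):
    pass

def commit(catalog, table, new_files):
    base = catalog.current_snapshot(table)          # read current snapshot
    while True:
        snap = base.add(new_files)                   # new snapshot = base files + new files
        # atomic CAS: swap pointer only if current still equals base
        if catalog.compare_and_swap(table, expected=base, new=snap):
            return snap                              # committed
        # someone committed first: did the commits in (base, latest] touch our data?
        latest = catalog.current_snapshot(table)
        if conflicts(base, latest, new_files):
            raise ConflictError(table)
        base = latest                                # no conflict: rebase on latest and retry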
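The loop above can be exercised end to end with a hypothetical in-memory catalog. In this sketch the catalog, the `Snapshot` shape, the partition-level conflict rule and the `on_before_cas` hook (used only to inject a racing writer deterministically) are all assumptions for illustration, not Iceberg, Delta or Hudi APIs. It shows the two outcomes the loop must distinguish: a pure append that loses the race rebases and succeeds, while a partition overwrite that loses the race to a write into the same partition fails instead of silently dropping that write.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Optional


class ConflictError(Exception):
    """A concurrent commit wrote data that this commit would silently replace."""


@dataclass(frozen=True, eq=False)
class Snapshot:
    version: int
    files: frozenset                     # {(partition, path)} visible in this snapshot
    written: frozenset = frozenset()     # partitions this commit appended to or replaced
    parent: Optional["Snapshot"] = None  # previous snapshot (linear history)


class InMemoryCatalog:
    """Stand-in for a catalog whose pointer update is a conditional write of one record."""

    def __init__(self):
        self._lock = threading.Lock()
        self._current = Snapshot(0, frozenset())

    def current_snapshot(self):
        return self._current

    def compare_and_swap(self, expected, new):
        with self._lock:
            if self._current is not expected:
                return False             # someone else committed first
            self._current = new
            return True


def written_since(base, latest):
    """Partitions written by every commit after `base` up to and including `latest`."""
    out, s = set(), latest
    while s is not None and s is not base:
        out |= s.written
        s = s.parent
    return out


def commit(catalog, add, overwrite=frozenset(), max_retries=4,
           on_before_cas: Optional[Callable[[], None]] = None):
    """Add the (partition, path) files in `add`, first dropping any `overwrite` partitions."""
    base = catalog.current_snapshot()
    for _ in range(max_retries + 1):
        kept = {f for f in base.files if f[0] not in overwrite}
        written = frozenset(overwrite) | {part for part, _ in add}
        snap = Snapshot(base.version + 1, frozenset(kept) | frozenset(add), written, base)
        if on_before_cas is not None:    # demo-only hook: let a racing writer commit first
            on_before_cas()
            on_before_cas = None
        if catalog.compare_and_swap(expected=base, new=snap):
            return snap
        latest = catalog.current_snapshot()
        # Pure appends never conflict; replacing a partition conflicts if a
        # concurrent commit wrote into that partition in the meantime.
        clash = set(overwrite) & written_since(base, latest)
        if clash:
            raise ConflictError(f"concurrent write to {sorted(clash)}")
        base = latest                    # rebase onto the winner and retry
    raise ConflictError("gave up after max_retries")


cat = InMemoryCatalog()
commit(cat, add={("dt=06-29", "a.parquet")})


def racer_b():
    commit(cat, add={("dt=06-29", "b.parquet")})


# Writer A appends to dt=06-30 but loses the CAS to B; A rebases and succeeds.
snap = commit(cat, add={("dt=06-30", "c.parquet")}, on_before_cas=racer_b)
print(snap.version, sorted(path for _, path in snap.files))
# 3 ['a.parquet', 'b.parquet', 'c.parquet']


def racer_d():
    commit(cat, add={("dt=06-29", "d.parquet")})


# Replacing dt=06-29 while D appends into it must fail instead of dropping d.parquet.
try:
    commit(cat, add={("dt=06-29", "e.parquet")}, overwrite={"dt=06-29"}, on_before_cas=racer_d)
    outcome = "committed"
except ConflictError as err:
    outcome = f"conflict: {err}"
print(outcome)  # conflict: concurrent write to ['dt=06-29']
```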

3. Metadata management: layered manifests for pruning + time travel

Principle: a table format organizes metadata into a multi-level manifest tree. In Iceberg: metadata file (schema, partition spec, list of all snapshots) → manifest list (which manifests a snapshot contains, with partition value ranges) → manifest file (each data file's path + column-level stats: min/max, null count, row count). At query time the engine reads metadata top-down and uses partition ranges and column min/max to prune, skipping the vast majority of files and only opening relevant Parquet. No directory scans, no S3 LIST (the old Hive bottleneck). Each commit produces a new snapshot while old ones are retained, so time travel is just reading a historical snapshot pointer.
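A toy sketch of the last point: time travel only resolves an older pointer from the snapshot log kept in the metadata file. The log entries, snapshot ids and manifest-list file names below are made-up values, and `snapshot_as_of` / `snapshot_by_id` are hypothetical helpers, not Iceberg APIs.

```python
import bisect
from datetime import datetime

snapshot_log = [  # (committed_at, snapshot_id, manifest_list), oldest first; made-up values
    (datetime(2026, 6, 28, 0, 0), 4910, "snap-4910.avro"),
    (datetime(2026, 6, 28, 12, 0), 4911, "snap-4911.avro"),
    (datetime(2026, 6, 29, 6, 0), 4912, "snap-4912.avro"),
]


def snapshot_as_of(log, ts):
    """Latest snapshot committed at or before `ts` (what 'AS OF <time>' resolves to)."""
    i = bisect.bisect_right([t for t, _, _ in log], ts)
    if i == 0:
        raise LookupError(f"no snapshot at or before {ts}")
    return log[i - 1]


def snapshot_by_id(log, snapshot_id):
    """Exact lookup (what 'AS OF <snapshot id>' resolves to); fails once expired."""
    for entry in log:
        if entry[1] == snapshot_id:
            return entry
    raise LookupError(f"snapshot {snapshot_id} expired or never existed")


print(snapshot_as_of(snapshot_log, datetime(2026, 6, 29, 0, 0))[1])  # 4911
print(snapshot_by_id(snapshot_log, 4912)[2])                        # snap-4912.avro
```

Once the pointer is resolved, the read proceeds exactly like a normal query against that snapshot's manifest list, which is why time travel costs no extra machinery beyond retaining old snapshots.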

-- Iceberg time travel (Spark SQL 3.3+ syntax): audit / model reproduction / undelete
SELECT * FROM events
  FOR SYSTEM_TIME AS OF '2026-06-29 00:00:00';   -- by time
SELECT * FROM events FOR SYSTEM_VERSION AS OF 4912;  -- by snapshot id

-- File pruning: WHERE hits partition/stats → read 3 files not 80000
-- manifest: file_a: dt∈[06-28,06-28], user_id min=10 max=99
-- query dt='06-30' AND user_id=5 → ranges disjoint → skip whole file
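The pruning comment can be sketched as a min/max overlap check over manifest entries. `file_a` mirrors the example above; `file_b`, `file_c` and the equality-only planner are hypothetical additions for illustration, not a real engine's planner.

```python
from dataclasses import dataclass


@dataclass
class DataFileEntry:
    path: str
    lower: dict  # column -> min value recorded in the manifest
    upper: dict  # column -> max value recorded in the manifest


manifest = [
    DataFileEntry("file_a", {"dt": "06-28", "user_id": 10}, {"dt": "06-28", "user_id": 99}),
    DataFileEntry("file_b", {"dt": "06-30", "user_id": 1}, {"dt": "06-30", "user_id": 4}),
    DataFileEntry("file_c", {"dt": "06-30", "user_id": 3}, {"dt": "06-30", "user_id": 50}),
]


def might_match(entry, equals):
    """False only when some `col = value` predicate lies outside the file's [min, max]."""
    return all(entry.lower[c] <= v <= entry.upper[c] for c, v in equals.items())


def plan_scan(manifest, equals):
    return [e.path for e in manifest if might_match(e, equals)]


print(plan_scan(manifest, {"dt": "06-30", "user_id": 5}))  # ['file_c']
```

Pruning is conservative: `file_c` survives because 5 falls inside its [3, 50] range, even if it holds no row with `user_id = 5`. Tighter per-file ranges (see clustering/Z-order later) mean more files are skipped.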
The small-file problem (the lakehouse's #1 ops pain): streaming/CDC high-frequency writes produce a flood of small files, one manifest entry each, causing metadata explosion + thousands of tiny IOs per query — a performance cliff. You must periodically compact (merge small files into large ones) and expire snapshots (reclaim storage from old snapshots and orphan files). This is a standing background job, not optional.
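A hypothetical compaction planner makes the "merge small files into large ones" step concrete: it bin-packs the small files of one partition into rewrite groups close to a target size (first-fit decreasing). The 128 MB "small" threshold and 512 MB target are assumptions for illustration, not any engine's defaults.

```python
MB = 1024 * 1024


def plan_compaction(file_sizes, small=128 * MB, target=512 * MB):
    """Return groups of file indices to rewrite together (first-fit decreasing)."""
    small_files = sorted((i for i, s in enumerate(file_sizes) if s < small),
                         key=lambda i: file_sizes[i], reverse=True)
    bins = []  # each bin: [total_bytes, [indices]]
    for i in small_files:
        for b in bins:
            if b[0] + file_sizes[i] <= target:
                b[0] += file_sizes[i]
                b[1].append(i)
                break
        else:
            bins.append([file_sizes[i], [i]])
    # Rewriting a lone file buys nothing, so keep only groups of 2+ files.
    return [sorted(idx) for _, idx in bins if len(idx) > 1]


sizes = [4 * MB] * 300 + [600 * MB]  # 300 tiny streaming files + one healthy file
groups = plan_compaction(sizes)
print(len(groups), sum(len(g) for g in groups))  # 3 300
```

Each group becomes one rewrite commit that replaces up to 128 small files (and their manifest entries) with a single ~512 MB file; the already-large file is left alone.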

4. Unified batch/stream: one table for stream writes and batch reads (COW vs MoR)

Principle: classic Lambda architecture maintains a batch layer plus a speed layer — two codebases, two results, then a merge — complex and prone to drift. The lakehouse lets stream and batch write the same table: Flink/Spark Streaming continuously append-commit snapshots while batch jobs read the same table's snapshots — a Kappa-ification. The key engineering trade-off is how updates land: Copy-on-Write (COW) merges changes into new data files at write time (fast reads, high write amplification); Merge-on-Read (MoR) only appends a delta log at write time and merges at read time (fast writes, an extra merge on read). Pick MoR for high-frequency CDC upserts, COW for analytics-heavy reads.

| | Copy-on-Write | Merge-on-Read |
|---|---|---|
| One-row update | rewrite the whole data file | append one delta record |
| Write amplification | high | low |
| Read latency | low (direct read) | high (base + delta merge) |
| Freshness | bounded by compaction cadence | near real-time |
| Best for | read-heavy, analytics | CDC, frequent upserts |
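The write-amplification and read-merge rows of the table can be sketched with files modeled as dicts of key to row, using row counts as a rough proxy for bytes. The functions are hypothetical illustrations, not Hudi/Iceberg/Delta code.

```python
def cow_upsert(base_file, updates):
    """Copy-on-Write: produce a new file with the changes merged in."""
    new_file = {**base_file, **updates}
    return new_file, len(new_file)           # every row is rewritten


def mor_upsert(delta_log, updates):
    """Merge-on-Read: append only the changed rows to a delta log."""
    delta_log.append(dict(updates))
    return delta_log, len(updates)


def mor_read(base_file, delta_log):
    """Reader merges base + deltas in commit order; later deltas win."""
    merged = dict(base_file)
    for delta in delta_log:
        merged.update(delta)
    return merged, len(delta_log)            # second value: deltas merged on this read


base = {k: f"v0-{k}" for k in range(100_000)}
update = {42: "v1-42"}

cow_file, cow_cost = cow_upsert(base, update)
deltas, mor_cost = mor_upsert([], update)
mor_view, merges = mor_read(base, deltas)
print(cow_cost, mor_cost, cow_file == mor_view)  # 100000 1 True
```

Both paths produce the same logical table; COW pays 100,000 rows of writing for one change, while MoR pays one row now and a merge on every read until compaction folds the delta into the base.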
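# Hudi incremental read: pull only changes after a commit (core of incremental ETL)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()        # assumes the Hudi Spark bundle is on the classpath
table_path = "s3://my-bucket/warehouse/events"    # hypothetical table base path
last_commit = "20260629000000000"                 # hypothetical watermark: last Hudi instant processed

df = (spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", last_commit)  # only commits after this instant
      .load(table_path))
# run business logic only on the delta, then upsert downstream: compute scales
# with the amount of change, not the full table volume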
Trade-off: unified batch/stream removes one codebase, but exactly-once ingestion still requires idempotent commits plus checkpoint-aligned snapshots (the table commit is bound to Flink's checkpoint via two-phase commit). MoR shifts cost from write to read: if compaction can't keep up, deltas pile up on the read side and queries get slower.

Pitfalls + Interview Questions

1. Using the lakehouse as OLTP. Table formats are analytical: snapshot-level ACID, batch/columnar scans — not row-level low-latency point lookups or high-frequency transactions. Tens of thousands of single-row updates per second belong in Postgres, not Iceberg.
2. Ignoring small files and compaction. Stream into the lake for three months with no governance and one table has millions of small files; queries degrade from 3s to 3 minutes. Schedule compaction + expire snapshots from day one.
3. Assuming "open format" means freely mixing engines. Catalog consistency is a hidden landmine: engine A just committed a new snapshot, engine B cached the old metadata pointer and reads stale data. Unify the catalog and understand each engine's metadata-refresh semantics.
4. Treating time travel as a free lunch. Every retained snapshot protects the data files it references from deletion, so a longer retention window means higher storage cost, while expiring a snapshot makes it impossible to time-travel or replay to that point. The retention window is an audit-need vs storage-cost trade-off; set it explicitly.
5. Common interview questions: ① On S3, which only guarantees single-object atomicity, how do you implement multi-file transactions? (immutable files + atomic snapshot-pointer swap + OCC) ② Write/read amplification of COW vs MoR — which for CDC? ③ Why is Hive slow and how does Iceberg fix it (directory LIST vs metadata pruning)? ④ Storage cost and reclamation strategy of time travel? ⑤ How do concurrent multi-engine writes avoid clobbering each other?
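Pitfall 4 and question ④ come down to one set difference, sketched below under assumptions: a hypothetical `expire` keeps snapshots inside the retention window (plus the current one) and reports the data files that no retained snapshot references any more. The snapshot contents are made up. Orphan files from failed writes were never referenced by any snapshot, so they need a separate listing-based sweep that this sketch does not cover.

```python
from datetime import datetime, timedelta

snapshots = {  # snapshot_id -> (committed_at, data files it references); made-up values
    1: (datetime(2026, 6, 1), {"a", "b"}),
    2: (datetime(2026, 6, 10), {"b", "c"}),   # e.g. a compaction replaced a with c
    3: (datetime(2026, 6, 28), {"c", "d"}),
}


def expire(snapshots, current_id, now, retain=timedelta(days=7)):
    """Return (retained snapshot ids, data files no retained snapshot still needs)."""
    keep = {sid for sid, (ts, _) in snapshots.items()
            if now - ts <= retain or sid == current_id}
    live = set().union(*(snapshots[sid][1] for sid in keep))
    every_file = set().union(*(files for _, files in snapshots.values()))
    return keep, every_file - live


kept, deletable = expire(snapshots, current_id=3, now=datetime(2026, 6, 30))
print(sorted(kept), sorted(deletable))  # [3] ['a', 'b']
```

Lengthening `retain` to 25 days keeps snapshot 2 reachable for time travel but also keeps file `b` on storage, which is the audit-vs-cost trade-off in concrete form.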

Going Deeper

1. Object storage guarantees only single-object PUT atomicity, no cross-file transactions, no atomic rename. How can a table format deliver ACID? Where's the bottleneck?

Two core moves: ① data files are immutable — updates/deletes write new files, old ones retained; ② a write's entire visibility rides on one atomic op: swapping the "current snapshot" pointer. A reader sees the full file list that pointer references — either the whole old set or the whole new set, no in-between. That's snapshot isolation.

The atomic swap relies on the catalog providing compare-and-swap ("swap to v5 only if current is still v4"). A service-type catalog (REST / Hive Metastore / DynamoDB) implements CAS via a conditional update of one record.

Bottleneck: all commits serialize on the CAS of the same table pointer. A hot table with high write concurrency → frequent OCC conflicts → retry storms, throughput bounded by the single catalog's commit rate. Mitigate by splitting tables/partitions to shrink the conflict domain, batching commits, and using a scalable service-side catalog. That's why the lakehouse suits "batch/medium-frequency writes," not "tens of thousands of single-row transactions per second."
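One standard way to keep conflict retries from turning into a retry storm is bounded retries with exponential backoff and full jitter, sketched below. `commit_with_backoff` and its defaults are hypothetical; Iceberg exposes comparable knobs as table properties (`commit.retry.num-retries`, `commit.retry.min-wait-ms`, `commit.retry.max-wait-ms`). `sleep` and `rng` are injectable only so the demo is deterministic.

```python
import random
import time


def commit_with_backoff(try_commit, retries=4, min_wait=0.1, max_wait=60.0,
                        sleep=time.sleep, rng=random.random):
    """Call `try_commit` (True on a successful CAS) with capped, jittered backoff."""
    for attempt in range(retries + 1):
        if try_commit():
            return attempt                       # number of retries it took
        if attempt == retries:
            break
        cap = min(max_wait, min_wait * 2 ** attempt)
        sleep(rng() * cap)                       # full jitter: random wait up to cap
    raise RuntimeError(f"commit failed after {retries} retries")


# Demo: a fake CAS that loses twice, then wins; waits are recorded instead of slept.
outcomes = iter([False, False, True])
waits = []
used = commit_with_backoff(lambda: next(outcomes), sleep=waits.append, rng=lambda: 1.0)
print(used, waits)  # 2 [0.1, 0.2]
```

Backoff only smooths contention; it does not raise the ceiling set by one serialized pointer, which is why shrinking the conflict domain and batching commits matter more for a hot table.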

2. Hive tables are also Parquet on S3 — why do they become unusable at PB scale? Which root cause does Iceberg eliminate?

Hive's root cause: table = directory, partition = subdirectory, and which files belong to the table is inferred by LIST-ing the filesystem. On S3, LIST is slow (paginated at up to 1,000 keys per request) and billed per request, and until S3 became strongly consistent in December 2020 it was also only eventually consistent. With hundreds of thousands of partitions and tens of millions of files, just listing takes minutes, and a listing taken during an in-progress write (or, historically, an eventually consistent one) can return a partial file set. With no file-level stats, pruning only reaches partition-directory granularity. Concurrent writes rely on directory conventions with no true atomic commit, easily producing dirty reads and orphan files.

Iceberg's fix: change "which files the table has" from filesystem LIST to reading metadata manifests — manifests explicitly list each file path + column-level min/max stats. So: ① no more S3 LIST, query planning reads only a few metadata objects; ② pruning drops to file level (even row-group level), skipping most files; ③ commits atomically swap snapshots, giving native ACID and a consistent file set. The cost is maintaining this metadata tree and running compaction/expire.
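A back-of-the-envelope sketch of point ① for an unpruned planning pass. All table shapes here are assumptions: 200,000 partitions of 250 files each (50M files), and an assumed 10,000 file entries per manifest. The only external fact used is that S3 ListObjectsV2 returns at most 1,000 keys per response.

```python
import math

LIST_PAGE_KEYS = 1000  # S3 ListObjectsV2 returns at most 1,000 keys per response


def hive_plan_requests(partitions, files_per_partition):
    """One paginated LIST per partition directory; pages within a prefix are sequential."""
    return partitions * math.ceil(files_per_partition / LIST_PAGE_KEYS)


def iceberg_plan_requests(total_files, files_per_manifest):
    """Metadata JSON + manifest list + every manifest (worst case: no manifest-level pruning)."""
    return 2 + math.ceil(total_files / files_per_manifest)


partitions, per_partition = 200_000, 250                           # assumed table shape
print(hive_plan_requests(partitions, per_partition))               # 200000
print(iceberg_plan_requests(partitions * per_partition, 10_000))   # 5002
```

Even in this worst case the metadata path issues far fewer requests, they are independent GETs that can run in parallel, and a filtered query can skip most manifests using the partition ranges in the manifest list before opening them.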

3. A MoR table takes tens of thousands of streaming CDC upserts per second. What happens if compaction pauses for a week? How do you design so it doesn't collapse?

What happens: MoR only appends delta logs (log/delete files) at write time, never touching the base. With compaction stopped, each file group's deltas pile up. The read side must merge base + a week of deltas in memory every time, so read latency degrades linearly with delta thickness, eventually timing out and blowing up memory. Small files/deltas explode in count, bloating metadata until even planning slows down.

Design points: ① treat compaction as a standing service merging incrementally per file group, not "an occasional batch job"; ② threshold-triggered (compact when delta size/count exceeds a limit) + rate-limited to avoid starving ingestion; ③ monitor "delta bytes per file group" and "read amplification" with alerts; ④ for read-heavy cases consider COW, or periodically materialize MoR into a COW snapshot for BI. In essence: MoR shifts cost from write to read, and compaction is the steady mechanism that shifts it back — it can't stop.
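Design points ② and ③ can be sketched as a small scheduler: select file groups whose delta backlog crosses a byte-ratio or file-count threshold, compact the worst read amplification first, and cap the bytes rewritten per round so compaction cannot starve ingestion. `FileGroup`, `pick_for_compaction` and every threshold are hypothetical values for illustration.

```python
from dataclasses import dataclass

MB, GB = 2**20, 2**30


@dataclass
class FileGroup:
    group_id: str
    base_bytes: int
    delta_bytes: int
    delta_files: int

    @property
    def read_amplification(self) -> float:
        # bytes a snapshot read touches per byte of base data
        return (self.base_bytes + self.delta_bytes) / self.base_bytes


def pick_for_compaction(groups, max_delta_ratio=0.2, max_delta_files=50, budget_bytes=10 * GB):
    """Threshold-triggered, worst-first, budget-capped selection of file groups."""
    due = [g for g in groups
           if g.delta_bytes > max_delta_ratio * g.base_bytes or g.delta_files > max_delta_files]
    due.sort(key=lambda g: g.read_amplification, reverse=True)
    picked, spent = [], 0
    for g in due:
        cost = g.base_bytes + g.delta_bytes   # compaction reads base + deltas, writes a new base
        if spent + cost > budget_bytes:
            continue                          # defer to a later round (rate limiting)
        picked.append(g.group_id)
        spent += cost
    return picked


groups = [
    FileGroup("fg1", 1 * GB, 1 * GB, 30),     # delta as large as base
    FileGroup("fg2", 1 * GB, 100 * MB, 80),   # few bytes, but too many delta files
    FileGroup("fg3", 1 * GB, 50 * MB, 5),     # below both thresholds
    FileGroup("fg4", 8 * GB, 4 * GB, 200),    # due, but exceeds this round's budget
]
print(pick_for_compaction(groups))  # ['fg1', 'fg2']
```

A group larger than the whole per-round budget (like `fg4` here) would be deferred forever, so a real scheduler would need to split it or raise the budget; the `read_amplification` value is also a natural metric to export for the alerting in point ③.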

4. The lakehouse claims "one dataset, many engines" removes ETL. But if the BI team wants Snowflake-grade sub-second latency, how do you weigh it?

Acknowledge the limit first: querying open columnar files on object storage is bounded by S3's high-latency random reads and file-granularity metadata, so peak point-lookup / high-concurrency sub-second BI may still trail a dedicated MPP warehouse (local SSD, proprietary indexes, materialized views, result caching). Openness and peak latency trade off.

Options: ① push hot queries to seconds with clustering/Z-order + small-file governance + engine local caching — enough for most BI; ② tier it: lakehouse as the single source of truth and detail/ML, while materializing/syncing high-frequency BI aggregates into an OLAP store (ClickHouse/Druid) for sub-second dashboards — but this partly returns to two systems, trading sync latency; ③ use an MPP that reads lakehouse formats directly (Trino/StarRocks on Iceberg) to balance openness and performance. No silver bullet: either sacrifice some latency for openness/cost, or accept an extra serving replica for sub-second BI — laying this trade-off out explicitly for the business is the architect's job.
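Option ① relies on clustering so that per-file min/max ranges stay tight on several columns at once. A minimal Z-order (Morton) key sketch, assuming small non-negative integer columns; a production implementation also has to map arbitrary values (strings, timestamps) to comparable integers first. `z_order_key` is a hypothetical helper.

```python
def z_order_key(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y so points close in both dimensions sort together."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)       # x bits -> even positions
        key |= ((y >> i) & 1) << (2 * i + 1)   # y bits -> odd positions
    return key


rows = sorted(((x, y) for x in range(4) for y in range(4)), key=lambda r: z_order_key(*r))
file_ranges = []
for f in range(4):                             # cut the sorted rows into 4 "files" of 4 rows
    chunk = rows[4 * f: 4 * f + 4]
    xs = [x for x, _ in chunk]
    ys = [y for _, y in chunk]
    file_ranges.append(((min(xs), max(xs)), (min(ys), max(ys))))
print(file_ranges)
# [((0, 1), (0, 1)), ((2, 3), (0, 1)), ((0, 1), (2, 3)), ((2, 3), (2, 3))]
```

Sorting by `x` alone would give each file a single `x` value but the full `y` range [0, 3], so a `y` predicate could never skip a file; after Z-ordering, a predicate on either column skips half the files.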

5. In unified batch/stream, Flink streams writes into Iceberg with exactly-once. How do you align checkpoints and table commits so nothing is dropped or duplicated?

The problem: Flink continuously writes data into Iceberg data files and commits snapshots. If the job crashes after files are written but before the snapshot is committed, or after the commit but before the checkpoint records it, recovery either drops data or re-commits the same batch.

Alignment (two-phase commit): bind the Iceberg commit to the Flink checkpoint barrier. ① pre-commit: within each checkpoint cycle the sink writes data files (not yet in any snapshot, invisible) and stores the "pending file list" as operator state in the checkpoint; ② on checkpoint success, trigger commit: an idempotent snapshot commit atomically adds the batch to the table. On crash recovery, replay from the last successful checkpoint: uncommitted file lists remain in state and are committed once; already-committed ones aren't re-added because the Iceberg commit carries an idempotency marker (e.g. an application/operation id).

Key point: the visibility boundary = the checkpoint boundary, so downstream freshness is checkpoint-interval-level (e.g. 1 min), not per-record. Shortening checkpoints lowers latency but committing too often creates small files — back to the compaction problem.
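A hypothetical model of the checkpoint-aligned two-phase commit described above. Method names loosely echo Flink's `snapshotState` / `notifyCheckpointComplete` callbacks, but this is not the Flink or Iceberg connector API. Idempotency follows the idea in the text: the table records the highest checkpoint id it has committed (similar in spirit to the checkpoint id the Iceberg Flink sink stores in snapshot properties), and a replayed commit at or below it is skipped. Records written after the last barrier are simply discarded on a crash and replayed from the source.

```python
class Table:
    def __init__(self):
        self.files = []
        self.max_committed_ckpt = -1    # stored with the snapshot, atomically with the files

    def commit(self, ckpt_id, files):
        if ckpt_id <= self.max_committed_ckpt:
            return False                # already committed before the crash: skip
        self.files.extend(files)        # in reality: one atomic snapshot swap
        self.max_committed_ckpt = ckpt_id
        return True


class Sink:
    def __init__(self, table):
        self.table = table
        self.in_flight = []             # data files written since the last barrier (invisible)
        self.pending = {}               # ckpt_id -> files; part of checkpointed state

    def write(self, record):
        self.in_flight.append(f"data-{record}.parquet")

    def snapshot_state(self, ckpt_id):  # phase 1 (pre-commit) on the checkpoint barrier
        self.pending[ckpt_id] = self.in_flight
        self.in_flight = []
        return dict(self.pending)       # what the checkpoint persists

    def notify_checkpoint_complete(self, ckpt_id):  # phase 2 (commit)
        for cid in sorted(c for c in self.pending if c <= ckpt_id):
            self.table.commit(cid, self.pending.pop(cid))

    @classmethod
    def restore(cls, table, state):
        sink = cls(table)
        sink.pending = dict(state)      # uncommitted file lists survive the crash
        return sink


table = Table()
sink = Sink(table)
for r in ("r1", "r2"):
    sink.write(r)
sink.snapshot_state(1)
sink.notify_checkpoint_complete(1)

sink.write("r3")
state = sink.snapshot_state(2)            # checkpoint 2 persisted with its pending files
sink.notify_checkpoint_complete(2)        # the commit lands in the table...
recovered = Sink.restore(table, state)    # ...then a crash; restore from checkpoint 2
recovered.notify_checkpoint_complete(2)   # replayed commit is skipped: no duplicate
print(table.files)  # ['data-r1.parquet', 'data-r2.parquet', 'data-r3.parquet']
```

The opposite failure (crash after checkpoint 2 is persisted but before its commit) is covered by the same path: the restored pending list is committed exactly once because the table has not yet recorded checkpoint 2.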