Design a PB-scale analytics platform: ingest 100TB/day of behavioral logs plus CDC changes from upstream OLTP databases, feeding three consumer classes at once — BI dashboards (needing minute-level freshness), ML features/training (full history, replayable to any point in time), and analyst ad-hoc SQL (scan tens of TB, return in seconds).
The old way runs a data lake (a pile of Parquet on S3) plus a warehouse (Snowflake/Redshift) side by side: the lake is cheap and holds raw data but has no transactions and rewrites an entire partition to change one row; the warehouse queries fast but is expensive, and data must be ETL'd over, so freshness always lags by hours. Two systems, two copies, double the cost, and they drift apart.
Four topics: lake vs warehouse vs lakehouse, how open table formats do ACID on S3, how the metadata layer makes queries fast and time-travel possible, and how one table serves both batch and stream.
```mermaid
graph TD
    SRC["Data sources<br/>OLTP CDC · logs · event streams"]
    ING["Ingest layer<br/>Spark / Flink / Kafka Connect"]
    subgraph LH["Lakehouse storage (object store S3/GCS)"]
        META["Table-format metadata<br/>Iceberg / Delta / Hudi"]
        DATA["Data files<br/>Parquet / ORC columnar"]
    end
    CAT["Catalog<br/>table → current snapshot pointer"]
    ENG["Query/compute engines<br/>Spark · Trino · Flink · DuckDB"]
    BI["BI dashboards"]
    ML["ML training/features"]
    AD["Ad-hoc SQL"]
    SRC --> ING --> META
    META -.points to.-> DATA
    CAT -.atomic swap of current snapshot.-> META
    ENG -->|read metadata → prune → read only relevant files| META
    ENG --> BI & ML & AD
    classDef src fill:#1a2530,stroke:#64c8ff,color:#e8eef5
    classDef store fill:#1a1a30,stroke:#ffb450,color:#e8eef5
    classDef eng fill:#0e2030,stroke:#5eead4,color:#e8eef5
    class SRC,ING src
    class META,DATA,CAT store
    class ENG,BI,ML,AD eng
```
Key insight: storage is still open columnar files on object storage, but a layer of transactional metadata on top lets many engines operate on it like a database.
Principle: a classic warehouse is schema-on-write, with storage and compute coupled and a closed format: you model first, data lives in a proprietary format, and it is fast but expensive and locked in. A data lake is schema-on-read: raw files dumped on S3, cheap, open and ML-friendly, but with no transactions, no indexes and no schema-evolution guarantees; changing one row rewrites a whole partition directory, and concurrent writes clobber each other. The lakehouse keeps the lake's open columnar files (Parquet) and adds a layer of transactional metadata on top, so one copy of the data gets the lake's cost and openness plus the warehouse's ACID guarantees and performance. Databricks laid out this argument systematically in the CIDR 2021 Lakehouse paper.
| | Warehouse | Data Lake | Lakehouse |
|---|---|---|---|
| Storage | proprietary, coupled | object store + open files | object store + open files |
| Transactions | ✅ ACID | ❌ none | ✅ ACID (via metadata) |
| Schema | enforced on write | interpreted on read | evolution + optional enforcement |
| Cost | high (always-on) | low | low (on-demand compute) |
| ML/openness | weak (export) | strong | strong (multi-engine direct read) |
Principle: object storage guarantees only single-object PUT atomicity: no cross-file transactions and no atomic rename (an S3 "rename" is copy + delete). The table-format trick: data files are immutable (write-once), every write produces new files, and the write becomes visible by atomically swapping a single "current snapshot" pointer. A reader sees either the entire old snapshot or the entire new one, never a partial state. That is snapshot isolation. The big three (Iceberg / Delta / Hudi) share this idea; they differ in how they organize metadata and which write path they optimize.
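```python
# Optimistic concurrency control (OCC) commit, common to all three formats.
# `catalog` and `conflicts` are placeholders for the format's catalog and conflict check.
class ConflictError(Exception):
    pass

def commit(catalog, table, new_files, conflicts, max_retries=10):
    base = catalog.current_snapshot(table)            # read current snapshot
    for _ in range(max_retries):
        snap = base.add(new_files)                     # new snapshot = base files + new files
        # atomic CAS: swap the pointer only if it still equals base
        if catalog.compare_and_swap(table, expected=base, new=snap):
            return snap                                # committed
        # someone committed first: re-read, check what they changed, then rebase or fail
        current = catalog.current_snapshot(table)
        if conflicts(base, current, new_files):
            raise ConflictError(table)
        base = current                                 # no real conflict: retry on top of it
    raise ConflictError(f"{table}: gave up after {max_retries} retries")
```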
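To see why the CAS loop yields snapshot isolation without losing updates, here is a toy, single-process model in which a lock-protected dict plays the catalog. `Snapshot`, `InMemoryCatalog` and this `commit` are hypothetical names, not any table format's API, and sequential snapshot ids are a simplification. Because blind appends never conflict, a failed CAS simply rebases on the winner's snapshot and retries.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    """Immutable snapshot: an id plus the full set of data files it references."""
    snapshot_id: int
    files: frozenset

    def add(self, new_files):
        return Snapshot(self.snapshot_id + 1, self.files | frozenset(new_files))


class InMemoryCatalog:
    """Toy catalog: one pointer per table, changed only via compare-and-swap."""

    def __init__(self):
        self._lock = threading.Lock()
        self._current = {}

    def create(self, table):
        self._current[table] = Snapshot(0, frozenset())

    def current_snapshot(self, table):
        with self._lock:
            return self._current[table]

    def compare_and_swap(self, table, expected, new):
        with self._lock:  # the check and the swap happen as one atomic step
            if self._current[table] is not expected:
                return False
            self._current[table] = new
            return True


def commit(catalog, table, new_files, max_retries=100):
    """Blind append: on CAS failure, rebase on the latest snapshot and retry."""
    for attempt in range(max_retries):
        base = catalog.current_snapshot(table)
        if catalog.compare_and_swap(table, expected=base, new=base.add(new_files)):
            return attempt  # number of retries this writer needed
    raise RuntimeError("too many CAS failures")


catalog = InMemoryCatalog()
catalog.create("events")
workers = [
    threading.Thread(target=commit, args=(catalog, "events", [f"w{i}-part-0.parquet"]))
    for i in range(8)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

final = catalog.current_snapshot("events")
print(final.snapshot_id, len(final.files))  # 8 8: eight serialized commits, no lost file
```

Readers that grabbed an older `Snapshot` keep seeing exactly its `files`, because snapshots are never mutated; only the pointer moves.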
Delta Lake (transaction log in `_delta_log`): ❌ historically best on Databricks; openness still catching up.

Principle: a table format organizes metadata into a multi-level manifest tree. In Iceberg: metadata file (schema, partition spec, list of all snapshots) → manifest list (which manifests a snapshot contains, with partition value ranges) → manifest file (each data file's path plus column-level stats: min/max, null count, row count). At query time the engine reads metadata top-down and uses partition ranges and column min/max to prune, skipping most files and opening only the relevant Parquet. No directory scans and no S3 LIST (the old Hive bottleneck). Each commit produces a new snapshot while old ones are retained (until expired), so time travel is just reading a historical snapshot pointer.
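Resolving `AS OF <timestamp>` comes down to picking the newest snapshot committed at or before that time. Iceberg keeps such a history in table metadata (the snapshot log); the sketch below models it as a plain sorted list with made-up timestamps and snapshot ids, and `snapshot_as_of` is a hypothetical helper, not an engine API.

```python
import bisect
from datetime import datetime

# Hypothetical snapshot log: (commit timestamp, snapshot id), in commit order.
SNAPSHOT_LOG = [
    (datetime(2026, 6, 28, 9, 0), 4910),
    (datetime(2026, 6, 28, 18, 30), 4911),
    (datetime(2026, 6, 29, 3, 15), 4912),
]


def snapshot_as_of(log, ts):
    """Return the id of the latest snapshot committed at or before ts."""
    times = [t for t, _ in log]
    i = bisect.bisect_right(times, ts)  # number of snapshots with commit time <= ts
    if i == 0:
        raise ValueError(f"no snapshot at or before {ts}")
    return log[i - 1][1]


print(snapshot_as_of(SNAPSHOT_LOG, datetime(2026, 6, 29)))  # 4911
```

Once the snapshot id is known, the read proceeds exactly like a normal query against that snapshot's manifest list, which is why time travel costs nothing extra as long as the snapshot has not been expired.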
```sql
-- Iceberg time travel: audit / model reproduction / undelete
SELECT * FROM events
  FOR SYSTEM_TIME AS OF '2026-06-29 00:00:00';           -- by time
SELECT * FROM events FOR SYSTEM_VERSION AS OF 4912;     -- by snapshot id
-- File pruning: WHERE hits partition/stats → read 3 files not 80000
-- manifest: file_a: dt∈[06-28,06-28], user_id min=10 max=99
-- query dt='06-30' AND user_id=5 → ranges disjoint → skip whole file
```
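A minimal sketch of stats-based file pruning, modeling a manifest as a list of entries carrying per-column (min, max) bounds. `DataFileEntry`, `may_contain` and `plan` are hypothetical names; only ANDed equality predicates are handled, and dates are compared as plain strings for brevity. The entries reuse the `file_a` example from the comments above.

```python
from dataclasses import dataclass


@dataclass
class DataFileEntry:
    """One manifest entry: data file path plus per-column (min, max) stats."""
    path: str
    bounds: dict  # column -> (min, max)


def may_contain(entry, column, value):
    """False only when the stats prove no row has column == value (safe to skip)."""
    if column not in entry.bounds:
        return True  # no stats for this column: the file must be read
    lo, hi = entry.bounds[column]
    return lo <= value <= hi


def plan(entries, predicates):
    """Keep files whose stats overlap every equality predicate (AND semantics)."""
    return [e.path for e in entries
            if all(may_contain(e, col, val) for col, val in predicates.items())]


manifest = [
    DataFileEntry("file_a.parquet", {"dt": ("06-28", "06-28"), "user_id": (10, 99)}),
    DataFileEntry("file_b.parquet", {"dt": ("06-30", "06-30"), "user_id": (1, 50)}),
    DataFileEntry("file_c.parquet", {"dt": ("06-30", "06-30"), "user_id": (60, 80)}),
]
print(plan(manifest, {"dt": "06-30", "user_id": 5}))  # ['file_b.parquet']
```

Note the asymmetry: stats can only prove absence, so a missing stat or an overlapping range always means "read the file", never "skip it".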
Hidden partitioning (Iceberg): users write `WHERE ts > ...` and the engine auto-prunes on `day(ts)` partitions; users need not know the physical partition column, and the partition scheme can evolve without rewriting history.

Principle: the classic Lambda architecture maintains a batch layer plus a speed layer: two codebases, two results, then a merge, which is complex and prone to drift. The lakehouse lets streaming and batch share the same table: Flink/Spark Streaming continuously commit append snapshots while batch jobs read snapshots of that same table, effectively moving toward a Kappa architecture. The key engineering trade-off is how updates land. Copy-on-Write (COW) merges changes into new data files at write time (fast reads, high write amplification); Merge-on-Read (MoR) only appends delta records at write time and merges at read time (fast writes, an extra merge on every read). Pick MoR for high-frequency CDC upserts and COW for analytics-heavy reads.
| | Copy-on-Write | Merge-on-Read |
|---|---|---|
| one-row update | rewrite the whole data file | append one delta record |
| write amplification | high | low |
| read latency | low (direct read) | higher (base + delta merge) |
| data freshness | lower (costly rewrites force larger, less frequent commits) | near real-time |
| best for | read-heavy, analytics | CDC, frequent upserts |
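A toy model of the two update paths in the table, assuming a "data file" is a dict keyed by primary key and counting rows physically written; deletes, file groups and real file formats are left out. It shows COW paying for the whole file on a one-row update, MoR paying one record, and the MoR reader reconstructing the same logical table by merging.

```python
def cow_update(base_file, key, row):
    """Copy-on-Write: write a new file containing every row, with the change applied."""
    new_file = dict(base_file)
    new_file[key] = row
    return new_file, len(new_file)  # rows written = the entire file


def mor_update(delta_log, key, row):
    """Merge-on-Read: append one delta record; the base file is untouched."""
    delta_log.append((key, row))
    return 1  # rows written = one delta record


def mor_read(base_file, delta_log):
    """Reader merges base + deltas in commit order; later deltas win."""
    merged = dict(base_file)
    for key, row in delta_log:
        merged[key] = row
    return merged


base = {k: {"score": 0} for k in range(1_000)}
cow_file, cow_written = cow_update(base, 7, {"score": 1})
deltas = []
mor_written = mor_update(deltas, 7, {"score": 1})
print(cow_written, mor_written)            # 1000 1
print(mor_read(base, deltas) == cow_file)  # True: same logical table
```

The merge loop in `mor_read` runs on every read and grows with `len(deltas)`; that recurring cost is what compaction exists to pay down.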
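```python
# Hudi incremental read: pull only changes after a commit (core of incremental ETL)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes the Hudi Spark bundle is on the classpath
# table_path and last_commit (watermark: last instant already processed) come from the job
df = (spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", last_commit)  # instants > last_commit
      .load(table_path))
# run business logic only on the delta, then upsert downstream: compute scales
# with "amount of change", not "full volume"
```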
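The watermark logic behind that incremental read can be sketched in plain Python. The commit timeline, records and instant strings below are invented, and `pull_increment` / `upsert` are hypothetical helpers; in Hudi the timeline filtering happens inside the incremental query itself.

```python
# Hypothetical commit timeline: (instant_time, changed records), ascending.
# Instants use a fixed-width yyyyMMddHHmmss string, so string order == time order.
TIMELINE = [
    ("20260629010000", [{"id": 1, "amt": 10}]),
    ("20260629020000", [{"id": 2, "amt": 5}]),
    ("20260629030000", [{"id": 1, "amt": 12}]),
]


def pull_increment(timeline, last_commit):
    """Return records from commits strictly after last_commit, plus the new watermark."""
    new = [(instant, recs) for instant, recs in timeline if instant > last_commit]
    changes = [r for _, recs in new for r in recs]
    watermark = new[-1][0] if new else last_commit
    return changes, watermark


def upsert(target, records):
    """Apply changes in commit order, so the latest version of each key wins."""
    for r in records:
        target[r["id"]] = r


downstream = {}
changes, wm = pull_increment(TIMELINE, "00000000000000")  # first run: everything is new
upsert(downstream, changes)
print(len(changes), wm, downstream[1]["amt"])  # 3 20260629030000 12
changes, wm = pull_increment(TIMELINE, wm)      # rerun: nothing new, watermark unchanged
print(len(changes), wm)                         # 0 20260629030000
```

Persisting `wm` only after the downstream upsert succeeds keeps reruns safe: a crash in between replays the same increment, and the upsert is idempotent.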
Two core moves: ① data files are immutable — updates/deletes write new files, old ones retained; ② a write's entire visibility rides on one atomic op: swapping the "current snapshot" pointer. A reader sees the full file list that pointer references — either the whole old set or the whole new set, no in-between. That's snapshot isolation.
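The atomic swap relies on the catalog providing compare-and-swap ("swap to v5 only if current is still v4"). A service-type catalog (REST / DynamoDB) implements CAS as a conditional update of one record; a Hive Metastore-backed catalog typically achieves the same with a metastore lock around the check-and-update. A catalog that relies purely on file-system atomic rename is not safe on S3, which has no atomic rename.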
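A runnable illustration of "CAS via a conditional update of one record", using SQLite as a stand-in for the catalog's backing store. The `catalog` table and its columns are hypothetical; the point is that `UPDATE ... WHERE metadata_location = <expected>` changes exactly one row on success and zero rows if another writer moved the pointer first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE catalog (tbl TEXT PRIMARY KEY, metadata_location TEXT NOT NULL)")
conn.execute("INSERT INTO catalog VALUES ('events', 'v4.metadata.json')")


def compare_and_swap(conn, table, expected, new):
    """Conditional update: succeeds only if the pointer still equals `expected`."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            "UPDATE catalog SET metadata_location = ? "
            "WHERE tbl = ? AND metadata_location = ?",
            (new, table, expected),
        )
    return cur.rowcount == 1  # 1 row changed = we won; 0 = someone else moved it


print(compare_and_swap(conn, "events", "v4.metadata.json", "v5.metadata.json"))   # True
print(compare_and_swap(conn, "events", "v4.metadata.json", "v5b.metadata.json"))  # False: stale base
```

The second writer learns it lost without any lock being held across its write; it then re-reads the pointer and retries as in the OCC loop.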
Bottleneck: all commits serialize on the CAS of the same table pointer. A hot table with high write concurrency → frequent OCC conflicts → retry storms, throughput bounded by the single catalog's commit rate. Mitigate by splitting tables/partitions to shrink the conflict domain, batching commits, and using a scalable service-side catalog. That's why the lakehouse suits "batch/medium-frequency writes," not "tens of thousands of single-row transactions per second."
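A sketch of the "batch commits" mitigation, assuming a hypothetical `BatchingCommitter` that buffers file appends and turns every `max_files` of them into one commit. Here the commit callback is just `list.extend` on a stand-in table, where a real one would perform the OCC/CAS commit; a production version would also flush on a timer so quiet periods still commit.

```python
class BatchingCommitter:
    """Buffers data-file appends and commits them as one snapshot per batch."""

    def __init__(self, commit_fn, max_files=100):
        self.commit_fn = commit_fn  # callable(list_of_files): performs ONE commit
        self.max_files = max_files
        self.pending = []
        self.commits = 0

    def append(self, path):
        self.pending.append(path)
        if len(self.pending) >= self.max_files:
            self.flush()

    def flush(self):
        if self.pending:
            self.commit_fn(self.pending)
            self.commits += 1
            self.pending = []


table_files = []  # stand-in for the table's file list
committer = BatchingCommitter(table_files.extend, max_files=100)
for i in range(1_050):
    committer.append(f"part-{i:05d}.parquet")
committer.flush()
print(committer.commits, len(table_files))  # 11 1050
```

1,050 files reach the table through 11 catalog commits instead of 1,050, which cuts the number of CAS attempts competing on the table pointer by roughly two orders of magnitude.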
Hive's root cause: table = directory, partition = subdirectory, and which files belong to the table is inferred by LIST-ing the filesystem. On S3, LIST is slow (at most 1,000 keys per request), billed per request, and was only eventually consistent until S3 moved to strong consistency in December 2020; with hundreds of thousands of partitions and tens of millions of files, just listing takes minutes, and on an eventually consistent store it could return an incomplete set. With no file-level stats, pruning only reaches partition-directory granularity. Concurrent writes rely on directory conventions with no true atomic commit, easily producing dirty reads and orphan files.
Iceberg's fix: change "which files the table has" from a filesystem LIST to reading metadata manifests, which explicitly list each file path plus column-level min/max stats. So: ① no more S3 LIST, and query planning reads only a few metadata objects; ② pruning drops to file level via manifest stats (and further to row-group level via Parquet footer stats), skipping most files; ③ commits atomically swap snapshots, giving native ACID and a consistent file set. The cost is maintaining this metadata tree and running compaction/expire.
What happens: MoR only appends delta logs (log/delete files) at write time, never touching the base. With compaction stopped, each file group's deltas pile up. The read side must merge base + a week of deltas in memory every time, so read latency degrades linearly with delta thickness, eventually timing out and blowing up memory. Small files/deltas explode in count, bloating metadata until even planning slows down.
Design points: ① treat compaction as a standing service merging incrementally per file group, not "an occasional batch job"; ② threshold-triggered (compact when delta size/count exceeds a limit) + rate-limited to avoid starving ingestion; ③ monitor "delta bytes per file group" and "read amplification" with alerts; ④ for read-heavy cases consider COW, or periodically materialize MoR into a COW snapshot for BI. In essence: MoR shifts cost from write to read, and compaction is the steady mechanism that shifts it back — it can't stop.
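A sketch of a threshold-triggered, rate-limited compaction picker along the lines of points ② and ③. The `FileGroup` fields, thresholds and byte budget are illustrative assumptions, not any engine's defaults; the delta-to-base byte ratio stands in for read amplification.

```python
from dataclasses import dataclass

GB = 2**30
MB = 2**20


@dataclass
class FileGroup:
    group_id: str
    base_bytes: int
    delta_bytes: int
    delta_files: int


def pick_compactions(groups, max_delta_ratio=0.2, max_delta_files=20, budget_bytes=10 * GB):
    """Pick file groups past a delta threshold, worst first, within a byte budget.

    budget_bytes caps how much data one compaction round rewrites (rate limiting),
    so compaction cannot starve ingestion; over-budget groups wait for the next round.
    """
    def ratio(g):
        return g.delta_bytes / max(g.base_bytes, 1)

    due = [g for g in groups if ratio(g) > max_delta_ratio or g.delta_files > max_delta_files]
    due.sort(key=ratio, reverse=True)  # highest read amplification first
    picked, spent = [], 0
    for g in due:
        cost = g.base_bytes + g.delta_bytes  # bytes read and rewritten by this compaction
        if spent + cost > budget_bytes:
            continue
        picked.append(g.group_id)
        spent += cost
    return picked


groups = [
    FileGroup("fg-1", 1 * GB, 50 * MB, 3),    # below both thresholds
    FileGroup("fg-2", 1 * GB, 600 * MB, 40),  # heavy deltas
    FileGroup("fg-3", 2 * GB, 500 * MB, 10),  # ratio ~0.24
    FileGroup("fg-4", 8 * GB, 1 * GB, 25),    # too many delta files, but large
]
print(pick_compactions(groups))  # ['fg-2', 'fg-3']
```

`fg-4` is due but would blow the 10 GB round budget, so it is deferred rather than compacted in one ingestion-starving burst; the alerting in point ③ would flag it if it keeps getting deferred.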
Acknowledge the limit first: querying open columnar files on object storage is bounded by S3's high-latency random reads and file-granularity metadata, so peak point-lookup / high-concurrency sub-second BI may still trail a dedicated MPP warehouse (local SSD, proprietary indexes, materialized views, result caching). Openness and peak latency trade off.
Options: ① push hot queries to seconds with clustering/Z-order + small-file governance + engine local caching — enough for most BI; ② tier it: lakehouse as the single source of truth and detail/ML, while materializing/syncing high-frequency BI aggregates into an OLAP store (ClickHouse/Druid) for sub-second dashboards — but this partly returns to two systems, trading sync latency; ③ use an MPP that reads lakehouse formats directly (Trino/StarRocks on Iceberg) to balance openness and performance. No silver bullet: either sacrifice some latency for openness/cost, or accept an extra serving replica for sub-second BI — laying this trade-off out explicitly for the business is the architect's job.
The problem: Flink continuously writes data into Iceberg data files and commits snapshots. If the process crashes between "files written but snapshot not committed" or "committed but checkpoint didn't record it," recovery either drops data or re-commits the same batch.
Alignment (two-phase commit): bind the Iceberg commit to the Flink checkpoint barrier. ① Pre-commit: within each checkpoint cycle the sink writes data files (not yet in any snapshot, so invisible) and stores the "pending file list" as operator state in the checkpoint; ② on checkpoint success, commit: an idempotent snapshot commit atomically adds the batch to the table. On crash recovery, replay from the last successful checkpoint: uncommitted file lists remain in state and are committed once; already-committed ones aren't re-added because each Iceberg commit records an idempotency marker in the snapshot summary (Iceberg's Flink sink stores the job id and the max committed checkpoint id), and recovery skips any checkpoint at or below it.
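A single-process toy of the two-phase commit just described. `Table` and `Committer` are hypothetical stand-ins, not Flink or Iceberg APIs: pre-commit records written-but-invisible files in state keyed by checkpoint id, and the commit step skips any checkpoint the table already records as committed. The scenario replays the dangerous case, a crash after the table commit but before the state was cleaned up.

```python
class Table:
    """Toy table: committed files plus the max checkpoint id from the last commit."""

    def __init__(self):
        self.files = []
        self.max_committed_checkpoint = -1

    def commit(self, files, checkpoint_id):
        # one atomic snapshot commit; the checkpoint id rides in the snapshot summary
        self.files.extend(files)
        self.max_committed_checkpoint = checkpoint_id


class Committer:
    def __init__(self, table):
        self.table = table
        self.pending = {}  # checkpoint_id -> data files written but not yet visible

    def snapshot_state(self, checkpoint_id, written_files):
        """Phase 1 (pre-commit): store invisible files in checkpointed state."""
        self.pending[checkpoint_id] = list(written_files)
        return dict(self.pending)  # what the checkpoint persists

    def notify_checkpoint_complete(self, checkpoint_id):
        """Phase 2: commit pending files, skipping checkpoints already in the table."""
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            if cid > self.table.max_committed_checkpoint:  # idempotency guard
                self.table.commit(self.pending[cid], cid)
            del self.pending[cid]


table = Table()
sink = Committer(table)
sink.snapshot_state(1, ["f1.parquet"])
sink.notify_checkpoint_complete(1)                # f1 becomes visible

state = sink.snapshot_state(2, ["f2.parquet"])    # checkpoint 2 persisted with f2 pending
sink.notify_checkpoint_complete(2)                # f2 committed ... then the job crashes

recovered = Committer(table)                      # restart from checkpoint 2's state
recovered.pending = dict(state)
recovered.notify_checkpoint_complete(2)           # skipped: table already has checkpoint 2
print(table.files)                                # ['f1.parquet', 'f2.parquet']
```

The opposite crash (state persisted, commit never ran) takes the other branch of the guard: the restored files have a checkpoint id above the table's marker, so they are committed exactly once.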
Key point: the visibility boundary = the checkpoint boundary, so downstream freshness is checkpoint-interval-level (e.g. 1 min), not per-record. Shortening checkpoints lowers latency but committing too often creates small files — back to the compaction problem.
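Back-of-the-envelope arithmetic for the small-file cost of short checkpoints, under the simplifying assumption that each checkpoint makes every sink subtask close exactly one file per partition it wrote. The 0.5 TB/day table and the parallelism of 50 are made-up numbers, and sizes use binary units.

```python
SECONDS_PER_DAY = 86_400


def files_per_day(checkpoint_interval_s, writer_parallelism, partitions_touched=1):
    """Assume each checkpoint makes every writer subtask close one file per partition touched."""
    commits_per_day = SECONDS_PER_DAY // checkpoint_interval_s
    return commits_per_day * writer_parallelism * partitions_touched


def avg_file_mb(daily_tb, n_files):
    """Average file size in MB if daily_tb terabytes are spread over n_files files."""
    return daily_tb * 1024 * 1024 / n_files


# Hypothetical CDC table: 0.5 TB/day written by 50 parallel sink subtasks.
for interval_s in (60, 10):
    n = files_per_day(interval_s, writer_parallelism=50)
    print(interval_s, n, round(avg_file_mb(0.5, n), 1))
# 60 72000 7.3
# 10 432000 1.2
```

Cutting the interval 6× multiplies the file count 6× and shrinks files to about 1 MB, far below the hundreds-of-MB sizes columnar readers usually target, so that latency gain is paid for in compaction work.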