设计一个 PB 级分析平台:每天摄入 100TB 行为日志 + 上游 OLTP 库的 CDC 变更,要同时喂三类消费者——BI 报表(要求分钟级新鲜度)、ML 特征/训练(要全历史、可回放任意时间点)、数据分析师 ad-hoc SQL(扫描几十 TB 也得几秒返回)。
老路子是 数据湖(S3 一堆 Parquet)+ 数仓(Snowflake/Redshift) 并存:湖便宜能存原始数据但没事务、改一行要重写整个分区;仓查询快但贵,数据还要 ETL 搬一遍,新鲜度永远落后几小时。两套系统、两份数据、两倍成本,还对不齐。
本期讲四件事:湖/仓/湖仓怎么选、开放表格式怎么在 S3 上做 ACID、元数据层怎么让查询又快又能时间旅行、批流怎么一套搞定。
graph TD
SRC["数据源
OLTP CDC · 日志 · 事件流"]
ING["摄入层
Spark / Flink / Kafka Connect"]
subgraph LH["Lakehouse 存储(对象存储 S3/GCS)"]
META["表格式元数据层
Iceberg / Delta / Hudi"]
DATA["数据文件
Parquet / ORC 列存"]
end
CAT["Catalog
表→当前快照指针"]
ENG["查询/计算引擎
Spark · Trino · Flink · DuckDB"]
BI["BI 报表"]
ML["ML 训练/特征"]
AD["Ad-hoc SQL"]
SRC --> ING --> META
META -.指向.-> DATA
CAT -.原子切换当前快照.-> META
ENG -->|读元数据→剪枝→只读相关文件| META
ENG --> BI & ML & AD
classDef src fill:#1a2530,stroke:#64c8ff,color:#e8eef5
classDef store fill:#1a1a30,stroke:#ffb450,color:#e8eef5
classDef eng fill:#0e2030,stroke:#5eead4,color:#e8eef5
class SRC,ING src
class META,DATA,CAT store
class ENG,BI,ML,AD eng
核心洞察:存储仍是对象存储里的开放列存文件,但上面盖一层事务性元数据,让多引擎像操作数据库一样操作它
原理:传统数仓是 schema-on-write、存算耦合的封闭系统——先建模、私有格式,查得快但贵且锁定。数据湖是 schema-on-read——原始文件直接堆 S3,便宜、开放、ML 友好,但没事务、没索引、没 schema 演进保证,改一行要重写整个分区目录,并发写互相覆盖。湖仓的本质是:保留湖的开放列存文件(Parquet),在其上加一层事务性元数据,于是同一份数据既有湖的成本/开放性,又有仓的 ACID/性能。Databricks 在 CIDR 2021 的 Lakehouse 论文系统论证了这个架构。
| 数据仓库 | 数据湖 | 湖仓 Lakehouse | |
|---|---|---|---|
| 存储 | 私有格式、存算耦合 | 对象存储 + 开放文件 | 对象存储 + 开放文件 |
| 事务 | ✅ ACID | ❌ 无 | ✅ ACID(靠元数据层) |
| Schema | write 时强制 | read 时解释 | 演进 + 强制可选 |
| 成本 | 高(常驻存算) | 低 | 低(按需算力) |
| ML/开放 | 弱(要导出) | 强 | 强(多引擎直读) |
原理:对象存储只保证单个对象 PUT 的原子性,没有跨文件事务、没有 rename 原子性(S3 的 rename = copy+delete)。表格式的做法是:所有数据文件只追加不修改(immutable),每次写产生新文件,再原子地切换一个『当前快照』指针——读者要么看到旧快照全集、要么看到新快照全集,绝不看到半截。这就是快照隔离(snapshot isolation)。三大格式(Iceberg / Delta / Hudi)思路同源,区别在元数据组织和写优化方向。
# 乐观并发控制(OCC)提交,三格式通用思路
def commit(table, new_files):
while True:
base = catalog.current_snapshot(table) # 读当前快照
snap = base.add(new_files) # 生成新快照(指向旧+新文件)
# 原子 CAS:仅当 current 仍等于 base 才切指针
if catalog.compare_and_swap(table,
expected=base,
new=snap):
return # 提交成功
# 否则有人抢先提交 → 校验是否冲突 → 重试/失败
if conflicts(base, snap): raise ConflictError
_delta_log 的有序 JSON)简单直观;❌ 历史上最佳体验绑 Databricks,开放性追赶中。原理:表格式把元数据组织成多级清单树。以 Iceberg 为例:metadata file(表 schema、分区规范、所有快照列表)→ manifest list(一个快照包含哪些 manifest,带分区值范围)→ manifest file(每个数据文件的路径 + 列级统计:min/max、null 数、行数)。查询时引擎自顶向下读元数据:用分区值范围和列 min/max 做剪枝(pruning),跳过绝大多数文件,只真正打开相关 Parquet。不需要扫目录、不需要 LIST S3(Hive 表的老瓶颈)。每次提交生成新快照、旧快照保留,time travel 就是读历史快照指针。
-- Iceberg 时间旅行:审计 / 模型复现 / 误删恢复
SELECT * FROM events
FOR SYSTEM_TIME AS OF '2026-06-29 00:00:00'; -- 按时间
SELECT * FROM events FOR SYSTEM_VERSION AS OF 4912; -- 按快照ID
-- 文件剪枝示意:WHERE 命中分区/统计 → 只读 3 个文件而非 80000 个
-- manifest 记录 file_a: dt∈[06-28,06-28], user_id min=10 max=99
-- 查 dt='06-30' AND user_id=5 → min/max 不相交 → 整文件跳过
WHERE ts > ...,引擎自动用 day(ts) 分区剪枝,用户不必知道物理分区列,分区方案还能演进而不重写历史。原理:传统 Lambda 架构要维护批层 + 速度层两套代码两套结果再合并,复杂且对不齐。湖仓让流和批写同一张表:Flink/Spark Streaming 持续 append 提交快照,批作业读同一张表的快照——Kappa 化。关键工程权衡是更新怎么落盘:Copy-on-Write(COW)写时就把改动合进新数据文件(读快、写放大大);Merge-on-Read(MoR)写时只追加 delta 日志、读时再合并(写快、读时多一次 merge)。CDC 高频 upsert 选 MoR,分析为主选 COW。
| Copy-on-Write | Merge-on-Read | |
|---|---|---|
| 写一行更新 | 重写整个数据文件 | 追加一条 delta 记录 |
| 写放大 | 高 | 低 |
| 读延迟 | 低(直读) | 高(base + delta 合并) |
| 新鲜度 | 受 compaction 节奏限 | 近实时 |
| 适合 | 读多写少、分析 | CDC、高频 upsert |
# Hudi 增量读:只拉某 commit 之后的变更(增量 ETL 核心)
df = (spark.read.format("hudi")
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.read.begin.instanttime", last_commit) # 上次水位
.load(table_path))
# 只对 delta 跑业务逻辑,再 upsert 到下游表 → 算力随『变更量』而非『全量』
核心两招:① 数据文件不可变——更新/删除都写新文件、旧文件保留;② 一次写的可见性全压在一个原子操作上:切换『当前快照』指针。读者看到的是指针指向的那整套文件清单,要么旧全集、要么新全集,没有中间态——这就是快照隔离。
原子切指针靠 catalog 的 compare-and-swap(『仅当当前还是 v4 才换 v5』):服务型 catalog(REST / Hive Metastore / DynamoDB)用一行记录的条件更新实现 CAS。
瓶颈:所有提交序列化在同一张表指针的 CAS 上。热表高并发写 → 频繁 OCC 冲突 → 重试风暴,吞吐受限于单点 catalog 的提交速率。缓解:拆表/拆分区降冲突域、批量提交、用服务化 catalog。这也是为什么湖仓适合『批量/中频写』而非『每秒几万笔单行事务』。
Hive 的根因:表 = 目录,分区 = 子目录,哪些文件属于这张表是靠 LIST 文件系统推断的。在 S3 上 LIST 是慢、最终一致、按请求收费的操作;一张表几十万分区、上千万文件时,光列目录就要几分钟,且可能列到一半(一致性问题)。没有文件级统计,剪枝只能到分区目录粒度。并发写靠目录约定,没有真正的原子提交,易产生脏读和孤儿文件。
Iceberg 的解法:把『这张表有哪些文件』从文件系统 LIST 变成读元数据清单——manifest 里显式列出每个文件路径 + 列级 min/max 统计。于是:① 不再 LIST S3,规划查询只读几个元数据对象;② 剪枝下沉到文件级(甚至行组级),跳过绝大多数文件;③ 提交是原子切快照,天然 ACID 与一致的文件集。代价是要维护这套元数据树,并定期 compaction/expire。
会发生:MoR 写时只追加 delta 日志(log/delete 文件),不动 base。compaction 停了,每个 file group 的 delta 越堆越高。读端每次都要把 base + 一周的 delta 在内存里 merge,读延迟随 delta 厚度线性恶化,最终查询超时、内存打爆。同时小文件/小 delta 数量爆炸,元数据膨胀,连规划查询都变慢。
设计要点:① compaction 当作常驻服务按 file group 增量合并,不是『偶尔跑的批作业』;② 阈值触发(delta 大小/数量超限就压)+ 限速避免抢占摄入;③ 监控『每 file group delta 字节数』『读放大』并告警;④ 读多场景考虑 COW,或定期把 MoR 物化成 COW 快照供 BI 读。本质:MoR 把成本从写挪到读,compaction 是把它挪回去的常态机制,不能停。
先承认边界:湖仓在对象存储 + 开放列存上做查询,受限于 S3 的高延迟随机读与文件粒度元数据,极致点查/高并发亚秒 BI 仍可能不如专用 MPP 数仓(本地 SSD、私有索引、物化视图、结果缓存)。开放性与极致延迟有取舍。
权衡选项:① 靠 clustering/Z-order + 小文件治理 + 引擎本地缓存把热查询压到秒级,多数 BI 够用;② 分层:湖仓做 single source of truth 与明细,把高频 BI 聚合物化/同步到 OLAP(ClickHouse/Druid)服务亚秒看板——但这又部分回到两套系统,要权衡同步延迟;③ 用直读湖仓格式的 MPP(Trino/StarRocks 读 Iceberg)兼顾开放与性能。没有银弹:要么牺牲一点延迟换开放/省钱,要么为亚秒 BI 接受一层额外 serving 副本——把这个取舍显式摆给业务,是架构师该做的事。
问题:Flink 持续把数据写成 Iceberg 数据文件并提交快照。若进程在『写了文件但没提交快照』或『提交了但 checkpoint 没记上』之间崩溃,恢复后要么漏数据、要么重复提交同一批文件。
对齐机制(两阶段提交):把 Iceberg commit 绑到 Flink checkpoint 屏障上。① pre-commit:每个 checkpoint 周期内 sink 把数据写成数据文件(这些文件此刻还没进快照、不可见),并把『待提交文件清单』作为算子状态存入 checkpoint;② checkpoint 成功后触发 commit:用幂等的快照提交把这批文件原子加入表。崩溃恢复时从最近成功 checkpoint 重放:未提交的文件清单还在状态里,重新提交一次即可;已提交的因 Iceberg 提交带幂等标识(如 commit 的 application/operation id)不会被重复加入。
关键点:可见性边界 = checkpoint 边界,下游新鲜度是checkpoint 间隔级(如 1 分钟)而非逐条实时——这是 exactly-once 与延迟的取舍。缩短 checkpoint 能降延迟,但提交太频又制造小文件,回到 compaction 问题。