Join 是分布式数据处理中最昂贵的操作之一。在 Apache Spark 里,join 策略的选择决定一个作业是几分钟还是几小时跑完——或是否因 OOM 错误或失控的 shuffle 而崩溃集群。Spark 的 Catalyst 优化器基于数据集大小和可用统计自动选策略,但理解每种策略的机制、shuffle 的真实成本、数据倾斜如何悄无声息地摧毁一个选得好的策略,以及自适应查询执行如何在运行时改变局面——这些技能区分了资深数据工程师与那些只调 df.join() 并祈祷的人。
- 广播 Join(Broadcast Join)——把小的一侧复制到每个 executor;大侧零 shuffle。默认阈值 10 MB(通过
autoBroadcastJoinThreshold可配)。 - Sort-Merge Join——Spark 大-大 join 的稳健默认;shuffle 后排序两侧,无论数据集多大都在常量内存里合并。
- Shuffle Hash Join——shuffle 后从较小侧建内存哈希表;build 侧装得下时比 sort-merge 快,装不下则溢出到磁盘。
- Bucketing 在写时预分区表,让对同一 key 的重复 join 完全跳过 shuffle——一次性写成本,永久收益。
- 先查
df.explain();只在有证据它选错或统计陈旧时才用 hint 覆盖优化器。
一侧小到能装进内存时用广播 join——它完全消除网络 shuffle。大-大 join 用 sort-merge join;它是该场景下 Spark 的稳健默认。build 侧装得进哈希表时 shuffle hash join 对大数据集快。除非显式需要叉积,避免笛卡尔 join。Bucketing 预分区数据以消除重复 join 中的 shuffle。
Spark 的执行模型——Stage 与 Shuffle
理解 join 策略前,你必须理解 Spark 如何执行查询。Spark 把逻辑计划翻译成 stage 的有向无环图(DAG)。每个 stage 是一组能在同一数据的不同分区上并行运行的任务——一个 stage 内任务间无数据移动。stage 由 shuffle 边界分隔:数据必须跨分区重分布的点。
shuffle 是 Spark 执行模型中最昂贵的操作。它涉及:(1) map 侧把 shuffle 数据按目标分区排序写到本地磁盘;(2) reduce 侧通过网络从所有 map 任务读那些数据;(3) 反序列化并重分区数据。对 1 TB 数据集,一次 shuffle 意味着 1 TB 写磁盘、1 TB 经网络发送、1 TB 从网络读——单个操作 3 TB 的 I/O。这就是为什么每个 join 优化从根本上都是关于减少 shuffle 量或完全消除 shuffle。
# 读物理计划——永远是第一步诊断
df.explain(mode="extended") # 显示逻辑、优化后、物理计划
# 要找的关键物理计划节点:
# BroadcastHashJoin → 选了广播 join(大侧无 shuffle)
# ShuffledHashJoin → shuffle hash join(两侧 shuffle,建哈希)
# SortMergeJoin → sort-merge join(两侧 shuffle 并排序)
# CartesianProduct → cross join(危险:O(M×N) 输出)
# Exchange → shuffle 边界(每个 Exchange = 一次 shuffle)
# 数 Exchange 节点:0 = 无 shuffle,1 = 单侧 shuffle(广播),2 = 全 shuffle
Spark UI 里的 DAG 可视化把 stage 映射到执行计划。每个 stage 边界对应一次 shuffle。两个大数据集间的 Sort-Merge Join 产生两个 Exchange 节点(每侧一个),并被拆成至少三个 stage:读两侧(stage 1 和 2)、shuffle+排序两侧(stage 3)、合并(stage 4)。广播 join 在大侧产生零个 Exchange 节点——它通常装进两个 stage。
为什么 Join 策略重要
在分布式集群里,join 的主要成本是跨网络移动数据——叫 shuffle。shuffle 强迫 Spark 序列化数据、写磁盘、经网络发到正确的 executor,并在接收端反序列化。对大数据集这能以一个数量级主导总作业运行时。每个 join 优化的目标都是最小化不必要的 shuffle——要么完全消除 shuffle(广播 join、Bucketing),要么在 shuffle 不可避免时选最高效的策略(sort-merge vs shuffle hash,取决于 build 侧是否装进内存)。
广播 Join——消除大侧的 Shuffle
当两个数据集之一小到能装进每个 worker 的内存,Spark 把较小数据集广播(复制)到所有 executor 节点。每个 executor 随后对它那部分较大数据集本地执行 join——大侧无需 shuffle。大表从不移动。小表通过类 BitTorrent 的点对点广播协议每 executor 发一次(不是从 driver 逐个发给每个 executor,那会让 driver 成瓶颈)。
- 优势:完全消除大数据集的网络 shuffle。每行直接内存哈希查找,无磁盘 I/O。小-大 join 的最佳性能——对同样数据常比 sort-merge 快 10–100 倍。
- 局限:广播的数据集必须装进 executor 内存。若超过阈值,广播本身会在所有 worker 上同时造成 OOM。默认阈值是 10 MB——对现代集群保守;若 executor 堆大小合适,100–500 MB 常安全。
from pyspark.sql import functions as F
# 提高广播阈值——executor 有足够堆则安全
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600) # 100 MB
# 手动广播 hint——无视大小估计强制广播
# 当你知道表小但 Catalyst 不知(统计陈旧)时用
result = orders.join(
F.broadcast(country_codes), # 把 country_codes 复制到每个 executor
on="country_code",
how="left"
)
# SQL hint 语法(等价)
spark.sql("""
SELECT /*+ BROADCAST(c) */ o.*, c.country_name
FROM orders o
JOIN country_codes c ON o.country_code = c.code
""")
# 禁用自动广播——为测试强制 sort-merge
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
广播一个大于 executor 内存的数据集会在每个 worker 上同时造成 OOM——一次集群级崩溃。当一个上月还小的表已增长时要警惕。检查 Spark UI 广播 stage 的 "Input" 大小。若它接近 executor 堆大小(扣除开销后 executor 堆通常是总 executor 内存的 60–70%),要么增加 executor 内存,要么用 autoBroadcastJoinThreshold=-1 切到 sort-merge。
Shuffle Hash Join——全 Shuffle 后的哈希表 Join
两个数据集都通过网络 shuffle 按 join key 重分区,所以同 key 的所有行落到同一 executor。Spark 随后从一侧("build" 侧,选较小的分区集)建内存哈希表,并用另一侧("probe" 侧)的每行探测它。probe 侧每行的哈希查找是 O(1)——比 sort-merge join 里的顺序合并扫描快。
- 优势:当 build 侧完全装进每个 executor 内存时比 sort-merge 快——无排序开销、直接哈希查找。两个数据集都大但每分区 build 侧可控时很合适。
- 局限:需要两侧全网络 shuffle(计划里两个 Exchange 节点)。若 build 侧装不进 executor 的内存分区,Spark 溢出到磁盘——性能显著退化并接近或超过 sort-merge 成本。因这种溢出风险,Spark 默认禁用 Shuffle Hash Join 而倾向 sort-merge。
# 启用 Shuffle Hash Join——默认禁用
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)
# SQL hint 在特定查询上强制 Shuffle Hash Join
spark.sql("""
SELECT /*+ SHUFFLE_HASH(orders) */ *
FROM orders
JOIN transactions ON orders.order_id = transactions.order_id
""")
# 什么时候值得?
# 每分区 build 侧(较小表)装进 executor 内存
# 且 join key 高基数(无倾斜)
# 且你想避免 sort-merge 的排序开销
Sort-Merge Join——Spark 大-大 Join 的稳健默认
当广播不可能时,Spark 大-大 join 的默认策略。两侧都按 join key shuffle(两个 Exchange 节点),然后在每个分区内排序。排序后的分区被顺序合并——同时穿过两个有序流的单次线性扫描,以同步前进的方式按记录匹配。这与外部归并排序的归并阶段是同一算法,且无论分区大小都在常量内存里工作:它从不需要把整个分区放进 RAM,每次每侧只一条记录。
- 优势:处理超过总可用内存的数据集——排序和合并是对有界块的流式操作。对高基数 key 正确。最通用、生产最安全的 Spark join 策略。抗倾斜:即使一个 key 比其他多得多的行,合并扫描也能处理而无需建可能 OOM 的哈希表。
- 局限:两次全 shuffle(昂贵)加两侧排序(CPU 开销)。总 I/O 大约是合并数据集大小的 3 倍(写 shuffle 数据、读 shuffle 数据、写排序输出)。在小-大场景不如广播快,哈希表装得下时不如 shuffle hash 快。
# Sort-Merge 是 Spark 默认——无需配置
# SQL hint 显式强制 Sort-Merge
spark.sql("""
SELECT /*+ MERGE(orders, transactions) */ *
FROM orders
JOIN transactions ON orders.order_id = transactions.order_id
""")
# 调 Sort-Merge:shuffle 分区数
# 默认 200——对大数据常太少,对小数据太多
# 经验法则:每个 shuffle 分区大小应 ~128–256 MB
# total_data_size_GB * 1024 / 200 MB_每分区 → 目标分区数
spark.conf.set("spark.sql.shuffle.partitions", 2000) # 对 ~400 GB shuffle 数据
# 启用 AQE(Spark 3.0+),这会自动调优——见下方 AQE 节
spark.conf.set("spark.sql.adaptive.enabled", True)
笛卡尔(Cross)Join——O(M×N)——极度小心使用
把左数据集每行与右数据集每行配对,产生 M×N 结果。无需 join key。输出大小二次增长——即便中等大的输入也产生不可管理的巨大输出。一张 100 万行的表与一张 100 万行的表叉乘产生 1 万亿行。Spark 要求显式 crossJoin() 调用或 spark.sql.crossJoin.enabled=true 以防意外的 cross join。
- 恰当用途:为小数据集生成所有成对组合(如所有可能的 A/B 测试变体组合)、对小特征向量的 ML 特征叉积计算、"把单行展开到所有分区"模式。
- 警告:在大表上意外触发笛卡尔 join——通常因省略 join 条件或写了坏 SQL 查询——是 Spark 作业失败的常见原因。提交大作业前总是检查
explain()里的CartesianProduct节点。
策略对比
| 策略 | 最适合 | 网络 Shuffle | 内存压力 | 触发条件 |
|---|---|---|---|---|
| 广播 Join | 小(< 阈值)× 大 | 仅小侧(一次) | 低(若小侧装得下) | 一侧 < autoBroadcastJoinThreshold 或 BROADCAST hint |
| Shuffle Hash Join | 中 × 大(build 装进哈希) | 两侧全 shuffle | 中–高(哈希表) | preferSortMergeJoin=false;build 侧装进内存 |
| Sort-Merge Join | 大 × 大(默认) | 两侧全 shuffle + 排序 | 低(流式合并) | 超过广播阈值时默认 |
| 笛卡尔 Join | 叉积(仅极小数据集) | 所有数据重分布 | 极端(O(M×N) 输出) | 无 join key;需 crossJoin.enabled=true |
数据倾斜——沉默的性能杀手
数据倾斜发生在行在 join key 值上分布不均时。若一个 key 有 1000 万行而其他都 1000 行,一个 executor 的任务会处理比同伴多 1 万倍的数据,而其余空闲。这是 Spark 作业在"99% 完成"卡几小时的最常见原因——一个掉队任务拖住整个 stage。
检测倾斜
检查 Spark UI 的 Stage 详情页。按 Duration 或 Input Size 排序任务。若少数任务(或就一个)耗时比中位数戏剧性地长,你有倾斜。代码里这样验证:
# join 前剖析 key 分布
df.groupBy("join_key") \
.count() \
.orderBy(F.col("count").desc()) \
.show(20)
# 若前 5 个 key 含 > 50% 所有行:你有倾斜
# 技术 1:加盐(Salting)——人为分散热 key
# 在两侧给倾斜 key 追加随机盐(0-9)
import pyspark.sql.functions as F
# 热 key 表:为所有盐值复制每行
hot_keys = F.broadcast(
small_dimension.crossJoin(
spark.range(10).toDF("salt") # 10 个盐值
).withColumn("salted_key",
F.concat(F.col("join_key"), F.lit("_"), F.col("salt")))
)
# 大表:给每行随机分配一个盐
salted_large = large_fact.withColumn(
"salted_key",
F.concat(F.col("join_key"),
F.lit("_"),
(F.rand() * 10).cast("int").cast("string"))
)
# 在加盐 key 上 join——倾斜分散到 10 倍多的分区
result = salted_large.join(hot_keys, on="salted_key")
Skew Join Hint(Spark 3.0+)
Spark 3.0 引入 SKEW_JOIN hint,告诉 Catalyst 优化器把倾斜分区拆成更小子分区并独立 join,而无需手动加盐:
# SQL 倾斜 hint——告诉 Catalyst 哪张表倾斜
spark.sql("""
SELECT /*+ SKEW_JOIN(orders) */ *
FROM orders
JOIN transactions ON orders.order_id = transactions.order_id
""")
# 基于 AQE 的自动倾斜处理(Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
# 倾斜阈值:一个分区被视为倾斜,若
# 其大小 > skewFactor × 中位数分区大小 且 > skewedPartitionThresholdInBytes
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
Bucketing——消除重复 Join 的 Shuffle
Bucketing 在表写到存储时按 join key 预分区。若 join 两侧用相同桶数按同一 key bucketing,Spark 能完全跳过 shuffle——数据已按 key 共置。表 A 的每个桶与表 B 对应的桶在同一 executor 上 join,零网络传输。这是一次性写成本,在数据集生命周期内对该表的每次后续 join 都受益。
问题:桶数必须两侧精确匹配,且 bucketing 只在从 Hive Metastore 支撑的表格式(通过 saveAsTable 的 Parquet/ORC)读时工作——不是从裸文件读。最优桶数是数据大小的函数:目标每桶文件 ~128–256 MB,这样文件既不太小(任务太多)也不太大(掉队任务)。对 1 TB 表,4,000–8,000 桶是合理起点。
# 把 orders 按 customer_id bucketing 写出——一次性成本
orders.write \
.bucketBy(256, "customer_id") \
.sortBy("customer_id") \ # 桶内排序,Sort-Merge 无需重排
.saveAsTable("orders_bucketed")
# 用相同桶数 + key 写 customers——关键要求
customers.write \
.bucketBy(256, "customer_id") \
.sortBy("customer_id") \
.saveAsTable("customers_bucketed")
# 现在 join 无 shuffle——Catalyst 检测到桶共置
# 物理计划会显示 SortMergeJoin 而前面没有 Exchange 节点
result = spark.table("orders_bucketed").join(
spark.table("customers_bucketed"),
on="customer_id"
)
# 验证:查 explain()——无 Exchange 节点 = shuffle 已消除
result.explain()
Bucketing 在实践中脆弱。若你给一张表加分区(用不同桶数重新 bucketing),共置保证被打破,Spark 静默退回全 shuffle。schema 或分区变更后总用 explain() 确认。另外:bucketing 只在用 spark.table("name") 读时被识别,而非用 spark.read.parquet("path") 直接读 Parquet 文件时。
自适应查询执行(AQE)——Spark 3.0+
AQE 通过允许 Spark 在运行时用实际 shuffle 统计而非执行前估计来做计划决策,从根本上改变了 join 优化局面。三个直接影响 join 性能的能力:
1. 动态 Join 策略切换
若一个数据集被估计为大(于是计划了 sort-merge),但首个 shuffle stage 后实际数据比预期小得多,AQE 能在执行中途切到广播 join——而无需重启查询。这抓住了统计陈旧或不存在、优化器过于保守的常见情况。
2. 动态合并 Shuffle 分区
默认 200 的 shuffle 分区数是任意的且常错:对小数据集太多(造成数千个小任务),对大数据集太少(造成分区过大的掉队任务)。AQE 测量 shuffle 后的实际分区大小,把相邻小分区合并成更少、更大的——自动为实际数据分布调正并行度。
3. 自动倾斜处理
AQE 检测倾斜分区(那些显著大于中位数的)并自动把它们拆成子分区,从另一侧复制匹配的非倾斜分区以对每个子分区 join。这消除掉队任务而无需手动加盐——对有真实世界倾斜数据的生产管道最有影响力的 AQE 特性。
# 启用 AQE——Spark 3.2+ 默认 True,3.0/3.1 须手动设
spark.conf.set("spark.sql.adaptive.enabled", True)
# 动态 join 策略切换
# AQE 会在运行时大小 < 阈值时把 Sort-Merge → 广播
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)
# 动态分区合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# 自动倾斜 join 处理
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5) # 5× 中位数 = 倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
读查询计划与诊断问题
用 df.explain(mode='extended')(或就 df.explain(True))检查 Spark 选了哪个 join 策略。物理计划是实际执行的——聚焦它。要找的关键节点:
BroadcastHashJoin——广播 join;看BuildRight或BuildLeft知道哪侧被广播。ShuffledHashJoin——shuffle hash join;一侧建哈希表。SortMergeJoin——sort-merge join;两侧被排序。CartesianProduct——cross join;若意外则是红旗。Exchange——shuffle 边界;数它们以理解总 shuffle 成本。BroadcastExchangevsShuffleExchange——广播把数据一次发给所有 executor;shuffle 按 key 重分布。
若 Spark 选错策略——表统计陈旧时常见——用 hint 注解或运行 ANALYZE TABLE <name> COMPUTE STATISTICS FOR ALL COLUMNS 刷新统计让 Catalyst 重新规划。启用 AQE 时,陈旧统计问题较小,因为计划在每个 shuffle stage 边界用实际运行时大小重新优化。
关键调优参数
| 参数 | 默认 | 控制什么 |
|---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10 MB | 自动广播的数据集最大大小。大 executor 集群提到 100–512 MB。 |
spark.sql.shuffle.partitions | 200 | Sort-Merge/Shuffle Hash 的 shuffle 分区数。启用 AQE 时这成为上限;实际自动合并。 |
spark.sql.join.preferSortMergeJoin | true | 设 false 以允许 build 侧装进内存时用 Shuffle Hash Join。 |
spark.sql.adaptive.enabled | true (3.2+) | 自适应查询执行的总开关。 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB | AQE 合并的目标分区大小。128–256 MB 通常最优。 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | 分区须比中位数大这么多倍才被视为倾斜。 |
spark.executor.memory | 1g | 总 executor 堆。广播 join 需小表装进这里。Shuffle Hash 需 build 侧分区装进。 |
spark.memory.fraction | 0.6 | executor 堆用于执行+存储(vs JVM 开销)的比例。内存重的 join 负载提到 0.7–0.75。 |
Join 决策树——何时用什么
实践中,提交任何 join 重的 Spark 作业前应用这个决策树:
- 检查一侧是否小到能广播——跑
df.count()并估计未压缩大小。若它装进 executor 内存(考虑压缩比,Parquet 通常 3–5 倍),用广播。提高autoBroadcastJoinThreshold或用 hint。 - 若两侧都大,检查倾斜——剖析 key 分布。若倾斜,启用 AQE 倾斜 join 处理或在选策略前应用手动加盐。
- 若 join 在同一 key 上重复——考虑写时把两表都 bucketing。一次性成本,永久消除 shuffle。
- 若启用 AQE(Spark 3.2+)——带 AQE 的 Sort-Merge 自适应处理大多数情况。除非剖析显示特定问题,信任它。
- 只在以下情况启用 Shuffle Hash Join:每分区 build 侧明确装进 executor 内存(测量,非估计)且排序开销是可测量的瓶颈。
- 总用
explain()验证——在生产数据上跑前确认物理计划匹配你的意图。
永远先查物理计划——让 Spark 优化器做它的工作,只在有证据它选错时才覆盖。决策树:一侧装进内存 → 广播(完全消除大侧 shuffle);两侧都大但 build 侧装进哈希表 → Shuffle Hash;两侧真大 → Sort-Merge(常量内存合并,Spark 的安全默认)。在倾斜让好策略失去意义之前检测并处理它。写时给频繁 join 的大表 bucketing 以永久消除 shuffle。Spark 3.0+ 启用 AQE 以获得自适应策略切换、分区合并和自动倾斜处理——它解决了大多数以前需要手动调优的问题。
为什么 shuffle 是 Spark join 的敌人?shuffle 强迫序列化、写磁盘、网络传输和反序列化——对 1 TB 数据集是一个操作 3 TB 的 I/O。每个 join 优化都旨在最小化不必要的 shuffle:广播在大侧消除它,Bucketing 完全消除它,Sort-Merge 通过做一次并流式合并来最小化额外工作。
广播 vs Sort-Merge——各自何时胜?一侧装进 executor 内存时广播胜——它消除所有大侧网络流量,常带来 10–100 倍加速。大-大 join 时 Sort-Merge 胜,因为它通过顺序扫描在有界常量内存里合并,处理超过总可用 RAM 的数据集而无 OOM 风险。
Bucketing 怎么消除 shuffle?两表在写时用相同桶数按同一 join key 预分区。Spark 的 Catalyst 优化器检测到匹配行已在同一 executor 共置,完全跳过 shuffle Exchange 节点——由物理计划里无 Exchange 确认。
什么是数据倾斜、怎么修?倾斜是某些 join key 值比其他有多得多的行,造成一个 executor 任务处理比同伴多 1000 倍的数据——一个掉队者拖住整个 stage。修法:(1) AQE 的自动倾斜 join 处理(Spark 3.0+),拆分倾斜分区;(2) 手动加盐——在两侧给倾斜 key 追加随机后缀,为所有盐值复制维度侧;(3) 若倾斜的小维度装进内存就广播它。
AQE 对 join 做什么?AQE(Spark 3.0+)在每个 shuffle 边界用实际运行时统计重新优化计划:若实际数据比估计小能把 Sort-Merge 切到广播、合并 shuffle 分区以调正并行度,并自动把倾斜分区拆成子分区以均衡执行。