This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new cf3f659a695a [SPARK-57025][SQL] SortMergeJoinExec: extract
JoinHelper.resetMatched for full-outer BitSet bookkeeping
cf3f659a695a is described below
commit cf3f659a695a3bc2e83922a39aad61c4fde65673
Author: Gengliang Wang <[email protected]>
AuthorDate: Sat May 23 21:01:16 2026 -0700
[SPARK-57025][SQL] SortMergeJoinExec: extract JoinHelper.resetMatched for
full-outer BitSet bookkeeping
### What changes were proposed in this pull request?
This is a sub-task of
[SPARK-56908](https://issues.apache.org/jira/browse/SPARK-56908).
`SortMergeJoinExec.codegenFullOuter` and its interpreted counterpart
`SortMergeFullOuterJoinScanner.findMatchingRows` both duplicate the "reuse the
`BitSet` if its capacity is enough, otherwise allocate a new one" idiom for
tracking matched left/right rows.
Extract it into a new static helper class at
`sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java`:
```java
public static BitSet resetMatched(BitSet matched, int bufferSize) { ... }
```
and call it from the four sites (left + right in each method).
### Why are the changes needed?
- Replaces ~16 inline lines with 4 helper calls, shrinking generated Java
for the codegen sites.
- Keeps the codegen and interpreted full-outer paths in lockstep so a
future change to the reset rule lands in one place.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing `OuterJoinSuite` covers full-outer SMJ through both code paths
(codegen on and off).
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #56073 from gengliangwang/SPARK-57025-resetMatched.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit b2c2a8d68dcbbaca715adc74c0dd543582c9ff02)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/execution/joins/JoinHelper.java | 47 ++++++++++++++++++++++
.../sql/execution/joins/SortMergeJoinExec.scala | 26 +++---------
2 files changed, 53 insertions(+), 20 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java b/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java
new file mode 100644
index 000000000000..91156b2600fd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/joins/JoinHelper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins;
+
+import org.apache.spark.util.collection.BitSet;
+
+/**
+ * Static helpers shared by join operators in this package, used both from whole-stage codegen and
+ * from interpreted execution paths. Hoisting recurring snippets here keeps the generated Java
+ * source smaller and lets the JIT compile the bodies once instead of once per stage.
+ */
+public final class JoinHelper {
+
+ private JoinHelper() {}
+
+ /**
+ * Reset a Spark {@link org.apache.spark.util.collection.BitSet} (not {@link java.util.BitSet})
+ * that tracks which rows in a buffer of size {@code bufferSize} have already been matched.
+ * Reuses {@code matched} when its capacity is sufficient; otherwise returns a freshly allocated
+ * BitSet. Callers must assign the returned reference back to their bit-set field.
+ *
+ * <p>Used by full-outer sort-merge join, where the left- and right-side buffers are repopulated
+ * for each batch of rows sharing a join key.
+ */
+ public static BitSet resetMatched(BitSet matched, int bufferSize) {
+ if (bufferSize <= matched.capacity()) {
+ matched.clearUntil(bufferSize);
+ return matched;
+ }
+ return new BitSet(bufferSize);
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index bc2f9197df9d..985fc518742c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -954,16 +954,10 @@ case class SortMergeJoinExec(
| }
|
| // Reset bit sets of buffers accordingly
- | if ($leftBuffer.size() <= $leftMatched.capacity()) {
- | $leftMatched.clearUntil($leftBuffer.size());
- | } else {
- | $leftMatched = new $matchedClsName($leftBuffer.size());
- | }
- | if ($rightBuffer.size() <= $rightMatched.capacity()) {
- | $rightMatched.clearUntil($rightBuffer.size());
- | } else {
- | $rightMatched = new $matchedClsName($rightBuffer.size());
- | }
+ | $leftMatched = ${classOf[JoinHelper].getName}.resetMatched(
+ | $leftMatched, $leftBuffer.size());
+ | $rightMatched = ${classOf[JoinHelper].getName}.resetMatched(
+ | $rightMatched, $rightBuffer.size());
|}
""".stripMargin)
@@ -1457,16 +1451,8 @@ private class SortMergeFullOuterJoinScanner(
advancedRight()
}
- if (leftMatches.size <= leftMatched.capacity) {
- leftMatched.clearUntil(leftMatches.size)
- } else {
- leftMatched = new BitSet(leftMatches.size)
- }
- if (rightMatches.size <= rightMatched.capacity) {
- rightMatched.clearUntil(rightMatches.size)
- } else {
- rightMatched = new BitSet(rightMatches.size)
- }
+ leftMatched = JoinHelper.resetMatched(leftMatched, leftMatches.size)
+ rightMatched = JoinHelper.resetMatched(rightMatched, rightMatches.size)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]