This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.2
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.2 by this push:
new b876b475d9 fix(workflow-operator): no null padding in reservoir sampling (#5606)
b876b475d9 is described below
commit b876b475d93c37696b8da0436d7ad3e7263e6317
Author: Suyash Jain <[email protected]>
AuthorDate: Fri Jun 12 20:39:30 2026 +0000
fix(workflow-operator): no null padding in reservoir sampling (#5606)
### What changes were proposed in this PR?
`ReservoirSamplingOpExec` allocates a fixed-size reservoir of length
`count` (the per-worker share of `k`). When a worker receives fewer
tuples than `count`, only the first `n` slots are filled, but `onFinish`
returned the whole array, yielding `count - n` trailing `null` entries.
The nulls are currently swallowed by a distant null-guard in
`DataProcessor`, so the bug is latent — but the operator violates the
"do not emit null tuples" contract and breaks if that guard is ever
narrowed or bypassed.
```
Before: input < k -> onFinish emits [t0 .. tn-1, null, ..., null]
                     (engine guard hides them)
After:  input < k -> onFinish emits [t0 .. tn-1]
                     (no nulls emitted at all)
```
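To make the failure mode concrete, here is a minimal, script-style sketch of a fixed-size reservoir (classic Algorithm R) that reproduces the before/after behavior. It is a simplified stand-in, not `ReservoirSamplingOpExec` itself: the class `SimpleReservoir`, its `capacity`/`seen` fields and the `finishBuggy`/`finishFixed` methods are hypothetical names, `seen` plays the role of `n`, and `capacity` plays the role of `count`.

```scala
import scala.util.Random

// Hypothetical, simplified stand-in for a per-worker reservoir (Algorithm R).
// Names are illustrative only; this is not Texera's API.
final class SimpleReservoir(capacity: Int, seed: Long = 42L) {
  private val rand = new Random(seed)
  // Fixed-size backing array: every slot starts as null until written.
  private val reservoir = new Array[String](capacity)
  // Number of items observed so far (plays the role of `n`).
  private var seen = 0

  def add(item: String): Unit = {
    if (seen < capacity) {
      reservoir(seen) = item // fill phase: write into the next empty slot
    } else {
      val j = rand.nextInt(seen + 1) // uniform index in [0, seen]
      if (j < capacity) reservoir(j) = item // keep with prob. capacity / (seen + 1)
    }
    seen += 1
  }

  // Old behavior: iterates every slot, including unfilled (null) ones.
  def finishBuggy: List[String] = reservoir.iterator.toList

  // Fixed behavior: only the first `seen` slots can hold data.
  def finishFixed: List[String] = reservoir.iterator.take(seen).toList
}

val r = new SimpleReservoir(capacity = 5)
Seq("t0", "t1").foreach(r.add)
println(r.finishBuggy) // List(t0, t1, null, null, null)
println(r.finishFixed) // List(t0, t1)
```

Because the fill phase always writes slots `0 .. seen-1` in order, the filled region is guaranteed to be a prefix, which is what makes a plain `take` sufficient.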
The fix emits only the filled prefix:
```scala
override def onFinish(port: Int): Iterator[TupleLike] =
  reservoir.iterator.take(n)
```
`take(n)` is a no-op when `n >= count` (the worker received at least its
full share of `k`), so the sampled output is unchanged in the normal case.
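The no-op claim follows from the standard `Iterator.take` contract: it yields at most `n` elements and simply stops when the underlying iterator is exhausted. A small sketch, where the plain arrays stand in for the operator's `reservoir` field and `emitFilled` is a hypothetical helper:

```scala
// Stand-in for `reservoir.iterator.take(n)`; `emitFilled` is hypothetical.
def emitFilled(reservoir: Array[String], n: Int): List[String] =
  reservoir.iterator.take(n).toList

val full = Array("a", "b", "c", "d", "e") // count = 5, all slots filled
val partial = Array[String]("a", "b", null, null, null) // only n = 2 slots filled

println(emitFilled(full, 5))    // List(a, b, c, d, e)
println(emitFilled(full, 1000)) // List(a, b, c, d, e): take stops at the array's end
println(emitFilled(partial, 2)) // List(a, b)
```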
### Any related issues, documentation, discussions?
Closes #5592
### How was this PR tested?
Added three regression cases to `ReservoirSamplingOpExecSpec`:
| Case | Asserts |
| --- | --- |
| `input size < k` | only the received tuples are emitted, in order, no nulls |
| empty input | `onFinish` emits nothing |
| skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |
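The skewed-partitioning case depends on how `k` is split across workers. The spec's comment only states that `k=10` over 3 workers gives worker 0 a share of 4 via `equallyPartitionGoal`; the sketch below is a hypothetical even split that reproduces that number, not the actual `equallyPartitionGoal` implementation. It also shows the count a worker should emit after the fix.

```scala
// Hypothetical even split of k across workers; consistent with the spec comment
// (k = 10, 3 workers -> worker 0 gets 4) but not taken from `equallyPartitionGoal`.
def shareFor(k: Int, idx: Int, workerCount: Int): Int = {
  require(workerCount > 0 && idx >= 0 && idx < workerCount, "invalid worker index")
  // The first (k % workerCount) workers absorb the remainder, one extra slot each.
  k / workerCount + (if (idx < k % workerCount) 1 else 0)
}

// After the fix, a worker emits min(share, tuples it actually received).
def emittedCount(k: Int, idx: Int, workerCount: Int, received: Int): Int =
  math.min(shareFor(k, idx, workerCount), received)

println((0 until 3).map(i => shareFor(10, i, 3)).toList) // List(4, 3, 3)
println(emittedCount(10, 0, 3, 2))                       // 2
```

Under this split the shares always sum to `k`, and an under-filled worker emits exactly the tuples it received rather than `share - received` nulls.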
All three fail against the old `reservoir.iterator` and pass with
`reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD
red → green verified by stashing the source fix).
```
sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
# Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
```
`sbt WorkflowOperator/scalafixAll` and `sbt WorkflowOperator/scalafmtAll`
produce no further diff.
### Was this PR authored or co-authored using generative AI tooling?
Yes, partially. I (Suyash Jain) worked on this PR together with Claude
Code as a pair-programming assistant. I reviewed the final diff, ran the
spec locally, and verified the red → green behavior of the new
regression tests myself before opening the PR.
Generated-by: Claude Code (Claude Opus 4.7)
(backported from commit d5f5e12fb6879f15dbcf0c9cf6aaae3b532784e6)
Co-authored-by: Xuan Gu <[email protected]>
---
.../reservoirsampling/ReservoirSamplingOpExec.scala | 4 +++-
.../ReservoirSamplingOpExecSpec.scala | 21 +++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
index 155b0c99b2..3ec8d017c8 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
@@ -58,6 +58,8 @@ class ReservoirSamplingOpExec(descString: String, idx: Int, workerCount: Int)
Iterator()
}
- override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator
+ // Only the first n slots are filled when the input is smaller than the reservoir;
+ // take(n) keeps the trailing unfilled (null) slots from being emitted.
+ override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
}
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
index e517fc3da6..a4cb103ea7 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
@@ -105,6 +105,27 @@ class ReservoirSamplingOpExecSpec extends AnyFlatSpec {
assert(emitted == List(tuple(0), tuple(1), tuple(2), tuple(3)))
}
+ it should "emit only the filled prefix, without null padding, when input size < k" in {
+ val exec = newExec(k = 5)
+ val emitted = runFinish(exec, 0 until 2)
+ assert(emitted == List(tuple(0), tuple(1)), "only the received tuples are emitted, in order")
+ assert(!emitted.contains(null), "the unfilled reservoir slots must not leak as null tuples")
+ }
+
+ it should "emit nothing when the input stream is empty" in {
+ val exec = newExec(k = 5)
+ val emitted = runFinish(exec, Seq.empty)
+ assert(emitted.isEmpty, "an unfilled reservoir with no input emits no (null) tuples")
+ }
+
+ it should "not emit null padding on a worker that receives fewer tuples than its share" in {
+ // k=10 over 3 workers gives worker 0 a share of 4 (equallyPartitionGoal), but skewed
+ // partitioning delivers it only 2 tuples; the 2 unfilled slots must not surface as nulls.
+ val exec = newExec(k = 10, idx = 0, workerCount = 3)
+ val emitted = runFinish(exec, 0 until 2)
+ assert(emitted == List(tuple(0), tuple(1)))
+ }
+
it should "keep exactly k tuples, all drawn from the input, when input size > k" in {
val exec = newExec(k = 5)
val input = 0 until 100