This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5606-227cbd73960afbcaa734b30f3ac108dc669324f3
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 0d8f9a7b8a210447364c43cbfe8aa3fe0d1f8cd5
Author: Suyash Jain <[email protected]>
AuthorDate: Fri Jun 12 13:05:06 2026 -0700

    fix(workflow-operator): no null padding in reservoir sampling (#5606)
    
    ### What changes were proposed in this PR?
    
    `ReservoirSamplingOpExec` allocates a fixed-size reservoir of length
    `count` (the per-worker share of `k`). When a worker receives fewer
    tuples than `count`, only the first `n` slots are filled, but `onFinish`
    returned the whole array, yielding `count - n` trailing `null` entries.
    The nulls are currently swallowed by a distant null-guard in
    `DataProcessor`, so the bug is latent — but the operator violates the
    "do not emit null tuples" contract and breaks if that guard is ever
    narrowed or bypassed.
    
    ```
    Before:  input < k  ->  onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
    After:   input < k  ->  onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)
    ```
    
    The fix emits only the filled prefix:
    
    ```scala
    override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
    ```
    
    `take(n)` is a no-op when `n >= count` (i.e., the worker received at least
    its full share of `k`), so the sampled output is unchanged in the normal case.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5592
    
    ### How was this PR tested?
    
    Added three regression cases to `ReservoirSamplingOpExecSpec`:
    
    | Case | Asserts |
    | --- | --- |
    | `input size < k` | only the received tuples are emitted, in order, no nulls |
    | empty input | `onFinish` emits nothing |
    | skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |
    
    All three fail against the old `reservoir.iterator` and pass with
    `reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD
    red → green verified by stashing the source fix).
    
    ```
    sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
    # Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
    ```
    
    `sbt WorkflowOperator/scalafixAll` and `sbt
    WorkflowOperator/scalafmtAll` produce no further diff.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Yes, partially. I (Suyash Jain) worked on this PR together with Claude
    Code as a pair-programming assistant. I reviewed the final diff, ran the
    spec locally, and verified the red → green behavior of the new
    regression tests myself before opening the PR.
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Xuan Gu <[email protected]>
---
 .../reservoirsampling/ReservoirSamplingOpExec.scala |  4 +++-
 .../ReservoirSamplingOpExecSpec.scala               | 21 +++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
index 155b0c99b2..3ec8d017c8 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExec.scala
@@ -58,6 +58,8 @@ class ReservoirSamplingOpExec(descString: String, idx: Int, workerCount: Int)
     Iterator()
   }
 
-  override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator
+  // Only the first n slots are filled when the input is smaller than the reservoir;
+  // take(n) keeps the trailing unfilled (null) slots from being emitted.
+  override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
 
 }
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
index e517fc3da6..a4cb103ea7 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/reservoirsampling/ReservoirSamplingOpExecSpec.scala
@@ -105,6 +105,27 @@ class ReservoirSamplingOpExecSpec extends AnyFlatSpec {
     assert(emitted == List(tuple(0), tuple(1), tuple(2), tuple(3)))
   }
 
+  it should "emit only the filled prefix, without null padding, when input size < k" in {
+    val exec = newExec(k = 5)
+    val emitted = runFinish(exec, 0 until 2)
+    assert(emitted == List(tuple(0), tuple(1)), "only the received tuples are emitted, in order")
+    assert(!emitted.contains(null), "the unfilled reservoir slots must not leak as null tuples")
+  }
+
+  it should "emit nothing when the input stream is empty" in {
+    val exec = newExec(k = 5)
+    val emitted = runFinish(exec, Seq.empty)
+    assert(emitted.isEmpty, "an unfilled reservoir with no input emits no (null) tuples")
+  }
+
+  it should "not emit null padding on a worker that receives fewer tuples than its share" in {
+    // k=10 over 3 workers gives worker 0 a share of 4 (equallyPartitionGoal), but skewed
+    // partitioning delivers it only 2 tuples; the 2 unfilled slots must not surface as nulls.
+    val exec = newExec(k = 10, idx = 0, workerCount = 3)
+    val emitted = runFinish(exec, 0 until 2)
+    assert(emitted == List(tuple(0), tuple(1)))
+  }
+
   it should "keep exactly k tuples, all drawn from the input, when input size > k" in {
     val exec = newExec(k = 5)
     val input = 0 until 100

Reply via email to