suyashj1231 opened a new pull request, #5606:
URL: https://github.com/apache/texera/pull/5606

   ### What changes were proposed in this PR?
   
   `ReservoirSamplingOpExec` allocates a fixed-size reservoir of length `count` (the per-worker share of `k`). When a worker receives `n` tuples with `n < count`, only the first `n` slots are filled, but `onFinish` returned the whole array, yielding `count - n` trailing `null` entries. The nulls are currently swallowed by a distant null-guard in `DataProcessor`, so the bug is latent. Even so, the operator violates the "do not emit null tuples" contract and breaks if that guard is ever narrowed or bypassed.
   
   ```
   Before:  input < k  ->  onFinish emits [t0 .. tn-1, null, ..., null]  (engine guard hides them)
   After:   input < k  ->  onFinish emits [t0 .. tn-1]                   (no nulls emitted at all)
   ```
   
   The fix emits only the filled prefix:
   
   ```scala
   override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
   ```
   
   `take(n)` is a no-op when `n >= count` (input ≥ k), so the sampled output is unchanged in the normal case.
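
   As a self-contained illustration of the behavior described above, the sketch below models the reservoir with a plain array of strings. `MiniReservoir`, `offer`, `finishOld`, and `finishNew` are hypothetical names made up for this example and are not the operator's actual API; only `count`, `n`, `reservoir`, and `take(n)` come from this PR. The replacement step is the textbook reservoir-sampling rule and is assumed, not copied from the operator.

   ```scala
   import scala.util.Random

   // Hypothetical stand-in for the operator: a fixed-size reservoir of `count` slots.
   final class MiniReservoir(count: Int, seed: Long = 0L) {
     private val reservoir = new Array[String](count) // slots start out as null
     private var n = 0                                // number of tuples seen so far
     private val rand = new Random(seed)

     def offer(t: String): Unit = {
       if (n < count) {
         reservoir(n) = t                // fill phase: take the next free slot
       } else {
         val i = rand.nextInt(n + 1)     // uniform index in [0, n]
         if (i < count) reservoir(i) = t // replace with probability count / (n + 1)
       }
       n += 1
     }

     // Old behavior: the whole array, including unfilled (null) slots.
     def finishOld: Iterator[String] = reservoir.iterator

     // Fixed behavior: only the filled prefix.
     def finishNew: Iterator[String] = reservoir.iterator.take(n)
   }

   object MiniReservoirDemo {
     def main(args: Array[String]): Unit = {
       // Under-filled: 2 tuples into 4 slots.
       val under = new MiniReservoir(4)
       Seq("t0", "t1").foreach(under.offer)
       assert(under.finishOld.toList == List("t0", "t1", null, null))
       assert(under.finishNew.toList == List("t0", "t1"))

       // Full: 10 tuples into 4 slots, so take(n) changes nothing.
       val full = new MiniReservoir(4)
       (0 until 10).map(i => s"t$i").foreach(full.offer)
       assert(full.finishOld.toList == full.finishNew.toList)
       assert(full.finishNew.forall(_ != null))
     }
   }
   ```

   In this model the two variants differ only while `n < count`, which is the under-filled case the PR targets.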
   
   ### Any related issues, documentation, discussions?
   
   Closes #5592
   
   ### How was this PR tested?
   
   Added three regression cases to `ReservoirSamplingOpExecSpec`:
   
   | Case | Asserts |
   | --- | --- |
   | `input size < k` | only the received tuples are emitted, in order, no nulls |
   | empty input | `onFinish` emits nothing |
   | skewed partitioning (`k=10`, 3 workers, worker 0 gets 2 tuples) | no null padding for an under-filled worker share |
   
   All three fail against the old `reservoir.iterator` and pass with `reservoir.iterator.take(n)`; the 9 pre-existing cases stay green (TDD red → green verified by stashing the source fix).
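
   The skewed-partitioning case can be pictured with the rough sketch below. It is not the code in `ReservoirSamplingOpExecSpec`. `SkewedShareSketch`, `share`, and `emitted` are hypothetical helpers, and the rule that the remainder of `k` goes to the lowest-indexed workers is an assumption; the operator may divide `k` differently.

   ```scala
   object SkewedShareSketch {
     // ASSUMPTION: k is split evenly and the remainder goes to the lowest-indexed workers.
     def share(k: Int, workers: Int, idx: Int): Int =
       k / workers + (if (idx < k % workers) 1 else 0)

     // Models one worker in the fill phase only: store up to `count` tuples,
     // then emit just the filled prefix, as the fix does with take(n).
     def emitted(received: Seq[String], count: Int): List[String] = {
       val reservoir = new Array[String](count) // unfilled slots stay null
       received.take(count).zipWithIndex.foreach { case (t, i) => reservoir(i) = t }
       val n = received.size
       reservoir.iterator.take(n).toList
     }

     def main(args: Array[String]): Unit = {
       val count0 = share(k = 10, workers = 3, idx = 0) // 4 under the assumed split
       val out = emitted(Seq("a", "b"), count0)         // worker 0 receives only 2 tuples
       assert(out == List("a", "b"))                    // no null padding
       assert(emitted(Seq.empty, count0).isEmpty)       // empty input emits nothing
     }
   }
   ```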
   
   ```
   sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.reservoirsampling.ReservoirSamplingOpExecSpec"
   # Tests: succeeded 12, failed 0, canceled 0, ignored 0, pending 0
   ```
   
   `sbt WorkflowOperator/scalafixAll` and `sbt WorkflowOperator/scalafmtAll` produce no further diff.
   
   ### Was this PR authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Claude Opus 4.7), reviewed and verified by the author.

