Ma77Ball opened a new issue, #5409:
URL: https://github.com/apache/texera/issues/5409

   ### What happened?
   
   `ReservoirSamplingOpExec` emits `null` "tuples" from `onFinish` when the 
input stream contains **fewer tuples than k** (the per-worker reservoir size).
   
   The reservoir is a fixed-size array of length `count` (this worker's share of k):
   
   ```scala
   // common/workflow-operator/.../reservoirsampling/ReservoirSamplingOpExec.scala
   override def open(): Unit = {
     n = 0
     reservoir = Array.ofDim(count)   // fixed size = count
   }
   override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator
   ```
   
   When `n < count`, `processTuple` only fills `reservoir(0 until n)`; the 
trailing `count - n` slots stay `null`. `onFinish` returns 
`reservoir.iterator`, so it yields those `null` slots into the operator's 
output stream.
   
   **Expected:** when fewer than k tuples arrive, the operator should emit 
exactly the `n` buffered tuples and nothing else (reservoir sampling of a 
stream shorter than k returns the whole stream).
   
   ### Severity and current impact
   
   **Low, currently latent, not crashing.** The emitted nulls are silently 
dropped downstream by a defensive guard in the worker:
   
   ```scala
   // amber/.../worker/DataProcessor.scala  (outputOneTuple)
   if (outputTuple == null) return   // line 157
   ```
   
   This guard sits *before* `passTupleToDownstream`, storage writes, and 
output-count statistics, so today there is no NPE, no corrupt output, and the 
row count is correct. The bug still matters because:
   
   - It violates the operator contract (`onFinish` should emit valid tuples) 
and relies entirely on a defensive null-check elsewhere in the engine. Any 
consumer that iterates the operator output without that guard (a refactor, a 
different execution path, direct unit testing) would observe or mishandle the 
nulls.
   - It over-allocates and iterates dead reservoir slots.
   
   ### How to reproduce?
   
   Construct the executor with `k = 5`, feed 3 tuples, then drain `onFinish`:
   
   ```
   processTuple(t0); processTuple(t1); processTuple(t2)
   onFinish(0).toList  // => List(t0, t1, t2, null, null)   <- two trailing nulls
   ```
   
   (The non-null prefix is correct; the trailing nulls are the bug.)
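
The same behaviour can be reproduced without the Texera classes using a stripped-down model. Everything in this sketch is hypothetical: `ReservoirRepro`, `MiniReservoir`, and `drain` are illustrative names, `String` stands in for `TupleLike`, and the replacement step is textbook Algorithm R, which may differ from what `processTuple` actually does. Only the fixed-size array and the unguarded `reservoir.iterator` mirror the snippet quoted above.

```scala
import scala.util.Random

// Hypothetical stand-in for ReservoirSamplingOpExec; NOT the Texera class.
object ReservoirRepro {
  final class MiniReservoir(count: Int, seed: Long) {
    private val rand = new Random(seed)
    private var n = 0 // number of tuples seen so far
    // Reference-typed array: every slot starts out as null.
    private val reservoir: Array[String] = Array.ofDim[String](count)

    def processTuple(t: String): Unit = {
      if (n < count) {
        reservoir(n) = t // fill phase: slots 0 until count
      } else {
        // Assumed replacement rule (Algorithm R): keep t with probability count / (n + 1).
        val i = rand.nextInt(n + 1)
        if (i < count) { reservoir(i) = t }
      }
      n += 1
    }

    // Same shape as the reported onFinish: iterates every slot, filled or not.
    def onFinish(): Iterator[String] = reservoir.iterator
  }

  // Feed all inputs into a reservoir of size k and collect what onFinish emits.
  def drain(k: Int, inputs: Seq[String]): List[String] = {
    val r = new MiniReservoir(k, seed = 42L)
    inputs.foreach(r.processTuple)
    r.onFinish().toList
  }
}
```

In this model, `ReservoirRepro.drain(5, List("t0", "t1", "t2"))` returns the three inputs followed by two `null` entries, matching the report.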
   
   ### Suggested fix
   
   Emit only the filled slots, a one-line change in `onFinish`:
   
   ```scala
   override def onFinish(port: Int): Iterator[TupleLike] = reservoir.iterator.take(n)
   // or: reservoir.iterator.filter(_ != null)
   ```
   
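   `take(n)` is preferable: it needs no per-element null check, and it stops after the `n` populated slots instead of scanning all `count`. If `n` keeps counting past `count`, `take(n)` is still safe, because `take` never yields more elements than the iterator holds.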
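
A small check of the `take(n)` behaviour on plain arrays, independent of the operator. `OnFinishFix.emit` is a hypothetical helper written for this illustration, not part of Texera. It assumes `n` is the number of tuples seen so far and that the first `min(n, length)` slots are the populated ones.

```scala
// Hypothetical helper illustrating the proposed one-line fix on a bare array.
object OnFinishFix {
  // Emit only the populated prefix. Iterator.take caps at the iterator's own
  // length, so passing n larger than reservoir.length is harmless.
  def emit[T](reservoir: Array[T], n: Int): Iterator[T] =
    reservoir.iterator.take(n)
}
```

With a partially filled array such as `Array[String]("t0", "t1", "t2", null, null)` and `n = 3`, `emit` yields only the three populated entries. With a full two-slot array and `n = 10`, it yields both entries and nothing more.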
   
   ### Branch
   
   main
   
   ### Commit Hash (Optional)
   
   7deed35fd
   

