aglinxinyuan opened a new pull request, #4958:
URL: https://github.com/apache/texera/pull/4958

   ### What changes were proposed in this PR?
   
   Adds `NetworkOutputBufferSpec`, covering `NetworkOutputBuffer` (defined in `amber/src/main/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/Partitioner.scala`). `NetworkOutputBuffer` is the per-receiver batched-tuple sender that every concrete `Partitioner` subclass uses to push data downstream. The class has stateful buffer-and-flush semantics that no existing test pins.
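For readers new to the class, the buffer-and-flush contract described above can be modeled roughly as follows. This is a simplified, hypothetical sketch, not the Texera source. It mirrors the accessor names the PR lists (`to`, `dataOutputPort`, `batchSize`), but `Gateway`, `Frame`, `DataFrame`, and `StateFrame` are local stand-ins, tuples are plain `String`s, and `batchSize` has no default here (the real class defaults to `ApplicationConfig.defaultDataTransferBatchSize`).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Amber's payload types; NOT the real classes.
sealed trait Frame
final case class DataFrame(tuples: Vector[String]) extends Frame
final case class StateFrame(state: String) extends Frame

// Hypothetical stand-in for the gateway; it only needs a send method here.
trait Gateway {
  def sendTo(receiver: String, frame: Frame): Unit
}

// Simplified model of a per-receiver batching buffer.
class SimpleOutputBuffer(val to: String, val dataOutputPort: Gateway, val batchSize: Int) {
  private var buffer = ArrayBuffer.empty[String]

  def pending: Int = buffer.size

  def addTuple(tuple: String): Unit = {
    buffer += tuple
    // `>=` guard: auto-flush as soon as the buffer reaches batchSize.
    if (buffer.size >= batchSize) flush()
  }

  def flush(): Unit = {
    // An empty buffer is a no-op, so no empty DataFrame is ever sent.
    if (buffer.nonEmpty) {
      dataOutputPort.sendTo(to, DataFrame(buffer.toVector))
      buffer = ArrayBuffer.empty[String] // reset for the next batch
    }
  }

  def sendState(state: String): Unit = {
    flush()                                      // drain pending tuples first, as their own DataFrame
    dataOutputPort.sendTo(to, StateFrame(state)) // then the state marker
    flush()                                      // trailing flush; a no-op in this single-threaded model
  }
}

// Recording gateway: keeps every (receiver, frame) pair for inspection.
class RecordingGateway extends Gateway {
  val sent = ArrayBuffer.empty[(String, Frame)]
  def sendTo(receiver: String, frame: Frame): Unit = sent += (receiver -> frame)
}

val gw = new RecordingGateway
val buf = new SimpleOutputBuffer("worker-2", gw, batchSize = 3)
Seq("a", "b", "c", "d").foreach(buf.addTuple) // "a", "b", "c" flush at the boundary; "d" stays pending
buf.sendState("EndOfInput")                  // flushes "d" as its own DataFrame, then sends the StateFrame
```

After these calls, `gw.sent` holds `DataFrame(a, b, c)`, `DataFrame(d)`, and `StateFrame(EndOfInput)`, in that order, all addressed to `worker-2`.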
   
   | Surface | Behavior pinned |
   | --- | --- |
   | Construction | `batchSize` defaults to `ApplicationConfig.defaultDataTransferBatchSize`; `to` and `dataOutputPort` are exposed as immutable accessors; construction does not trigger an implicit flush. |
   | `addTuple` | No flush below `batchSize`; auto-flush exactly at the `batchSize` boundary; one DataFrame per batch, with tuples in insertion order; no leakage across batches; routed on the data channel to the configured receiver. |
   | `flush()` | Non-empty buffer: sends one DataFrame and resets the buffer. Empty buffer: no-op (pinned so that a regression sending empty DataFrames fails here). Sequence numbers increase monotonically across flushes. |
   | `flush()` and `sendState` share one sequence stream | DataFrame and StateFrame go through the same channel and share the gateway's per-channel sequence counter. A regression that side-channels StateFrame would produce a non-monotonic stream and fail this test. |
   | `sendState` | A pre-flush bookend first drains pending tuples as their own DataFrame, then the StateFrame is sent; the pre-flush is a no-op when nothing is pending; a trailing post-state flush leaves the buffer clean for subsequent `addTuple` calls. |
   | `batchSize` edge cases | `batchSize = 1` flushes after every `addTuple`. `batchSize = 0` collapses to the same behavior, because the `>=` guard fires for any non-empty buffer. The row is characterized so that a future tightening of the guard to `>` breaks it on purpose. |
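The `batchSize` edge cases can be checked by simulating the auto-flush guard on its own. The helper below is hypothetical (it is not in the PR or in Amber) and assumes the guard is checked right after each append, as the "fires for any non-empty buffer" description suggests. It returns the size of each batch that `addTuple` alone would emit, under the current `>=` guard and under a tightened `>` guard.

```scala
// Hypothetical helper (not in the PR or in Amber): replays only the auto-flush
// guard of addTuple over n appends and returns the size of each emitted batch.
// Tuples still pending at the end are not flushed (no explicit flush() call).
def autoFlushBatches(n: Int, batchSize: Int, strict: Boolean): List[Int] = {
  var pending = 0
  val batches = List.newBuilder[Int]
  for (_ <- 1 to n) {
    pending += 1 // append one tuple, then check the guard
    val fire = if (strict) pending > batchSize else pending >= batchSize
    if (fire) {
      batches += pending
      pending = 0
    }
  }
  batches.result()
}

// Current `>=` guard
println(autoFlushBatches(3, batchSize = 3, strict = false)) // prints List(3): exact-boundary flush
println(autoFlushBatches(3, batchSize = 1, strict = false)) // prints List(1, 1, 1)
println(autoFlushBatches(3, batchSize = 0, strict = false)) // prints List(1, 1, 1): same as batchSize = 1

// Hypothetical tightening to `>`
println(autoFlushBatches(3, batchSize = 3, strict = true))  // prints List(): the boundary no longer flushes
println(autoFlushBatches(3, batchSize = 1, strict = true))  // prints List(2): waits for a second tuple
println(autoFlushBatches(3, batchSize = 0, strict = true))  // prints List(1, 1, 1): unchanged
```

Under this model, tightening to `>` changes the exact-boundary and `batchSize = 1` results while `batchSize = 0` still flushes per tuple, so those two assertions are the ones that would trip on such a change.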
   
   The spec wires a **real** `NetworkOutputGateway` with a recording handler instead of mocking the gateway, so the assertions exercise the production code path end to end: sequence-number assignment, channel-id construction, and payload-type routing.
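The point of a real gateway plus a recording handler is that sequence numbers are assigned per channel by the gateway itself, so DataFrames and StateFrames sent to the same receiver draw from one counter. The sketch below models that idea with a hypothetical `SeqGateway`; its names, its `(from, to)` channel key, the zero-based counter, and the handler signature are assumptions for illustration, not Amber's `NetworkOutputGateway` API.

```scala
import scala.collection.mutable

// Hypothetical payload types: stand-ins, not Amber's DataFrame/StateFrame.
sealed trait Payload
final case class Data(tuples: List[Int]) extends Payload
final case class State(name: String) extends Payload

// Hypothetical channel id: one logical channel per (sender, receiver) pair.
final case class ChannelId(from: String, to: String)

// Hypothetical gateway: every payload, whatever its type, takes the next
// sequence number of its channel before being passed to the handler.
class SeqGateway(sender: String, handler: (ChannelId, Long, Payload) => Unit) {
  private val nextSeq = mutable.Map.empty[ChannelId, Long].withDefaultValue(0L)

  def sendTo(receiver: String, payload: Payload): Unit = {
    val channel = ChannelId(sender, receiver)
    val seq = nextSeq(channel)
    nextSeq(channel) = seq + 1
    handler(channel, seq, payload)
  }
}

// Recording handler: keeps everything the gateway emits so tests can assert on it.
val recorded = mutable.ArrayBuffer.empty[(ChannelId, Long, Payload)]
val gateway = new SeqGateway("worker-1", (c, s, p) => recorded += ((c, s, p)))

// A send order like the one flush() and sendState() would produce:
gateway.sendTo("worker-2", Data(List(1, 2)))    // pre-flush of pending tuples
gateway.sendTo("worker-2", State("EndOfInput")) // the state marker
gateway.sendTo("worker-2", Data(List(3)))       // a later flush

// Sequence numbers seen on the worker-1 -> worker-2 channel, in send order.
val seqs = recorded.collect { case (ChannelId(_, "worker-2"), s, _) => s }
// Shared counter: strictly increasing across Data and State payloads.
val monotonic = seqs.zip(seqs.drop(1)).forall { case (a, b) => b > a }
```

If a regression routed `State` payloads through a separate counter, `seqs` would contain a repeated value and `monotonic` would become `false`, which is the kind of failure the shared-sequence row in the table is meant to catch.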
   
   No production code changed; this is test-only.
   
   ### Any related issues, documentation, discussions?
   
   Closes #4957
   
   ### How was this PR tested?
   
   ```
   sbt "WorkflowExecutionService/Test/testOnly org.apache.texera.amber.engine.architecture.sendsemantics.partitioners.NetworkOutputBufferSpec"
   # → 16 tests, all pass
   
   sbt "WorkflowExecutionService/Test/scalafmtCheck"
   # → clean
   ```
   
   ### Was this PR authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Claude Opus 4.7)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
