Copilot commented on code in PR #4958:
URL: https://github.com/apache/texera/pull/4958#discussion_r3192171099


##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.sendsemantics.partitioners
+
+import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}

Review Comment:
   `ChannelIdentity` is imported here but never used. The amber module enables 
`-Ywarn-unused:imports` in `amber/build.sbt`, so keeping this import adds an 
avoidable compiler warning to the test target. Please drop `ChannelIdentity` 
from this import.
   



##########
amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.sendsemantics.partitioners
+
+import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
+import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  StateFrame,
+  WorkflowFIFOMessage
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable.ArrayBuffer
+
+class NetworkOutputBufferSpec extends AnyFlatSpec {
+
+  // --- fixtures 
--------------------------------------------------------------
+
+  private val sender = ActorVirtualIdentity("sender")
+  private val receiver = ActorVirtualIdentity("receiver-1")
+
+  private val intAttr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(intAttr)
+  private def tuple(value: Int): Tuple =
+    Tuple.builder(schema).add(intAttr, value).build()
+
+  /** Recording wrapper around a real `NetworkOutputGateway`. */
+  private class Capture {
+    val messages: ArrayBuffer[WorkflowFIFOMessage] = ArrayBuffer.empty
+    val gateway: NetworkOutputGateway =
+      new NetworkOutputGateway(sender, m => messages += m)
+  }
+
+  private def newBuffer(batchSize: Int = 4): (NetworkOutputBuffer, Capture) = {
+    val cap = new Capture
+    val buf = new NetworkOutputBuffer(receiver, cap.gateway, batchSize = 
batchSize)
+    (buf, cap)
+  }
+
+  // --- construction defaults 
-------------------------------------------------
+
+  "NetworkOutputBuffer" should "default batchSize to 
ApplicationConfig.defaultDataTransferBatchSize" in {
+    val cap = new Capture
+    val buf = new NetworkOutputBuffer(receiver, cap.gateway)
+    assert(buf.batchSize == ApplicationConfig.defaultDataTransferBatchSize)
+  }
+
+  it should "expose `to` and `dataOutputPort` as immutable accessors" in {
+    val cap = new Capture
+    val buf = new NetworkOutputBuffer(receiver, cap.gateway, batchSize = 4)
+    assert(buf.to == receiver)
+    assert(buf.dataOutputPort eq cap.gateway)
+  }
+
+  it should "start with an empty buffer (no implicit auto-flush at 
construction)" in {
+    val (_, cap) = newBuffer()
+    assert(cap.messages.isEmpty)
+  }
+
+  // --- addTuple buffering / auto-flush --------------------------------------
+
+  "NetworkOutputBuffer.addTuple" should "NOT flush while the buffer is below 
batchSize" in {
+    val (buf, cap) = newBuffer(batchSize = 4)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.addTuple(tuple(2))
+    assert(cap.messages.isEmpty, "no DataFrame should be sent until batchSize 
is reached")
+  }
+
+  it should "auto-flush when the buffer exactly reaches batchSize" in {
+    val (buf, cap) = newBuffer(batchSize = 3)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.addTuple(tuple(2)) // boundary: now size == batchSize
+    assert(cap.messages.size == 1, "exactly one DataFrame should be 
auto-flushed at the boundary")
+    val frame = cap.messages.head.payload.asInstanceOf[DataFrame]
+    assert(frame.frame.toList == List(tuple(0), tuple(1), tuple(2)))
+  }
+
+  it should "produce a separate DataFrame for each successive batch" in {
+    val (buf, cap) = newBuffer(batchSize = 2)
+    (0 until 6).foreach(i => buf.addTuple(tuple(i)))
+    assert(cap.messages.size == 3, "three full batches → three DataFrames")
+    val payloads = 
cap.messages.map(_.payload.asInstanceOf[DataFrame].frame.toList)
+    assert(payloads.head == List(tuple(0), tuple(1)))
+    assert(payloads(1) == List(tuple(2), tuple(3)))
+    assert(payloads(2) == List(tuple(4), tuple(5)))
+  }
+
+  it should "send DataFrames to the configured receiver only" in {
+    val (buf, cap) = newBuffer(batchSize = 2)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    assert(cap.messages.size == 1)
+    val msg = cap.messages.head
+    assert(msg.channelId.fromWorkerId == sender)
+    assert(msg.channelId.toWorkerId == receiver)
+    assert(!msg.channelId.isControl, "data path must not use the control 
channel")
+  }
+
+  // --- flush() 
----------------------------------------------------------------
+
+  "NetworkOutputBuffer.flush" should "send a DataFrame and reset the buffer 
when the buffer is non-empty" in {
+    val (buf, cap) = newBuffer(batchSize = 100) // never auto-flushes
+    buf.addTuple(tuple(7))
+    buf.addTuple(tuple(8))
+    buf.flush()
+    assert(cap.messages.size == 1)
+    val frame = cap.messages.head.payload.asInstanceOf[DataFrame]
+    assert(frame.frame.toList == List(tuple(7), tuple(8)))
+    // A second flush() with nothing buffered must not send another frame.
+    buf.flush()
+    assert(cap.messages.size == 1, "flush() on an empty buffer must be a 
no-op")
+  }
+
+  it should "be a no-op when called on an empty buffer (no DataFrame, no 
StateFrame)" in {
+    val (buf, cap) = newBuffer()
+    buf.flush()
+    buf.flush()
+    buf.flush()
+    assert(cap.messages.isEmpty)
+  }
+
+  it should "assign monotonically increasing sequence numbers across multiple 
flushes" in {
+    // The gateway tracks sequence numbers per channel; each successive
+    // DataFrame on the same channel gets the next number. Pin so a
+    // regression that resets seq on flush is visible.
+    val (buf, cap) = newBuffer(batchSize = 1) // each addTuple flushes
+    (0 until 4).foreach(i => buf.addTuple(tuple(i)))
+    val seqs = cap.messages.map(_.sequenceNumber).toList
+    assert(seqs == List(0L, 1L, 2L, 3L), s"unexpected sequence: $seqs")
+  }
+
+  // --- sendState ----------------------------------------------------------
+
+  "NetworkOutputBuffer.sendState" should "flush pending tuples FIRST, then 
send the StateFrame" in {
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    val state = State(Map("checkpoint" -> 99))
+    buf.sendState(state)
+    // Expected order: DataFrame (the buffered tuples) → StateFrame.
+    assert(cap.messages.size == 2)
+    val first = cap.messages.head.payload
+    val second = cap.messages(1).payload
+    assert(first.isInstanceOf[DataFrame], s"first frame should be DataFrame, 
got $first")
+    assert(first.asInstanceOf[DataFrame].frame.toList == List(tuple(0), 
tuple(1)))
+    assert(second == StateFrame(state))
+  }
+
+  it should "send only the StateFrame when no tuples are pending (empty 
pre-flush is a no-op)" in {
+    val (buf, cap) = newBuffer()
+    val state = State(Map("k" -> "v"))
+    buf.sendState(state)
+    assert(cap.messages.size == 1)
+    assert(cap.messages.head.payload == StateFrame(state))
+  }
+
+  it should "leave the tuple buffer empty after sendState (trailing flush 
no-op)" in {
+    // sendState calls flush() AFTER sending the state too. Pin that the
+    // trailing flush doesn't double-send and that subsequent addTuple
+    // starts from a clean buffer.
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.sendState(State(Map.empty))
+    val countBefore = cap.messages.size // DataFrame + StateFrame = 2
+    assert(countBefore == 2)
+    // Add another tuple and explicit flush — must produce one fresh frame.
+    buf.addTuple(tuple(99))
+    buf.flush()
+    assert(cap.messages.size == 3)
+    val third = cap.messages(2).payload.asInstanceOf[DataFrame]
+    assert(third.frame.toList == List(tuple(99)), "post-state buffer must 
start empty")
+  }
+
+  it should "share a single sequence-number stream across DataFrames and the 
StateFrame on the same channel" in {
+    // Pin: DataFrame and StateFrame go through the same `sendTo` path on
+    // the same channel, so they share the gateway's sequence-number
+    // counter. A regression that opens a side-channel for StateFrame
+    // would produce a non-monotonic stream and fail this.
+    val (buf, cap) = newBuffer(batchSize = 100)
+    buf.addTuple(tuple(0))
+    buf.addTuple(tuple(1))
+    buf.sendState(State(Map("x" -> 1)))
+    buf.addTuple(tuple(2))
+    buf.flush()
+    val seqs = cap.messages.map(_.sequenceNumber).toList
+    assert(seqs == List(0L, 1L, 2L), s"unexpected sequence: $seqs")
+  }
+
+  // --- batchSize edge cases -------------------------------------------------
+
+  "NetworkOutputBuffer with batchSize = 1" should "flush immediately after 
every addTuple" in {
+    val (buf, cap) = newBuffer(batchSize = 1)
+    buf.addTuple(tuple(0))
+    assert(cap.messages.size == 1)
+    buf.addTuple(tuple(1))
+    assert(cap.messages.size == 2)
+    val frames = 
cap.messages.toList.map(_.payload.asInstanceOf[DataFrame].frame.toList)
+    assert(frames == List(List(tuple(0)), List(tuple(1))))
+  }
+
+  "NetworkOutputBuffer with batchSize = 0" should
+    "flush immediately after every addTuple (the `>=` guard fires for any 
non-empty buffer)" in {
+    // Pin the corner-case behavior: `if (buffer.size >= batchSize)` is
+    // true for size >= 0 = true after any append, so batchSize=0 collapses
+    // to "flush every tuple individually" — same observable as batchSize=1.
+    // A future tightening to `> 0` would break this on purpose.
+    val (buf, cap) = newBuffer(batchSize = 0)
+    buf.addTuple(tuple(42))
+    assert(cap.messages.size == 1)
+    val frame = cap.messages.head.payload.asInstanceOf[DataFrame]
+    assert(frame.frame.toList == List(tuple(42)))

Review Comment:
   This assertion pins `batchSize = 0` as supported behavior even though the 
workflow settings UI already treats data-transfer batch size as `>= 1` 
(`frontend/src/app/workspace/component/left-panel/settings/settings.component.ts`
 and `settings.component.html`). Enshrining the current `>=` implementation 
detail here means a sensible future hardening/validation change would show up 
as a test regression instead of letting us reject an invalid configuration. 
Consider dropping this case, or rewording it so it does not present 
`batchSize = 0` as a supported configuration.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to