aglinxinyuan commented on code in PR #4958: URL: https://github.com/apache/texera/pull/4958#discussion_r3192316998
########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/NetworkOutputBufferSpec.scala: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.sendsemantics.partitioners + +import org.apache.texera.amber.config.ApplicationConfig +import org.apache.texera.amber.core.state.State +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.common.ambermessage.{ + DataFrame, + StateFrame, + WorkflowFIFOMessage +} +import org.scalatest.flatspec.AnyFlatSpec + +import scala.collection.mutable.ArrayBuffer + +class NetworkOutputBufferSpec extends AnyFlatSpec { + + // --- fixtures -------------------------------------------------------------- + + private val sender = ActorVirtualIdentity("sender") + private val receiver = ActorVirtualIdentity("receiver-1") + + private val intAttr = new Attribute("v", AttributeType.INTEGER) + private val schema: Schema = Schema().add(intAttr) + private def tuple(value: Int): Tuple = + Tuple.builder(schema).add(intAttr, value).build() + + /** Recording wrapper around a real `NetworkOutputGateway`. */ + private class Capture { + val messages: ArrayBuffer[WorkflowFIFOMessage] = ArrayBuffer.empty + val gateway: NetworkOutputGateway = + new NetworkOutputGateway(sender, m => messages += m) + } + + private def newBuffer(batchSize: Int = 4): (NetworkOutputBuffer, Capture) = { + val cap = new Capture + val buf = new NetworkOutputBuffer(receiver, cap.gateway, batchSize = batchSize) + (buf, cap) + } + + // --- construction defaults ------------------------------------------------- + + "NetworkOutputBuffer" should "default batchSize to ApplicationConfig.defaultDataTransferBatchSize" in { + val cap = new Capture + val buf = new NetworkOutputBuffer(receiver, cap.gateway) + assert(buf.batchSize == ApplicationConfig.defaultDataTransferBatchSize) + } + + it should "expose `to` and `dataOutputPort` as immutable accessors" in { + val cap = new Capture + val buf = new NetworkOutputBuffer(receiver, cap.gateway, batchSize = 4) + assert(buf.to == receiver) + assert(buf.dataOutputPort eq cap.gateway) + } + + it should "start with an empty buffer (no implicit auto-flush at construction)" in { + val (_, cap) = newBuffer() + assert(cap.messages.isEmpty) + } + + // --- addTuple buffering / auto-flush -------------------------------------- + + "NetworkOutputBuffer.addTuple" should "NOT flush while the buffer is below batchSize" in { + val (buf, cap) = newBuffer(batchSize = 4) + buf.addTuple(tuple(0)) + buf.addTuple(tuple(1)) + buf.addTuple(tuple(2)) + assert(cap.messages.isEmpty, "no DataFrame should be sent until batchSize is reached") + } + + it should "auto-flush when the buffer exactly reaches batchSize" in { + val (buf, cap) = newBuffer(batchSize = 3) + buf.addTuple(tuple(0)) + buf.addTuple(tuple(1)) + buf.addTuple(tuple(2)) // boundary: now size == batchSize + assert(cap.messages.size == 1, "exactly one DataFrame should be auto-flushed at the boundary") + val frame = cap.messages.head.payload.asInstanceOf[DataFrame] + assert(frame.frame.toList == List(tuple(0), tuple(1), tuple(2))) + } + + it should "produce a separate DataFrame for each successive batch" in { + val (buf, cap) = newBuffer(batchSize = 2) + (0 until 6).foreach(i => buf.addTuple(tuple(i))) + assert(cap.messages.size == 3, "three full batches → three DataFrames") + val payloads = cap.messages.map(_.payload.asInstanceOf[DataFrame].frame.toList) + assert(payloads.head == List(tuple(0), tuple(1))) + assert(payloads(1) == List(tuple(2), tuple(3))) + assert(payloads(2) == List(tuple(4), tuple(5))) + } + + it should "send DataFrames to the configured receiver only" in { + val (buf, cap) = newBuffer(batchSize = 2) + buf.addTuple(tuple(0)) + buf.addTuple(tuple(1)) + assert(cap.messages.size == 1) + val msg = cap.messages.head + assert(msg.channelId.fromWorkerId == sender) + assert(msg.channelId.toWorkerId == receiver) + assert(!msg.channelId.isControl, "data path must not use the control channel") + } + + // --- flush() ---------------------------------------------------------------- + + "NetworkOutputBuffer.flush" should "send a DataFrame and reset the buffer when the buffer is non-empty" in { + val (buf, cap) = newBuffer(batchSize = 100) // never auto-flushes + buf.addTuple(tuple(7)) + buf.addTuple(tuple(8)) + buf.flush() + assert(cap.messages.size == 1) + val frame = cap.messages.head.payload.asInstanceOf[DataFrame] + assert(frame.frame.toList == List(tuple(7), tuple(8))) + // A second flush() with nothing buffered must not send another frame. + buf.flush() + assert(cap.messages.size == 1, "flush() on an empty buffer must be a no-op") + } + + it should "be a no-op when called on an empty buffer (no DataFrame, no StateFrame)" in { + val (buf, cap) = newBuffer() + buf.flush() + buf.flush() + buf.flush() + assert(cap.messages.isEmpty) + } + + it should "assign monotonically increasing sequence numbers across multiple flushes" in { + // The gateway tracks sequence numbers per channel; each successive + // DataFrame on the same channel gets the next number. Pin so a + // regression that resets seq on flush is visible. + val (buf, cap) = newBuffer(batchSize = 1) // each addTuple flushes + (0 until 4).foreach(i => buf.addTuple(tuple(i))) + val seqs = cap.messages.map(_.sequenceNumber).toList + assert(seqs == List(0L, 1L, 2L, 3L), s"unexpected sequence: $seqs") + } + + // --- sendState ---------------------------------------------------------- + + "NetworkOutputBuffer.sendState" should "flush pending tuples FIRST, then send the StateFrame" in { + val (buf, cap) = newBuffer(batchSize = 100) + buf.addTuple(tuple(0)) + buf.addTuple(tuple(1)) + val state = State(Map("checkpoint" -> 99)) + buf.sendState(state) + // Expected order: DataFrame (the buffered tuples) → StateFrame. + assert(cap.messages.size == 2) + val first = cap.messages.head.payload + val second = cap.messages(1).payload + assert(first.isInstanceOf[DataFrame], s"first frame should be DataFrame, got $first") + assert(first.asInstanceOf[DataFrame].frame.toList == List(tuple(0), tuple(1))) + assert(second == StateFrame(state)) + } + + it should "send only the StateFrame when no tuples are pending (empty pre-flush is a no-op)" in { + val (buf, cap) = newBuffer() + val state = State(Map("k" -> "v")) + buf.sendState(state) + assert(cap.messages.size == 1) + assert(cap.messages.head.payload == StateFrame(state)) + } + + it should "leave the tuple buffer empty after sendState (trailing flush no-op)" in { + // sendState calls flush() AFTER sending the state too. Pin that the + // trailing flush doesn't double-send and that subsequent addTuple + // starts from a clean buffer. + val (buf, cap) = newBuffer(batchSize = 100) + buf.addTuple(tuple(0)) + buf.sendState(State(Map.empty)) + val countBefore = cap.messages.size // DataFrame + StateFrame = 2 + assert(countBefore == 2) + // Add another tuple and explicit flush — must produce one fresh frame. + buf.addTuple(tuple(99)) + buf.flush() + assert(cap.messages.size == 3) + val third = cap.messages(2).payload.asInstanceOf[DataFrame] + assert(third.frame.toList == List(tuple(99)), "post-state buffer must start empty") + } + + it should "share a single sequence-number stream across DataFrames and the StateFrame on the same channel" in { + // Pin: DataFrame and StateFrame go through the same `sendTo` path on + // the same channel, so they share the gateway's sequence-number + // counter. A regression that opens a side-channel for StateFrame + // would produce a non-monotonic stream and fail this. + val (buf, cap) = newBuffer(batchSize = 100) + buf.addTuple(tuple(0)) + buf.addTuple(tuple(1)) + buf.sendState(State(Map("x" -> 1))) + buf.addTuple(tuple(2)) + buf.flush() + val seqs = cap.messages.map(_.sequenceNumber).toList + assert(seqs == List(0L, 1L, 2L), s"unexpected sequence: $seqs") + } + + // --- batchSize edge cases ------------------------------------------------- + + "NetworkOutputBuffer with batchSize = 1" should "flush immediately after every addTuple" in { + val (buf, cap) = newBuffer(batchSize = 1) + buf.addTuple(tuple(0)) + assert(cap.messages.size == 1) + buf.addTuple(tuple(1)) + assert(cap.messages.size == 2) + val frames = cap.messages.toList.map(_.payload.asInstanceOf[DataFrame].frame.toList) + assert(frames == List(List(tuple(0)), List(tuple(1)))) + } + + // NOTE: `batchSize <= 0` is not exercised here. The workflow-settings UI + // (frontend/.../settings/settings.component.ts) restricts data-transfer + // batch size to `>= 1` before it ever reaches `NetworkOutputBuffer`, so + // a value of 0 is not reachable from production. Pinning a + // characterization for `batchSize = 0` would enshrine an implementation + // detail (the `>=` guard) and block a future hardening change that + // rejects the invalid value at construction time. Review Comment: Verified your point at SyncExecutionResource.scala:159 — the server accepts `request.workflowSettings` directly with no `>= 1` validation, so `batchSize=0` is reachable from a direct API caller even though the UI restricts it. My previous NOTE was misleading. Done in 8ba25f084e — replaced the NOTE with an accurate description of the reachable-from-API reality, plus a `pendingUntilFixed` test asserting the desired hardening (construction with `batchSize <= 0` should throw `IllegalArgumentException`, e.g. via a `require(batchSize > 0, ...)` guard). Once the hardening lands the test will pass, and `pendingUntilFixed` will invert that into a deliberate failure forcing the marker to be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
