This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5553-d76a51e347f54c6c3ff43a7f8cd11f14ae5739ea in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7e9cabf2bf4edc0540ab7397dd644bd96cc2a042
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 10:20:18 2026 -0700

test(amber): add unit test coverage for WorkerBatchInternalQueue (#5553)

### What changes were proposed in this PR?

Pin behavior of `WorkerBatchInternalQueue` — the per-DP-thread mailbox trait used by the Python worker. Previously uncovered; the only uncovered module in the `pythonworker` package whose contract is unit-testable without standing up a real Python subprocess. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `WorkerBatchInternalQueueSpec` | `WorkerBatchInternalQueue` (trait + companion) | 17 |

Spec file name follows the `<srcClassName>Spec.scala` one-to-one convention.

**Behavior pinned**

| Surface | Contract |
| --- | --- |
| `enqueueData` + `getElement` | round-trip a `DataElement` (with both `DataFrame` and `StateFrame` payloads) |
| `enqueueCommand` + `getElement` | round-trip a `ControlElement` |
| `enqueueActorCommand` + `getElement` | round-trip an `ActorCommandElement` |
| Multi-priority dispatch | control elements are returned **before** data elements when both are queued (sub-queue 0 < 1) |
| FIFO within the control queue | `ControlElement` enqueued first comes out before `ActorCommandElement` enqueued second |
| `getDataQueueLength` | reports only data-queue items (control is excluded) |
| `getControlQueueLength` / `isControlQueueEmpty` | report all control-queue items (`ControlElement` + `ActorCommandElement`) |
| `disableDataQueue` | hides queued data from `getElement` until `enableDataQueue` is called; control flow still moves |
| `getQueuedCredit(sender)` | `0` initially; tracks bytes-in minus bytes-out for `DataFrame` payloads per sender; stays `0` for control / `StateFrame` payloads; per-sender accounting is independent; accumulates across multiple enqueues for the same sender |
| Companion constants | `CONTROL_QUEUE == 0`, `DATA_QUEUE == 1`, and `CONTROL_QUEUE < DATA_QUEUE` (relied on by the multi-priority semantics) |

The trait is exercised through a small test-only subclass (`class TestQueue extends WorkerBatchInternalQueue`), with `DirectControlMessagePayload` represented by a local marker case object since the production trait carries no behavior.

### Any related issues, documentation, discussions?

Closes #5552.

### How was this PR tested?

Pure unit-test addition; verified locally with:

- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueueSpec"` — 17 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Sonnet 4.5)

---------

Signed-off-by: Yicong Huang <[email protected]>
Co-authored-by: Yicong Huang <[email protected]>
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
 .../WorkerBatchInternalQueueSpec.scala | 271 +++++++++++++++++++++
 1 file changed, 271 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala
new file mode 100644
index 0000000000..e2d2363ed7
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.pythonworker
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueue._
+import org.apache.texera.amber.engine.common.actormessage.ActorCommand
+import org.apache.texera.amber.engine.common.ambermessage.{
+  DataFrame,
+  DirectControlMessagePayload,
+  StateFrame
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Unit tests for the [[WorkerBatchInternalQueue]] trait and its companion constants.
+  *
+  * The trait is mixed into an otherwise-empty class so that its enqueue / dequeue,
+  * queue-length, data-queue enable / disable and per-sender credit methods can be
+  * called directly. Wherever a test dequeues an element it compares the result
+  * against the exact element that was enqueued, so a wrong payload or channel is
+  * caught as well as a wrong element type.
+  */
+class WorkerBatchInternalQueueSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Test harness — WorkerBatchInternalQueue is a trait. Mix it into an
+  // otherwise-empty class so we can drive its public surface directly.
+  // ---------------------------------------------------------------------------
+
+  private class TestQueue extends WorkerBatchInternalQueue
+
+  /** Builds a non-control channel identity between the two named actors. */
+  private def channelId(from: String, to: String): ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity(from), ActorVirtualIdentity(to), isControl = false)
+
+  // A one-attribute schema is enough to build real Tuple instances for
+  // DataFrame byte-accounting tests.
+  private val intAttr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(intAttr)
+
+  /** Builds a single-column tuple holding `value`. */
+  private def tuple(value: Int): Tuple =
+    Tuple.builder(schema).add(intAttr, Integer.valueOf(value)).build()
+
+  /** Builds a fresh StateFrame wrapping an empty state map (a non-DataFrame payload). */
+  private def emptyState: StateFrame =
+    StateFrame(org.apache.texera.amber.core.state.State(Map.empty))
+
+  // Marker DirectControlMessagePayload for ControlElement tests. Any concrete
+  // value works here because the tests only compare it for equality.
+  private case object DummyCtrl extends DirectControlMessagePayload
+
+  // ---------------------------------------------------------------------------
+  // enqueue + getElement — type-preserving round-trip
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue.enqueueData + getElement" should
+    "round-trip a DataElement carrying a DataFrame" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val frame = DataFrame(Array(tuple(1), tuple(2)))
+    q.enqueueData(DataElement(frame, cid))
+    val out = q.getElement
+    assert(out == DataElement(frame, cid))
+  }
+
+  it should "round-trip a DataElement carrying a non-DataFrame DataPayload (e.g. StateFrame)" in {
+    // A non-DataFrame DataPayload (StateFrame) must still pass through
+    // enqueue/dequeue cleanly; the credit side of this is checked separately
+    // in the getQueuedCredit tests below.
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val state = emptyState
+    q.enqueueData(DataElement(state, cid))
+    val out = q.getElement
+    assert(out == DataElement(state, cid))
+  }
+
+  "WorkerBatchInternalQueue.enqueueCommand + getElement" should
+    "round-trip a ControlElement" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    q.enqueueCommand(DummyCtrl, cid)
+    val out = q.getElement
+    assert(out == ControlElement(DummyCtrl, cid))
+  }
+
+  "WorkerBatchInternalQueue.enqueueActorCommand + getElement" should
+    "round-trip an ActorCommandElement (control-priority lane)" in {
+    val q = new TestQueue
+    q.enqueueActorCommand(ActorCommand.Empty)
+    val out = q.getElement
+    assert(out == ActorCommandElement(ActorCommand.Empty))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Multi-priority dispatch — control queue (priority 0) wins over data (1)
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue.getElement" should
+    "drain the control queue before the data queue (lower numeric priority wins)" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val data = DataElement(DataFrame(Array(tuple(1))), cid)
+    // Enqueue data FIRST, then control — getElement must still return
+    // control first because its sub-queue has priority 0 < 1.
+    q.enqueueData(data)
+    q.enqueueCommand(DummyCtrl, cid)
+    val first = q.getElement
+    val second = q.getElement
+    // Compare against the exact elements, not just their types, so a wrong
+    // payload or channel cannot slip through.
+    assert(first == ControlElement(DummyCtrl, cid), s"expected control first, got $first")
+    assert(second == data, s"expected data second, got $second")
+  }
+
+  it should "interleave actor-command and direct-control items as they were enqueued" in {
+    // Both ActorCommandElement and ControlElement go into the control
+    // queue (priority 0). Within a single queue, FIFO order applies.
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    q.enqueueCommand(DummyCtrl, cid)
+    q.enqueueActorCommand(ActorCommand.Empty)
+    val first = q.getElement
+    val second = q.getElement
+    assert(first == ControlElement(DummyCtrl, cid))
+    assert(second == ActorCommandElement(ActorCommand.Empty))
+  }
+
+  // ---------------------------------------------------------------------------
+  // Queue-length accounting
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue.getDataQueueLength" should
+    "track only data-queue items (excludes control)" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    assert(q.getDataQueueLength == 0)
+    q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+    q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+    // Control items must NOT inflate the data-queue count.
+    q.enqueueCommand(DummyCtrl, cid)
+    assert(q.getDataQueueLength == 2)
+  }
+
+  "WorkerBatchInternalQueue.getControlQueueLength" should
+    "track every control-queue item (ControlElement + ActorCommandElement)" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    assert(q.getControlQueueLength == 0)
+    q.enqueueCommand(DummyCtrl, cid)
+    q.enqueueActorCommand(ActorCommand.Empty)
+    // Data items must NOT inflate the control-queue count.
+    q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+    assert(q.getControlQueueLength == 2)
+  }
+
+  "WorkerBatchInternalQueue.isControlQueueEmpty" should
+    "be true initially and after every control item is consumed" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    assert(q.isControlQueueEmpty)
+    q.enqueueCommand(DummyCtrl, cid)
+    assert(!q.isControlQueueEmpty)
+    // Drain the single control item and confirm it is the one we enqueued.
+    assert(q.getElement == ControlElement(DummyCtrl, cid))
+    assert(q.isControlQueueEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // disable / enable data queue — control flow keeps moving
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue.disableDataQueue" should
+    "keep enqueued data invisible to getElement until enableDataQueue is called" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val data = DataElement(DataFrame(Array(tuple(1))), cid)
+    q.enqueueData(data)
+    q.disableDataQueue()
+    // At this point the only queued item is data, and the data queue is
+    // disabled. Enqueue an inert control item: getElement must yield that
+    // control item even though the data was enqueued FIRST.
+    q.enqueueCommand(DummyCtrl, cid)
+    val out = q.getElement
+    assert(
+      out == ControlElement(DummyCtrl, cid),
+      s"expected control while data is disabled, got $out"
+    )
+    // Re-enable; the originally-enqueued data is now reachable.
+    q.enableDataQueue()
+    val nextOut = q.getElement
+    assert(nextOut == data)
+  }
+
+  // ---------------------------------------------------------------------------
+  // getQueuedCredit — byte-accounting for DataFrame payloads
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue.getQueuedCredit" should
+    "report 0 for any sender when nothing has been enqueued" in {
+    val q = new TestQueue
+    assert(q.getQueuedCredit(channelId("a", "b")) == 0L)
+    assert(q.getQueuedCredit(channelId("c", "d")) == 0L)
+  }
+
+  it should "track bytes-in minus bytes-out for DataFrame payloads per sender" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val frame = DataFrame(Array(tuple(1), tuple(2), tuple(3)))
+    q.enqueueData(DataElement(frame, cid))
+    // After enqueue, nothing has been consumed yet → credit equals the
+    // frame's in-memory size.
+    val creditAfterEnqueue = q.getQueuedCredit(cid)
+    assert(creditAfterEnqueue == frame.inMemSize)
+    // After getElement, bytes-out catches up → credit drops to 0.
+    q.getElement
+    assert(q.getQueuedCredit(cid) == 0L)
+  }
+
+  it should "NOT increment credit for ControlElement payloads (control bytes are untracked)" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    q.enqueueCommand(DummyCtrl, cid)
+    assert(q.getQueuedCredit(cid) == 0L, "control payloads should not affect the credit counter")
+  }
+
+  it should "NOT increment credit for non-DataFrame DataPayloads (e.g. StateFrame)" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    q.enqueueData(DataElement(emptyState, cid))
+    assert(q.getQueuedCredit(cid) == 0L, "StateFrame payloads should not affect the credit counter")
+  }
+
+  it should "track each sender's credit independently (no cross-sender accumulation)" in {
+    val q = new TestQueue
+    val s1 = channelId("up-1", "self")
+    val s2 = channelId("up-2", "self")
+    val f1 = DataFrame(Array(tuple(10)))
+    val f2 = DataFrame(Array(tuple(20), tuple(30)))
+    q.enqueueData(DataElement(f1, s1))
+    q.enqueueData(DataElement(f2, s2))
+    assert(q.getQueuedCredit(s1) == f1.inMemSize)
+    assert(q.getQueuedCredit(s2) == f2.inMemSize)
+    // Drain only s1 by reading the first item (FIFO within data queue), and
+    // verify that it really is s1's frame rather than assuming so.
+    assert(q.getElement == DataElement(f1, s1))
+    assert(q.getQueuedCredit(s1) == 0L)
+    assert(
+      q.getQueuedCredit(s2) == f2.inMemSize,
+      "s2's credit should still reflect un-consumed data"
+    )
+  }
+
+  it should "accumulate credit across multiple enqueues for the same sender" in {
+    val q = new TestQueue
+    val cid = channelId("up", "self")
+    val f1 = DataFrame(Array(tuple(1)))
+    val f2 = DataFrame(Array(tuple(2)))
+    q.enqueueData(DataElement(f1, cid))
+    q.enqueueData(DataElement(f2, cid))
+    assert(q.getQueuedCredit(cid) == f1.inMemSize + f2.inMemSize)
+  }
+
+  // ---------------------------------------------------------------------------
+  // Companion constants
+  // ---------------------------------------------------------------------------
+
+  "WorkerBatchInternalQueue companion" should
+    "define stable queue-priority constants (CONTROL_QUEUE < DATA_QUEUE)" in {
+    // Pin the numeric ordering — control must have a lower (higher-priority)
+    // queue id than data. The multi-priority semantics in `getElement` rely
+    // on this. A reversal would silently change dispatch behavior.
+    assert(WorkerBatchInternalQueue.CONTROL_QUEUE == 0)
+    assert(WorkerBatchInternalQueue.DATA_QUEUE == 1)
+    assert(WorkerBatchInternalQueue.CONTROL_QUEUE < WorkerBatchInternalQueue.DATA_QUEUE)
+  }
+}
