This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 7e9cabf2bf test(amber): add unit test coverage for WorkerBatchInternalQueue (#5553)
7e9cabf2bf is described below
commit 7e9cabf2bf4edc0540ab7397dd644bd96cc2a042
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 12 10:20:18 2026 -0700
test(amber): add unit test coverage for WorkerBatchInternalQueue (#5553)
### What changes were proposed in this PR?
Pin the behavior of `WorkerBatchInternalQueue`, the per-DP-thread mailbox
trait used by the Python worker. It previously had no test coverage and was
the only uncovered module in the `pythonworker` package whose contract can
be unit-tested without standing up a real Python subprocess. No
production-code changes.
| Spec | Source class | Tests |
| --- | --- | --- |
| `WorkerBatchInternalQueueSpec` | `WorkerBatchInternalQueue` (trait + companion) | 17 |
Spec file name follows the `<srcClassName>Spec.scala` one-to-one
convention.
**Behavior pinned**
| Surface | Contract |
| --- | --- |
| `enqueueData` + `getElement` | round-trip a `DataElement` (with both `DataFrame` and `StateFrame` payloads) |
| `enqueueCommand` + `getElement` | round-trip a `ControlElement` |
| `enqueueActorCommand` + `getElement` | round-trip an `ActorCommandElement` |
| Multi-priority dispatch | control elements are returned **before** data elements when both are queued (sub-queue 0 < 1) |
| FIFO within the control queue | `ControlElement` enqueued first comes out before `ActorCommandElement` enqueued second |
| `getDataQueueLength` | reports only data-queue items (control is excluded) |
| `getControlQueueLength` / `isControlQueueEmpty` | report all control-queue items (`ControlElement` + `ActorCommandElement`) |
| `disableDataQueue` | hides queued data from `getElement` until `enableDataQueue` is called; control flow still moves |
| `getQueuedCredit(sender)` | `0` initially; tracks bytes-in minus bytes-out for `DataFrame` payloads per sender; stays `0` for control / `StateFrame` payloads; per-sender accounting is independent; accumulates across multiple enqueues for the same sender |
| Companion constants | `CONTROL_QUEUE == 0`, `DATA_QUEUE == 1`, and `CONTROL_QUEUE < DATA_QUEUE` (relied on by the multi-priority semantics) |
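
The contracts in the table can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyQueue`, `Data`, `Control`, and `poll` are hypothetical names, not the Texera API, and `poll` returns an `Option` instead of mirroring how the real `getElement` behaves on an empty queue. It shows three of the pinned behaviors: control is served before data, a disabled data lane hides queued data, and per-sender credit is bytes-in minus bytes-out.

```scala
import scala.collection.mutable

// Hypothetical toy model; not the production WorkerBatchInternalQueue.
sealed trait Element
final case class Data(sender: String, bytes: Long) extends Element
final case class Control(name: String) extends Element

final class ToyQueue {
  private val control = mutable.Queue.empty[Control]
  private val data = mutable.Queue.empty[Data]
  // Per-sender byte counters; a missing sender reads as 0.
  private val bytesIn = mutable.Map.empty[String, Long].withDefaultValue(0L)
  private val bytesOut = mutable.Map.empty[String, Long].withDefaultValue(0L)
  private var dataEnabled = true

  def enqueueControl(c: Control): Unit = control.enqueue(c)

  def enqueueData(d: Data): Unit = {
    bytesIn(d.sender) += d.bytes
    data.enqueue(d)
  }

  def disableData(): Unit = { dataEnabled = false }
  def enableData(): Unit = { dataEnabled = true }

  // Control lane always wins; data is served only while enabled.
  def poll(): Option[Element] =
    if (control.nonEmpty) Some(control.dequeue())
    else if (dataEnabled && data.nonEmpty) {
      val d = data.dequeue()
      bytesOut(d.sender) += d.bytes
      Some(d)
    } else None

  // Bytes still sitting in the queue for this sender.
  def queuedCredit(sender: String): Long = bytesIn(sender) - bytesOut(sender)
}
```

In this model, enqueueing data and then control still yields the control item first, and the sender's credit returns to `0` only once its data item has been polled.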
The trait is exercised through a small test-only subclass (`class
TestQueue extends WorkerBatchInternalQueue`), with
`DirectControlMessagePayload` represented by a local marker case object
since the production `DirectControlMessagePayload` trait is an empty
marker that carries no behavior.
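
The harness pattern described above can be sketched in isolation. The names here (`Payload`, `Mailbox`, `TestMailbox`, `DummyPayload`) are hypothetical stand-ins, not Texera types; the point is only that a trait with concrete members can be driven through an otherwise-empty subclass, and that an empty marker trait can be satisfied by a local case object.

```scala
import scala.collection.mutable

// Hypothetical empty marker trait, standing in for a payload type.
trait Payload

// Hypothetical trait with concrete behavior, standing in for the queue trait.
trait Mailbox {
  private val items = mutable.Queue.empty[Payload]
  def put(p: Payload): Unit = items.enqueue(p)
  def take(): Payload = items.dequeue()
  def size: Int = items.size
}

// Test-only subclass: adds nothing, just makes the trait instantiable.
class TestMailbox extends Mailbox

// Local marker value; a case object gives structural equality for free.
case object DummyPayload extends Payload
```

Because `DummyPayload` is a case object, a round-trip assertion can compare the dequeued value with `==` without any custom equality.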
### Any related issues, documentation, discussions?
Closes #5552.
### How was this PR tested?
Pure unit-test addition; verified locally with:
- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueueSpec"` (17 tests, all green)
- `sbt scalafmtCheckAll` (clean)
- CI to confirm
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Sonnet 4.5)
---------
Signed-off-by: Yicong Huang <[email protected]>
Co-authored-by: Yicong Huang <[email protected]>
Co-authored-by: Copilot Autofix powered by AI <[email protected]>
---
.../WorkerBatchInternalQueueSpec.scala | 271 +++++++++++++++++++++
1 file changed, 271 insertions(+)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala
new file mode 100644
index 0000000000..e2d2363ed7
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/WorkerBatchInternalQueueSpec.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.pythonworker
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
+import org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueue._
+import org.apache.texera.amber.engine.common.actormessage.ActorCommand
+import org.apache.texera.amber.engine.common.ambermessage.{
+ DataFrame,
+ DirectControlMessagePayload,
+ StateFrame
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkerBatchInternalQueueSpec extends AnyFlatSpec {
+
+ // ---------------------------------------------------------------------------
+ // Test harness — WorkerBatchInternalQueue is a trait. Mix it into an
+ // otherwise-empty class so we can drive its public surface directly.
+ // ---------------------------------------------------------------------------
+
+ private class TestQueue extends WorkerBatchInternalQueue
+
+ private def channelId(from: String, to: String): ChannelIdentity =
+ ChannelIdentity(ActorVirtualIdentity(from), ActorVirtualIdentity(to), isControl = false)
+
+ // A one-attribute schema is enough to build real Tuple instances for
+ // DataFrame byte-accounting tests.
+ private val intAttr = new Attribute("v", AttributeType.INTEGER)
+ private val schema: Schema = Schema().add(intAttr)
+ private def tuple(value: Int): Tuple =
+ Tuple.builder(schema).add(intAttr, Integer.valueOf(value)).build()
+
+ // Marker DirectControlMessagePayload for ControlElement tests. The
+ // trait is empty (just a marker), so any concrete value works.
+ private case object DummyCtrl extends DirectControlMessagePayload
+
+ // ---------------------------------------------------------------------------
+ // enqueue + getElement — type-preserving round-trip
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue.enqueueData + getElement" should
+ "round-trip a DataElement carrying a DataFrame" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ val frame = DataFrame(Array(tuple(1), tuple(2)))
+ q.enqueueData(DataElement(frame, cid))
+ val out = q.getElement
+ assert(out == DataElement(frame, cid))
+ }
+
+ it should "round-trip a DataElement carrying a non-DataFrame DataPayload (e.g. StateFrame)" in {
+ // The byte-accounting path is `case frame: DataFrame =>`; a non-
+ // DataFrame DataPayload (StateFrame) must still pass through
+ // enqueue/dequeue cleanly even though credit is not updated.
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ val state = StateFrame(org.apache.texera.amber.core.state.State(Map.empty))
+ q.enqueueData(DataElement(state, cid))
+ val out = q.getElement
+ assert(out == DataElement(state, cid))
+ }
+
+ "WorkerBatchInternalQueue.enqueueCommand + getElement" should
+ "round-trip a ControlElement" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ q.enqueueCommand(DummyCtrl, cid)
+ val out = q.getElement
+ assert(out == ControlElement(DummyCtrl, cid))
+ }
+
+ "WorkerBatchInternalQueue.enqueueActorCommand + getElement" should
+ "round-trip an ActorCommandElement (control-priority lane)" in {
+ val q = new TestQueue
+ q.enqueueActorCommand(ActorCommand.Empty)
+ val out = q.getElement
+ assert(out == ActorCommandElement(ActorCommand.Empty))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Multi-priority dispatch — control queue (priority 0) wins over data (1)
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue.getElement" should
+ "drain the control queue before the data queue (lower numeric priority wins)" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ // Enqueue data FIRST, then control — getElement must still return
+ // control first because its sub-queue has priority 0 < 1.
+ q.enqueueData(DataElement(DataFrame(Array(tuple(1))), cid))
+ q.enqueueCommand(DummyCtrl, cid)
+ val first = q.getElement
+ val second = q.getElement
+ assert(first.isInstanceOf[ControlElement], s"expected control first, got $first")
+ assert(second.isInstanceOf[DataElement], s"expected data second, got $second")
+ }
+
+ it should "interleave actor-command and direct-control items as they were enqueued" in {
+ // Both ActorCommandElement and ControlElement go into the control
+ // queue (priority 0). Within a single queue, FIFO order applies.
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ q.enqueueCommand(DummyCtrl, cid)
+ q.enqueueActorCommand(ActorCommand.Empty)
+ val first = q.getElement
+ val second = q.getElement
+ assert(first == ControlElement(DummyCtrl, cid))
+ assert(second == ActorCommandElement(ActorCommand.Empty))
+ }
+
+ // ---------------------------------------------------------------------------
+ // Queue-length accounting
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue.getDataQueueLength" should
+ "track only data-queue items (excludes control)" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ assert(q.getDataQueueLength == 0)
+ q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+ q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+ // Control items must NOT inflate the data-queue count.
+ q.enqueueCommand(DummyCtrl, cid)
+ assert(q.getDataQueueLength == 2)
+ }
+
+ "WorkerBatchInternalQueue.getControlQueueLength" should
+ "track every control-queue item (ControlElement + ActorCommandElement)" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ assert(q.getControlQueueLength == 0)
+ q.enqueueCommand(DummyCtrl, cid)
+ q.enqueueActorCommand(ActorCommand.Empty)
+ q.enqueueData(DataElement(DataFrame(Array.empty), cid))
+ assert(q.getControlQueueLength == 2)
+ }
+
+ "WorkerBatchInternalQueue.isControlQueueEmpty" should
+ "be true initially and after every control item is consumed" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ assert(q.isControlQueueEmpty)
+ q.enqueueCommand(DummyCtrl, cid)
+ assert(!q.isControlQueueEmpty)
+ q.getElement // drains the control item
+ assert(q.isControlQueueEmpty)
+ }
+
+ // ---------------------------------------------------------------------------
+ // disable / enable data queue — control flow keeps moving
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue.disableDataQueue" should
+ "keep enqueued data invisible to getElement until enableDataQueue is called" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ q.enqueueData(DataElement(DataFrame(Array(tuple(1))), cid))
+ q.disableDataQueue()
+ // The data queue is now disabled and holds the only item enqueued so
+ // far. After an inert control item is queued, getElement must yield
+ // that control item even though the data was enqueued FIRST.
+ q.enqueueCommand(DummyCtrl, cid)
+ val out = q.getElement
+ assert(out.isInstanceOf[ControlElement], s"expected control while data is disabled, got $out")
+ // Re-enable; the originally-enqueued data is now reachable.
+ q.enableDataQueue()
+ val nextOut = q.getElement
+ assert(nextOut.isInstanceOf[DataElement])
+ }
+
+ // ---------------------------------------------------------------------------
+ // getQueuedCredit — byte-accounting for DataFrame payloads
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue.getQueuedCredit" should
+ "report 0 for any sender when nothing has been enqueued" in {
+ val q = new TestQueue
+ assert(q.getQueuedCredit(channelId("a", "b")) == 0L)
+ assert(q.getQueuedCredit(channelId("c", "d")) == 0L)
+ }
+
+ it should "track bytes-in minus bytes-out for DataFrame payloads per sender" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ val frame = DataFrame(Array(tuple(1), tuple(2), tuple(3)))
+ q.enqueueData(DataElement(frame, cid))
+ // After enqueue, bytes-in > 0, bytes-out == 0 → credit positive.
+ val creditAfterEnqueue = q.getQueuedCredit(cid)
+ assert(creditAfterEnqueue == frame.inMemSize)
+ // After getElement, bytes-out catches up → credit drops to 0.
+ q.getElement
+ assert(q.getQueuedCredit(cid) == 0L)
+ }
+
+ it should "NOT increment credit for ControlElement payloads (control bytes are untracked)" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ q.enqueueCommand(DummyCtrl, cid)
+ assert(q.getQueuedCredit(cid) == 0L, "control payloads should not affect the credit counter")
+ }
+
+ it should "NOT increment credit for non-DataFrame DataPayloads (e.g. StateFrame)" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ val state = StateFrame(org.apache.texera.amber.core.state.State(Map.empty))
+ q.enqueueData(DataElement(state, cid))
+ assert(q.getQueuedCredit(cid) == 0L, "StateFrame payloads should not affect the credit counter")
+ }
+
+ it should "track each sender's credit independently (no cross-sender accumulation)" in {
+ val q = new TestQueue
+ val s1 = channelId("up-1", "self")
+ val s2 = channelId("up-2", "self")
+ val f1 = DataFrame(Array(tuple(10)))
+ val f2 = DataFrame(Array(tuple(20), tuple(30)))
+ q.enqueueData(DataElement(f1, s1))
+ q.enqueueData(DataElement(f2, s2))
+ assert(q.getQueuedCredit(s1) == f1.inMemSize)
+ assert(q.getQueuedCredit(s2) == f2.inMemSize)
+ // Drain only s1 by reading the first item (FIFO within data queue).
+ q.getElement // s1's frame
+ assert(q.getQueuedCredit(s1) == 0L)
+ assert(
+ q.getQueuedCredit(s2) == f2.inMemSize,
+ "s2's credit should still reflect un-consumed data"
+ )
+ }
+
+ it should "accumulate credit across multiple enqueues for the same sender" in {
+ val q = new TestQueue
+ val cid = channelId("up", "self")
+ val f1 = DataFrame(Array(tuple(1)))
+ val f2 = DataFrame(Array(tuple(2)))
+ q.enqueueData(DataElement(f1, cid))
+ q.enqueueData(DataElement(f2, cid))
+ assert(q.getQueuedCredit(cid) == f1.inMemSize + f2.inMemSize)
+ }
+
+ // ---------------------------------------------------------------------------
+ // Companion constants
+ // ---------------------------------------------------------------------------
+
+ "WorkerBatchInternalQueue companion" should
+ "define stable queue-priority constants (CONTROL_QUEUE < DATA_QUEUE)" in {
+ // Pin the numeric ordering — control must have a lower (higher-priority)
+ // queue id than data. The multi-priority semantics in `getElement` rely
+ // on this. A reversal would silently change dispatch behavior.
+ assert(WorkerBatchInternalQueue.CONTROL_QUEUE == 0)
+ assert(WorkerBatchInternalQueue.DATA_QUEUE == 1)
+ assert(WorkerBatchInternalQueue.CONTROL_QUEUE < WorkerBatchInternalQueue.DATA_QUEUE)
+ }
+}