This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 6d617b612c test(amber): add unit test coverage for AmberFIFOChannel
and WorkerPort (#4831)
6d617b612c is described below
commit 6d617b612c924a5082af503f39d4caecb0624631
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:58:43 2026 -0700
test(amber): add unit test coverage for AmberFIFOChannel and WorkerPort
(#4831)
### What changes were proposed in this PR?
Add two specs covering the messaging-layer pure-logic primitives.
`AmberFIFOChannelSpec` (per-channel exactly-once FIFO ordering):
- Construction defaults (empty queue, sequence 0, zero credit, enabled)
- `acceptMessage` in-order delivery advances `currentSeq` and increases the queued credit
- Out-of-order seq stashes; subsequent in-order seq drains the stash in
FIFO order
- A duplicate whose seq is below `currentSeq` is silently dropped
- Stashing the same ahead-of-window seq twice does not double-count the stashed size
- `take` decrements `getQueuedCredit` by the dequeued message size
- `enable` / `isEnabled` toggle
- `getPortId` throws when no port id is set; returns the most recently
configured port id
`WorkerPortSpec` (WorkerPort case-class shape):
- Default empty channel set / not-completed
- Channel set wiring at construction and post-construction append
- `completed` mutable flip
- Case-class equality across distinct instances
`WorkerTimerService` is intentionally skipped because it is a thin
wrapper around `AkkaActorService.sendToSelfWithFixedDelay` and would
require a full Pekko harness for low-value scheduling assertions.
### Any related issues, documentation, discussions?
Closes #4830
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.messaginglayer.AmberFIFOChannelSpec
org.apache.texera.amber.engine.architecture.messaginglayer.WorkerPortSpec"`
— 15/15 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../messaginglayer/AmberFIFOChannelSpec.scala | 168 +++++++++++++++++++++
.../messaginglayer/WorkerPortSpec.scala | 67 ++++++++
2 files changed, 235 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
new file mode 100644
index 0000000000..b70d9f9d6a
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.common.ambermessage.{
+ WorkflowFIFOMessage,
+ WorkflowFIFOMessagePayload
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class AmberFIFOChannelSpec extends AnyFlatSpec {
+ /*
+  * Drives AmberFIFOChannel instances directly and observes them only through
+  * their public accessors. Every test builds its own channel on the shared
+  * identity cid (isControl = false), so no channel state is shared between
+  * tests.
+  */
+ private val cid =
+ ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"),
isControl = false)
+
+ // Payload that is not a DataFrame; per the original note such messages have a fixed
+ // 200L size, which msgSize mirrors in the credit/queued/stashed assertions below.
+ private case class FixedSizePayload() extends WorkflowFIFOMessagePayload
+ private val msgSize: Long = 200L
+ /**
+  * Builds a message on cid with the given sequence number and a
+  * FixedSizePayload.
+  */
+ private def msg(seq: Long): WorkflowFIFOMessage =
+ WorkflowFIFOMessage(cid, seq, FixedSizePayload())
+
+ //
---------------------------------------------------------------------------
+ // Construction defaults: a new channel is empty, at sequence 0, reports zero sizes, is enabled
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel" should "expose the configured channelId and have an empty
queue at construction" in {
+ val ch = new AmberFIFOChannel(cid)
+ assert(ch.channelId == cid)
+ assert(!ch.hasMessage)
+ assert(ch.getCurrentSeq == 0L)
+ assert(ch.getQueuedCredit == 0L)
+ assert(ch.getTotalMessageSize == 0L)
+ assert(ch.getTotalStashedSize == 0L)
+ }
+
+ it should "default to enabled" in {
+ val ch = new AmberFIFOChannel(cid)
+ assert(ch.isEnabled)
+ }
+
+ //
---------------------------------------------------------------------------
+ // FIFO ordering and stash: in-order delivery, out-of-order buffering, duplicate dropping
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel.acceptMessage" should "forward an in-order seq=0 message
and advance the current sequence" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(0L))
+ assert(ch.hasMessage)
+ assert(ch.getCurrentSeq == 1L)
+ assert(ch.getQueuedCredit == msgSize)
+ assert(ch.getTotalMessageSize == msgSize)
+ }
+ /*
+  * The expected sequence starts at 0, so seq 1 arriving first must be held
+  * back (counted in getTotalStashedSize, not visible through hasMessage)
+  * until seq 0 arrives; take must then return seq 0 before seq 1.
+  */
+ it should "stash an out-of-order message until its predecessor arrives, then
drain in FIFO order" in {
+ val ch = new AmberFIFOChannel(cid)
+ // arrives out of order: seq 1 first (the current sequence is still 0), then seq 0
+ ch.acceptMessage(msg(1L))
+ assert(!ch.hasMessage, "ahead-of-window message must be stashed, not
delivered")
+ assert(ch.getCurrentSeq == 0L)
+ assert(ch.getTotalStashedSize == msgSize)
+
+ ch.acceptMessage(msg(0L))
+ // both should drain: seq 0 is accepted and the stashed seq 1 is released after it
+ assert(ch.hasMessage)
+ assert(ch.getCurrentSeq == 2L)
+ assert(ch.getQueuedCredit == 2 * msgSize)
+ assert(ch.getTotalStashedSize == 0L)
+
+ val first = ch.take
+ val second = ch.take
+ assert(first.sequenceNumber == 0L)
+ assert(second.sequenceNumber == 1L)
+ assert(!ch.hasMessage)
+ assert(ch.getQueuedCredit == 0L)
+ }
+
+ it should "drop duplicates whose sequence number is below the current
high-water mark" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(0L))
+ ch.acceptMessage(msg(0L)) // duplicate of the seq 0 accepted on the previous line
+ assert(ch.getCurrentSeq == 1L, "duplicate must not advance the sequence")
+ // only one message is buffered: a single take leaves the channel without messages
+ val out = ch.take
+ assert(out.sequenceNumber == 0L)
+ assert(!ch.hasMessage)
+ }
+
+ it should "drop duplicates that are stashed twice ahead of the current
window" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(2L))
+ ch.acceptMessage(msg(2L)) // duplicate stash: same ahead-of-window seq offered again
+ assert(ch.getTotalStashedSize == msgSize, "duplicate stash must not
double-count")
+ // unblock by delivering 0 and 1; the stashed seq 2 is then released after them
+ ch.acceptMessage(msg(0L))
+ ch.acceptMessage(msg(1L))
+ assert(ch.getCurrentSeq == 3L)
+ val received = (0 until 3).map(_ => ch.take.sequenceNumber).toList
+ assert(received == List(0L, 1L, 2L))
+ }
+
+ //
---------------------------------------------------------------------------
+ // Accounting under take: getQueuedCredit shrinks by msgSize for each dequeued message
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel.take" should "decrement getQueuedCredit by the size of the
dequeued message" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.acceptMessage(msg(0L))
+ ch.acceptMessage(msg(1L))
+ assert(ch.getQueuedCredit == 2 * msgSize)
+ ch.take
+ assert(ch.getQueuedCredit == msgSize)
+ ch.take
+ assert(ch.getQueuedCredit == 0L)
+ }
+
+ //
---------------------------------------------------------------------------
+ // enable / isEnabled: the flag passed to enable is read back through isEnabled
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel.enable" should "toggle the enabled flag" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.enable(false)
+ assert(!ch.isEnabled)
+ ch.enable(true)
+ assert(ch.isEnabled)
+ }
+
+ //
---------------------------------------------------------------------------
+ // PortId association: getPortId fails before setPortId, then returns the latest value set
+ //
---------------------------------------------------------------------------
+
+ "AmberFIFOChannel.getPortId" should "throw when no portId has been set" in {
+ val ch = new AmberFIFOChannel(cid)
+ // no setPortId call has happened yet; original note: the failure is Option.get on None
+ assertThrows[NoSuchElementException] {
+ ch.getPortId
+ }
+ }
+
+ it should "return the most recently configured portId" in {
+ val ch = new AmberFIFOChannel(cid)
+ ch.setPortId(PortIdentity(0))
+ assert(ch.getPortId == PortIdentity(0))
+ ch.setPortId(PortIdentity(7))
+ assert(ch.getPortId == PortIdentity(7))
+ }
+}
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
new file mode 100644
index 0000000000..055e264c37
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable
+
+class WorkerPortSpec extends AnyFlatSpec {
+ /*
+  * Checks WorkerPort as a plain data holder: its defaults, the channel set
+  * it carries, the reassignable completed flag, and value equality. All
+  * tests share one schema with a single INTEGER attribute named v.
+  */
+ private val schema: Schema = Schema().add(new Attribute("v",
AttributeType.INTEGER))
+
+ "WorkerPort" should "default to an empty channel set and not-completed
state" in {
+ val p = WorkerPort(schema)
+ assert(p.schema == schema)
+ assert(p.channels.isEmpty)
+ assert(!p.completed)
+ }
+
+ it should "carry the channel set provided at construction" in {
+ val cid =
+ ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"),
isControl = false)
+ val p = WorkerPort(schema, mutable.Set(cid))
+ assert(p.channels == mutable.Set(cid))
+ }
+
+ it should "allow `completed` to be flipped to true" in {
+ val p = WorkerPort(schema)
+ p.completed = true
+ assert(p.completed)
+ }
+
+ it should "allow channels to be appended after construction" in {
+ val p = WorkerPort(schema)
+ val cid =
+ ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"),
isControl = false)
+ p.channels += cid
+ assert(p.channels.contains(cid))
+ }
+ /*
+  * a and b are built from separate mutable.Set instances that hold the same
+  * channel id, so the assertion passes only if equality compares field
+  * values rather than object identity.
+  */
+ it should "treat distinct instances with the same fields as case-class
equal" in {
+ val cid =
+ ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"),
isControl = false)
+ val a = WorkerPort(schema, mutable.Set(cid), completed = true)
+ val b = WorkerPort(schema, mutable.Set(cid), completed = true)
+ assert(a == b)
+ }
+}