This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d617b612c test(amber): add unit test coverage for AmberFIFOChannel and WorkerPort (#4831)
6d617b612c is described below

commit 6d617b612c924a5082af503f39d4caecb0624631
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:58:43 2026 -0700

    test(amber): add unit test coverage for AmberFIFOChannel and WorkerPort (#4831)
    
    ### What changes were proposed in this PR?
    
    Add two specs covering the messaging-layer pure-logic primitives.
    
    `AmberFIFOChannelSpec` (per-channel exactly-once FIFO ordering):
    - Construction defaults (empty queue, sequence 0, zero credit, enabled)
    - `acceptMessage` in-order delivery advances `currentSeq` and credit
    - Out-of-order seq stashes; subsequent in-order seq drains the stash in
    FIFO order
    - Duplicate-below-current is silently dropped
    - Duplicate stash does not double-count the stash size
    - `take` decrements `getQueuedCredit` by the dequeued message size
    - `enable` / `isEnabled` toggle
    - `getPortId` throws when no port id is set; returns the most recently
    configured port id
    
    `WorkerPortSpec` (WorkerPort case-class shape):
    - Default empty channel set / not-completed
    - Channel set wiring at construction and post-construction append
    - `completed` mutable flip
    - Case-class equality across distinct instances
    
    `WorkerTimerService` is intentionally skipped because it is a thin
    wrapper around `AkkaActorService.sendToSelfWithFixedDelay` and would
    require a full Pekko harness for low-value scheduling assertions.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4830
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    org.apache.texera.amber.engine.architecture.messaginglayer.AmberFIFOChannelSpec
    org.apache.texera.amber.engine.architecture.messaginglayer.WorkerPortSpec"`
    — 15/15 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../messaginglayer/AmberFIFOChannelSpec.scala      | 168 +++++++++++++++++++++
 .../messaginglayer/WorkerPortSpec.scala            |  67 ++++++++
 2 files changed, 235 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
new file mode 100644
index 0000000000..b70d9f9d6a
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannelSpec.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.engine.common.ambermessage.{
+  WorkflowFIFOMessage,
+  WorkflowFIFOMessagePayload
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class AmberFIFOChannelSpec extends AnyFlatSpec { // specs for AmberFIFOChannel ordering, stash, credit, enable flag, port id
+
+  private val cid = // data (isControl = false) channel identity shared by every test below
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), 
isControl = false)
+
+  // Field-less payload: the assertions below assume every such message is accounted as msgSize
+  // in the credit/queued/stashed totals (author's note: non-DataFrame payloads are 200L -- verify).
+  private case class FixedSizePayload() extends WorkflowFIFOMessagePayload
+  private val msgSize: Long = 200L // expected per-message size used by the accounting assertions
+
+  private def msg(seq: Long): WorkflowFIFOMessage = // message on cid carrying the given sequence number
+    WorkflowFIFOMessage(cid, seq, FixedSizePayload())
+
+  // 
---------------------------------------------------------------------------
+  // Construction defaults: a fresh channel has no message, sequence 0, zero credit and sizes
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel" should "expose the configured channelId and have an empty 
queue at construction" in {
+    val ch = new AmberFIFOChannel(cid)
+    assert(ch.channelId == cid)
+    assert(!ch.hasMessage)
+    assert(ch.getCurrentSeq == 0L)
+    assert(ch.getQueuedCredit == 0L)
+    assert(ch.getTotalMessageSize == 0L)
+    assert(ch.getTotalStashedSize == 0L)
+  }
+
+  it should "default to enabled" in {
+    val ch = new AmberFIFOChannel(cid)
+    assert(ch.isEnabled)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // FIFO ordering and stash: in-order delivery, out-of-order stashing, duplicate handling
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel.acceptMessage" should "forward an in-order seq=0 message 
and advance the current sequence" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(0L))
+    assert(ch.hasMessage)
+    assert(ch.getCurrentSeq == 1L)
+    assert(ch.getQueuedCredit == msgSize)
+    assert(ch.getTotalMessageSize == msgSize)
+  }
+
+  it should "stash an out-of-order message until its predecessor arrives, then 
drain in FIFO order" in {
+    val ch = new AmberFIFOChannel(cid)
+    // messages arrive out of order: seq 1 first, then seq 0
+    ch.acceptMessage(msg(1L))
+    assert(!ch.hasMessage, "ahead-of-window message must be stashed, not 
delivered")
+    assert(ch.getCurrentSeq == 0L)
+    assert(ch.getTotalStashedSize == msgSize)
+
+    ch.acceptMessage(msg(0L))
+    // seq 0 unblocks the stashed seq 1, so both are expected to be available now
+    assert(ch.hasMessage)
+    assert(ch.getCurrentSeq == 2L)
+    assert(ch.getQueuedCredit == 2 * msgSize)
+    assert(ch.getTotalStashedSize == 0L)
+
+    val first = ch.take
+    val second = ch.take
+    assert(first.sequenceNumber == 0L)
+    assert(second.sequenceNumber == 1L)
+    assert(!ch.hasMessage)
+    assert(ch.getQueuedCredit == 0L)
+  }
+
+  it should "drop duplicates whose sequence number is below the current 
high-water mark" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(0L))
+    ch.acceptMessage(msg(0L)) // duplicate of the seq that was just accepted
+    assert(ch.getCurrentSeq == 1L, "duplicate must not advance the sequence")
+    // only one message is buffered: a single take must leave the channel empty
+    val out = ch.take
+    assert(out.sequenceNumber == 0L)
+    assert(!ch.hasMessage)
+  }
+
+  it should "drop duplicates that are stashed twice ahead of the current 
window" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(2L))
+    ch.acceptMessage(msg(2L)) // same ahead-of-window seq accepted a second time
+    assert(ch.getTotalStashedSize == msgSize, "duplicate stash must not 
double-count")
+    // unblock the stashed seq 2 by delivering 0 and 1
+    ch.acceptMessage(msg(0L))
+    ch.acceptMessage(msg(1L))
+    assert(ch.getCurrentSeq == 3L)
+    val received = (0 until 3).map(_ => ch.take.sequenceNumber).toList
+    assert(received == List(0L, 1L, 2L))
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Accounting under take: queued credit is expected to shrink by msgSize per dequeued message
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel.take" should "decrement getQueuedCredit by the size of the 
dequeued message" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.acceptMessage(msg(0L))
+    ch.acceptMessage(msg(1L))
+    assert(ch.getQueuedCredit == 2 * msgSize)
+    ch.take
+    assert(ch.getQueuedCredit == msgSize)
+    ch.take
+    assert(ch.getQueuedCredit == 0L)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // enable / isEnabled: the flag is expected to follow the last value passed to enable
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel.enable" should "toggle the enabled flag" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.enable(false)
+    assert(!ch.isEnabled)
+    ch.enable(true)
+    assert(ch.isEnabled)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // PortId association: an unset port id throws; a later setPortId replaces the earlier value
+  // 
---------------------------------------------------------------------------
+
+  "AmberFIFOChannel.getPortId" should "throw when no portId has been set" in {
+    val ch = new AmberFIFOChannel(cid)
+    // presumably getPortId unwraps an unset Option (Option.get on None) -- confirm in AmberFIFOChannel
+    assertThrows[NoSuchElementException] {
+      ch.getPortId
+    }
+  }
+
+  it should "return the most recently configured portId" in {
+    val ch = new AmberFIFOChannel(cid)
+    ch.setPortId(PortIdentity(0))
+    assert(ch.getPortId == PortIdentity(0))
+    ch.setPortId(PortIdentity(7))
+    assert(ch.getPortId == PortIdentity(7))
+  }
+}
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
new file mode 100644
index 0000000000..055e264c37
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/messaginglayer/WorkerPortSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.messaginglayer
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable
+
+class WorkerPortSpec extends AnyFlatSpec { // specs for WorkerPort defaults, channel set, completed flag, equality
+
+  private val schema: Schema = Schema().add(new Attribute("v", 
AttributeType.INTEGER)) // one INTEGER attribute named v, shared by all tests
+
+  "WorkerPort" should "default to an empty channel set and not-completed 
state" in {
+    val p = WorkerPort(schema) // only the schema is supplied; other fields take their defaults
+    assert(p.schema == schema)
+    assert(p.channels.isEmpty)
+    assert(!p.completed)
+  }
+
+  it should "carry the channel set provided at construction" in {
+    val cid =
+      ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"), 
isControl = false)
+    val p = WorkerPort(schema, mutable.Set(cid)) // channel set passed as the second argument
+    assert(p.channels == mutable.Set(cid))
+  }
+
+  it should "allow `completed` to be flipped to true" in {
+    val p = WorkerPort(schema)
+    p.completed = true // completed is reassigned after construction
+    assert(p.completed)
+  }
+
+  it should "allow channels to be appended after construction" in {
+    val p = WorkerPort(schema)
+    val cid =
+      ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"), 
isControl = false)
+    p.channels += cid // adds to the port's own channel set after construction
+    assert(p.channels.contains(cid))
+  }
+
+  it should "treat distinct instances with the same fields as case-class 
equal" in {
+    val cid =
+      ChannelIdentity(ActorVirtualIdentity("a"), ActorVirtualIdentity("b"), 
isControl = false)
+    val a = WorkerPort(schema, mutable.Set(cid), completed = true) // first of two separately built ports
+    val b = WorkerPort(schema, mutable.Set(cid), completed = true) // same field values, different instance
+    assert(a == b)
+  }
+}

Reply via email to