This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d8cf3c05a test(amber): add unit test coverage for AmberMessage 
envelopes (#4829)
0d8cf3c05a is described below

commit 0d8cf3c05a5d8c0dfa8aa4b28f386856c64cf294
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:46:57 2026 -0700

    test(amber): add unit test coverage for AmberMessage envelopes (#4829)
    
    ### What changes were proposed in this PR?
    
    Add `AmberMessageEnvelopesSpec` covering the engine message envelope
    layer:
    
    - `WorkflowFIFOMessage` / `WorkflowRecoveryMessage` round-trip via case
    class accessors and the `WorkflowMessage` / `Serializable` traits
    - `RecoveryPayload` subtypes (`UpdateRecoveryStatus`, `ResendOutputTo`,
    `NotifyFailedNode`) carry their args; the `ResendOutputTo` ActorRef
    field is exercised via a real Pekko `ActorSystem`
    - `WorkflowMessage.getInMemSize` for a DataFrame payload, an empty-frame
    DataFrame, and the 200L default for any other
    `WorkflowFIFOMessagePayload`
    - `WorkflowFIFOMessagePayload` / `DirectControlMessagePayload` trait
    wiring
    
    The catch-all `case _ => 200L` for non-`WorkflowFIFOMessage` subtypes is
    intentionally not tested because `WorkflowMessage` is sealed and today
    has only one subtype, so that branch is dead by construction.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4828
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly
    
org.apache.texera.amber.engine.common.ambermessage.AmberMessageEnvelopesSpec"`
    — 11/11 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../ambermessage/AmberMessageEnvelopesSpec.scala   | 150 +++++++++++++++++++++
 1 file changed, 150 insertions(+)

diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
new file mode 100644
index 0000000000..fa03b5acef
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.ambermessage
+
+import org.apache.pekko.actor.{Address, ActorSystem}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+class AmberMessageEnvelopesSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // Suite-local actor system used only by the ResendOutputTo test below;
+  // shut down via TestKit.shutdownActorSystem in afterAll so threads do not
+  // outlive the test, matching the cleanup pattern in ControllerSpec /
+  // WorkerSpec.
+  private val pekkoSystem: ActorSystem = 
ActorSystem("amber-message-envelopes-test")
+
+  override protected def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(pekkoSystem)
+    super.afterAll()
+  }
+
+  private val channel =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), 
isControl = false)
+
+  private val intSchema: Schema = Schema().add(new Attribute("v", 
AttributeType.INTEGER))
+  private def tuple(v: Int): Tuple =
+    Tuple
+      .builder(intSchema)
+      .add(intSchema.getAttribute("v"), Integer.valueOf(v))
+      .build()
+
+  // 
---------------------------------------------------------------------------
+  // WorkflowFIFOMessage / WorkflowRecoveryMessage envelope shape
+  // 
---------------------------------------------------------------------------
+
+  "WorkflowFIFOMessage" should "carry channelId, sequenceNumber, and payload 
as constructed" in {
+    val payload = DataFrame(Array(tuple(1)))
+    val msg = WorkflowFIFOMessage(channel, 7L, payload)
+    assert(msg.channelId == channel)
+    assert(msg.sequenceNumber == 7L)
+    assert(msg.payload == payload)
+  }
+
+  it should "be a WorkflowMessage and Serializable" in {
+    val msg = WorkflowFIFOMessage(channel, 0L, DataFrame(Array.empty))
+    assert(msg.isInstanceOf[WorkflowMessage])
+    assert(msg.isInstanceOf[Serializable])
+  }
+
+  "WorkflowRecoveryMessage" should "carry the sender and payload as 
constructed" in {
+    val from = ActorVirtualIdentity("worker-1")
+    val payload = UpdateRecoveryStatus(isRecovering = true)
+    val msg = WorkflowRecoveryMessage(from, payload)
+    assert(msg.from == from)
+    assert(msg.payload == payload)
+  }
+
+  // 
---------------------------------------------------------------------------
+  // RecoveryPayload subtypes
+  // 
---------------------------------------------------------------------------
+
+  "RecoveryPayload subtypes" should "carry their constructor arguments" in {
+    val update = UpdateRecoveryStatus(isRecovering = true)
+    assert(update.isRecovering)
+
+    val updateOff = UpdateRecoveryStatus(isRecovering = false)
+    assert(!updateOff.isRecovering)
+
+    val nodeFailure = NotifyFailedNode(Address("pekko", "test"))
+    assert(nodeFailure.addr == Address("pekko", "test"))
+  }
+
+  it should "exercise ResendOutputTo via a real ActorRef so the case class 
wires correctly" in {
+    val deadRef = pekkoSystem.deadLetters
+    val vid = ActorVirtualIdentity("downstream")
+    val payload = ResendOutputTo(vid, deadRef)
+    assert(payload.vid == vid)
+    assert(payload.ref == deadRef)
+  }
+
+  it should "be Serializable on every subtype" in {
+    val payloads: Seq[RecoveryPayload] = Seq(
+      UpdateRecoveryStatus(isRecovering = true),
+      NotifyFailedNode(Address("pekko", "n"))
+    )
+    payloads.foreach(p => assert(p.isInstanceOf[Serializable]))
+  }
+
+  // 
---------------------------------------------------------------------------
+  // WorkflowMessage.getInMemSize
+  // 
---------------------------------------------------------------------------
+
+  // A non-DataFrame payload so getInMemSize falls into the 200L default 
branch.
+  private case class FixedSizePayload() extends WorkflowFIFOMessagePayload
+
+  "WorkflowMessage.getInMemSize" should "be the DataFrame's inMemSize for a 
WorkflowFIFOMessage carrying a DataFrame" in {
+    val df = DataFrame(Array(tuple(1), tuple(2)))
+    val msg = WorkflowFIFOMessage(channel, 0L, df)
+    assert(WorkflowMessage.getInMemSize(msg) == df.inMemSize)
+  }
+
+  it should "be zero for an empty-frame WorkflowFIFOMessage" in {
+    val msg = WorkflowFIFOMessage(channel, 0L, DataFrame(Array.empty))
+    assert(WorkflowMessage.getInMemSize(msg) == 0L)
+  }
+
+  it should "default to 200L for a non-DataFrame WorkflowFIFOMessagePayload" 
in {
+    val msg = WorkflowFIFOMessage(channel, 0L, FixedSizePayload())
+    assert(WorkflowMessage.getInMemSize(msg) == 200L)
+  }
+
+  // The catch-all `case _ => 200L` for non-WorkflowFIFOMessage subtypes is
+  // guarded by `WorkflowMessage` being sealed. Today the sealed hierarchy
+  // only has `WorkflowFIFOMessage`, so this branch is dead by construction;
+  // we leave it untested rather than open the seal.
+
+  // 
---------------------------------------------------------------------------
+  // WorkflowFIFOMessagePayload trait wiring (sanity)
+  // 
---------------------------------------------------------------------------
+
+  "WorkflowFIFOMessagePayload trait" should "be implementable as a custom 
payload" in {
+    val payload: WorkflowFIFOMessagePayload = FixedSizePayload()
+    assert(payload.isInstanceOf[Serializable])
+  }
+
+  "DirectControlMessagePayload trait" should "be a WorkflowFIFOMessagePayload 
subtype" in {
+    val custom: DirectControlMessagePayload = new DirectControlMessagePayload 
{}
+    assert(custom.isInstanceOf[WorkflowFIFOMessagePayload])
+  }
+}

Reply via email to