This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 0d8cf3c05a test(amber): add unit test coverage for AmberMessage
envelopes (#4829)
0d8cf3c05a is described below
commit 0d8cf3c05a5d8c0dfa8aa4b28f386856c64cf294
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:46:57 2026 -0700
test(amber): add unit test coverage for AmberMessage envelopes (#4829)
### What changes were proposed in this PR?
Add `AmberMessageEnvelopesSpec` covering the engine message envelope
layer:
- `WorkflowFIFOMessage` / `WorkflowRecoveryMessage` round-trip via case
class accessors and the `WorkflowMessage` / `Serializable` traits
- `RecoveryPayload` subtypes (`UpdateRecoveryStatus`, `ResendOutputTo`,
`NotifyFailedNode`) carry their args; the `ResendOutputTo` ActorRef
field is exercised via a real Pekko `ActorSystem`
- `WorkflowMessage.getInMemSize` for a DataFrame payload, an empty-frame
DataFrame, and the 200L default for any other
`WorkflowFIFOMessagePayload`
- `WorkflowFIFOMessagePayload` / `DirectControlMessagePayload` trait
wiring
The catch-all `case _ => 200L` for non-`WorkflowFIFOMessage` subtypes is
intentionally not tested because `WorkflowMessage` is sealed and today
has only one subtype, so that branch is dead by construction.
### Any related issues, documentation, discussions?
Closes #4828
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.common.ambermessage.AmberMessageEnvelopesSpec"`
— 11/11 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../ambermessage/AmberMessageEnvelopesSpec.scala | 150 +++++++++++++++++++++
1 file changed, 150 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
new file mode 100644
index 0000000000..fa03b5acef
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/common/ambermessage/AmberMessageEnvelopesSpec.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common.ambermessage
+
+import org.apache.pekko.actor.{Address, ActorSystem}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+class AmberMessageEnvelopesSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ // Suite-local actor system used only by the ResendOutputTo test below;
+ // shut down via TestKit.shutdownActorSystem in afterAll so threads do not
+ // outlive the test, matching the cleanup pattern in ControllerSpec /
+ // WorkerSpec.
+ private val pekkoSystem: ActorSystem =
ActorSystem("amber-message-envelopes-test")
+
+ override protected def afterAll(): Unit = {
+ TestKit.shutdownActorSystem(pekkoSystem)
+ super.afterAll()
+ }
+
+ private val channel =
+ ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"),
isControl = false)
+
+ private val intSchema: Schema = Schema().add(new Attribute("v",
AttributeType.INTEGER))
+ private def tuple(v: Int): Tuple =
+ Tuple
+ .builder(intSchema)
+ .add(intSchema.getAttribute("v"), Integer.valueOf(v))
+ .build()
+
+ //
---------------------------------------------------------------------------
+ // WorkflowFIFOMessage / WorkflowRecoveryMessage envelope shape
+ //
---------------------------------------------------------------------------
+
+ "WorkflowFIFOMessage" should "carry channelId, sequenceNumber, and payload
as constructed" in {
+ val payload = DataFrame(Array(tuple(1)))
+ val msg = WorkflowFIFOMessage(channel, 7L, payload)
+ assert(msg.channelId == channel)
+ assert(msg.sequenceNumber == 7L)
+ assert(msg.payload == payload)
+ }
+
+ it should "be a WorkflowMessage and Serializable" in {
+ val msg = WorkflowFIFOMessage(channel, 0L, DataFrame(Array.empty))
+ assert(msg.isInstanceOf[WorkflowMessage])
+ assert(msg.isInstanceOf[Serializable])
+ }
+
+ "WorkflowRecoveryMessage" should "carry the sender and payload as
constructed" in {
+ val from = ActorVirtualIdentity("worker-1")
+ val payload = UpdateRecoveryStatus(isRecovering = true)
+ val msg = WorkflowRecoveryMessage(from, payload)
+ assert(msg.from == from)
+ assert(msg.payload == payload)
+ }
+
+ //
---------------------------------------------------------------------------
+ // RecoveryPayload subtypes
+ //
---------------------------------------------------------------------------
+
+ "RecoveryPayload subtypes" should "carry their constructor arguments" in {
+ val update = UpdateRecoveryStatus(isRecovering = true)
+ assert(update.isRecovering)
+
+ val updateOff = UpdateRecoveryStatus(isRecovering = false)
+ assert(!updateOff.isRecovering)
+
+ val nodeFailure = NotifyFailedNode(Address("pekko", "test"))
+ assert(nodeFailure.addr == Address("pekko", "test"))
+ }
+
+ it should "exercise ResendOutputTo via a real ActorRef so the case class
wires correctly" in {
+ val deadRef = pekkoSystem.deadLetters
+ val vid = ActorVirtualIdentity("downstream")
+ val payload = ResendOutputTo(vid, deadRef)
+ assert(payload.vid == vid)
+ assert(payload.ref == deadRef)
+ }
+
+ it should "be Serializable on every subtype" in {
+ val payloads: Seq[RecoveryPayload] = Seq(
+ UpdateRecoveryStatus(isRecovering = true),
+ NotifyFailedNode(Address("pekko", "n"))
+ )
+ payloads.foreach(p => assert(p.isInstanceOf[Serializable]))
+ }
+
+ //
---------------------------------------------------------------------------
+ // WorkflowMessage.getInMemSize
+ //
---------------------------------------------------------------------------
+
+ // A non-DataFrame payload so getInMemSize falls into the 200L default
branch.
+ private case class FixedSizePayload() extends WorkflowFIFOMessagePayload
+
+ "WorkflowMessage.getInMemSize" should "be the DataFrame's inMemSize for a
WorkflowFIFOMessage carrying a DataFrame" in {
+ val df = DataFrame(Array(tuple(1), tuple(2)))
+ val msg = WorkflowFIFOMessage(channel, 0L, df)
+ assert(WorkflowMessage.getInMemSize(msg) == df.inMemSize)
+ }
+
+ it should "be zero for an empty-frame WorkflowFIFOMessage" in {
+ val msg = WorkflowFIFOMessage(channel, 0L, DataFrame(Array.empty))
+ assert(WorkflowMessage.getInMemSize(msg) == 0L)
+ }
+
+ it should "default to 200L for a non-DataFrame WorkflowFIFOMessagePayload"
in {
+ val msg = WorkflowFIFOMessage(channel, 0L, FixedSizePayload())
+ assert(WorkflowMessage.getInMemSize(msg) == 200L)
+ }
+
+ // The catch-all `case _ => 200L` for non-WorkflowFIFOMessage subtypes is
+ // guarded by `WorkflowMessage` being sealed. Today the sealed hierarchy
+ // only has `WorkflowFIFOMessage`, so this branch is dead by construction;
+ // we leave it untested rather than open the seal.
+
+ //
---------------------------------------------------------------------------
+ // WorkflowFIFOMessagePayload trait wiring (sanity)
+ //
---------------------------------------------------------------------------
+
+ "WorkflowFIFOMessagePayload trait" should "be implementable as a custom
payload" in {
+ val payload: WorkflowFIFOMessagePayload = FixedSizePayload()
+ assert(payload.isInstanceOf[Serializable])
+ }
+
+ "DirectControlMessagePayload trait" should "be a WorkflowFIFOMessagePayload
subtype" in {
+ val custom: DirectControlMessagePayload = new DirectControlMessagePayload
{}
+ assert(custom.isInstanceOf[WorkflowFIFOMessagePayload])
+ }
+}