This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new b562b55b72 test(amber): add unit test coverage for logreplay
primitives (#4840)
b562b55b72 is described below
commit b562b55b725985b8ea8ebc9aa47287bcd84021b2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon May 4 02:26:33 2026 -0700
test(amber): add unit test coverage for logreplay primitives (#4840)
### What changes were proposed in this PR?
Add `LogreplayPrimitivesSpec` covering five files in
`amber/.../engine/architecture/logreplay`:
`ReplayLoggerImpl`:
- Channel-switch logging
- Same-channel-no-message skip
- Message append (ProcessingStep + MessageContent)
- Channel-switch-without-message append
- Drain buffer clear between drains
- Synthetic ProcessingStep when drain step differs from lastStep
- ReplayDestination append
`OrderEnforcer` trait:
- Implementable by a custom subclass
`ReplayOrderEnforcer`:
- Immediate completion on empty queue (with onComplete fire)
- startStep skip during construction
- Step-driven channel advance via canProceed
- Single onComplete fire even when canProceed is called past the end
`ReplayLogRecord` case-class subtypes:
- Serializable + case-class equality + TerminateSignal singleton
`AsyncReplayLogWriter` is intentionally skipped — it spawns its own
thread and is hard to unit-test without flake. `ReplayLogGenerator` is
likewise skipped since it requires a `SequentialRecordStorage` with
records read from disk.
### Any related issues, documentation, discussions?
Closes #4839
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.logreplay.LogreplayPrimitivesSpec"`
- 13/13 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../logreplay/LogreplayPrimitivesSpec.scala | 409 +++++++++++++++++++++
1 file changed, 409 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
new file mode 100644
index 0000000000..ddb6440c25
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/LogreplayPrimitivesSpec.scala
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.virtualidentity.{
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ EmbeddedControlMessageIdentity
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+ AsyncRPCContext,
+ ControlInvocation,
+ EmptyRequest
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
+ EmptyReturn,
+ ReturnInvocation
+}
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{
+ WorkflowFIFOMessage,
+ WorkflowFIFOMessagePayload
+}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import scala.collection.mutable
+
+class LogreplayPrimitivesSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+ private val workerId = ActorVirtualIdentity("worker-1")
+ private val cidA =
+ ChannelIdentity(ActorVirtualIdentity("up1"), workerId, isControl = false)
+ private val cidB =
+ ChannelIdentity(ActorVirtualIdentity("up2"), workerId, isControl = false)
+ private val cidC =
+ ChannelIdentity(ActorVirtualIdentity("up3"), workerId, isControl = false)
+
+ // Suite-local ActorSystem + Serialization injected into AmberRuntime so the
+ // ReplayLogRecord round-trip below uses the same Pekko serialization stack
+ // that SequentialRecordStorage uses in production. Torn down in afterAll
+ // so no Pekko threads outlive the suite. (Same pattern as
+ // CheckpointSubsystemSpec.)
+ private val testSystem: ActorSystem =
+ ActorSystem("LogreplayPrimitivesSpec-test", AmberRuntime.akkaConfig)
+ private val testSerde: Serialization = SerializationExtension(testSystem)
+
+ private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+ val field = AmberRuntime.getClass.getDeclaredField(name)
+ field.setAccessible(true)
+ field.set(AmberRuntime, value)
+ }
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ setAmberRuntimeField("_actorSystem", testSystem)
+ setAmberRuntimeField("_serde", testSerde)
+ }
+
+ override protected def afterAll(): Unit = {
+ setAmberRuntimeField("_serde", null)
+ setAmberRuntimeField("_actorSystem", null)
+ TestKit.shutdownActorSystem(testSystem)
+ super.afterAll()
+ }
+
+ private case class FixedSizePayload() extends WorkflowFIFOMessagePayload
+ private def msg(seq: Long): WorkflowFIFOMessage =
+ WorkflowFIFOMessage(cidA, seq, FixedSizePayload())
+
+ //
---------------------------------------------------------------------------
+ // ReplayLoggerImpl
+ //
---------------------------------------------------------------------------
+
+ "ReplayLoggerImpl.logCurrentStepWithMessage" should "append a ProcessingStep
when the channel changes" in {
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None)
+ val drained = l.drainCurrentLogRecords(0L)
+ assert(drained.toList == List(ProcessingStep(cidA, 0L)))
+ }
+
+ it should "skip a same-channel call with no message" in {
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None)
+ l.drainCurrentLogRecords(0L) // reset
+ l.logCurrentStepWithMessage(1L, cidA, None) // same channel, no message
+ val drained = l.drainCurrentLogRecords(1L)
+ // Should still only carry the trailing ProcessingStep emitted by drain.
+ assert(drained.toList == List(ProcessingStep(cidA, 1L)))
+ }
+
+ it should "append both a ProcessingStep and a MessageContent when a message
is provided" in {
+ val l = new ReplayLoggerImpl()
+ val m = msg(7L)
+ l.logCurrentStepWithMessage(2L, cidA, Some(m))
+ val drained = l.drainCurrentLogRecords(2L)
+ assert(drained.toList == List(ProcessingStep(cidA, 2L), MessageContent(m)))
+ }
+
+ it should "still log when a same-channel call carries a message (only
no-message + same-channel is skipped)" in {
+ // The skip guard in logCurrentStepWithMessage is `currentChannelId ==
channelId
+ // && message.isEmpty` — both conditions, not just the channel match.
After a
+ // first call sets the current channel, a *subsequent* same-channel call
with
+ // a non-empty message must still emit ProcessingStep + MessageContent.
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None) // sets currentChannelId = cidA
+ l.drainCurrentLogRecords(0L) // reset
+ val m = msg(11L)
+ l.logCurrentStepWithMessage(1L, cidA, Some(m)) // SAME channel, WITH
message
+ val drained = l.drainCurrentLogRecords(1L)
+ assert(drained.toList == List(ProcessingStep(cidA, 1L), MessageContent(m)))
+ }
+
+ it should "append a ProcessingStep on a channel switch even if no message is
provided" in {
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None)
+ l.logCurrentStepWithMessage(1L, cidB, None) // channel change → must record
+ val drained = l.drainCurrentLogRecords(1L)
+ assert(drained.toList == List(ProcessingStep(cidA, 0L),
ProcessingStep(cidB, 1L)))
+ }
+
+ "ReplayLoggerImpl.markAsReplayDestination" should
+ "preserve exact ordering: in-flight ProcessingStep, then
ReplayDestination, then synthetic trailing step" in {
+ // ReplayLogGenerator depends on the relative position of ReplayDestination
+ // within the record stream — replay stops at it. So a `contains` check
+ // would silently accept a regression that duplicated ReplayDestination or
+ // moved it after the synthetic trailing ProcessingStep emitted by drain.
+ // Pin the full sequence instead.
+ val l = new ReplayLoggerImpl()
+ val ecm = EmbeddedControlMessageIdentity("checkpoint-1")
+ l.logCurrentStepWithMessage(0L, cidA, None) // sets currentChannelId,
appends ProcessingStep
+ l.markAsReplayDestination(ecm)
+ // Drain at a step beyond lastStep so the synthetic trailing ProcessingStep
+ // is also emitted; this is exactly the production drain behavior we need
+ // to lock down (the synthetic step must come AFTER the destination).
+ val drained = l.drainCurrentLogRecords(3L).toList
+ assert(
+ drained == List(
+ ProcessingStep(cidA, 0L),
+ ReplayDestination(ecm),
+ ProcessingStep(cidA, 3L)
+ ),
+ s"unexpected record order: $drained"
+ )
+ }
+
+ "ReplayLoggerImpl.drainCurrentLogRecords" should "clear the buffer between
drains" in {
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None)
+ val first = l.drainCurrentLogRecords(0L)
+ val second = l.drainCurrentLogRecords(0L)
+ assert(first.nonEmpty)
+ assert(second.isEmpty, "second drain must yield no leftover records")
+ }
+
+ it should "append a synthetic ProcessingStep when the requested step differs
from lastStep" in {
+ val l = new ReplayLoggerImpl()
+ l.logCurrentStepWithMessage(0L, cidA, None)
+ val drained = l.drainCurrentLogRecords(5L)
+ // Two records: the original ProcessingStep at step 0 and the synthetic
one at step 5.
+ assert(drained.toList == List(ProcessingStep(cidA, 0L),
ProcessingStep(cidA, 5L)))
+ }
+
+ //
---------------------------------------------------------------------------
+ // OrderEnforcer trait
+ //
---------------------------------------------------------------------------
+
+ "OrderEnforcer trait" should "be implementable as a custom subclass" in {
+ val enf = new OrderEnforcer {
+ override var isCompleted: Boolean = false
+ override def canProceed(channelId: ChannelIdentity): Boolean =
!isCompleted
+ }
+ assert(enf.canProceed(cidA))
+ enf.isCompleted = true
+ assert(!enf.canProceed(cidA))
+ }
+
+ //
---------------------------------------------------------------------------
+ // ReplayOrderEnforcer
+ //
---------------------------------------------------------------------------
+
+ /** Stub that exposes a controllable `getStep`. */
+ private class StubLogManager(
+ handler: Either[
+
org.apache.texera.amber.engine.architecture.worker.WorkflowWorker.MainThreadDelegateMessage,
+ WorkflowFIFOMessage
+ ] => Unit
+ ) extends EmptyReplayLogManagerImpl(handler) {
+ private var step = 0L
+ def setStep(s: Long): Unit = { step = s }
+ override def getStep: Long = step
+ }
+
+ "ReplayOrderEnforcer" should "be completed immediately when the step queue
is empty" in {
+ val mgr = new StubLogManager(_ => ())
+ val empty = mutable.Queue[ProcessingStep]()
+ var fired = false
+ val enf = new ReplayOrderEnforcer(mgr, empty, startStep = 0L, () => fired
= true)
+ assert(enf.isCompleted)
+ assert(fired)
+ }
+
+ it should "skip log entries whose step is at or below startStep during
construction (boundary inclusive)" in {
+ // Use distinct channels at and around the boundary so the test is
sensitive
+ // to a `step < startStep` (vs `<= startStep`) regression. With
startStep=1L:
+ // - correct impl drops steps 0 and 1 → after ctor, head is step 2 (cidC)
+ // - buggy impl that drops only step < 1 leaves step 1 (cidB) at the head
+ // At step=2, canProceed(cidC) consumes step 2 and returns true under the
+ // correct impl, but the buggy impl never matches the leftover step-1 entry
+ // (`head.step == step` is 1 != 2), so currentChannelId stays at cidA (set
+ // from the only forwardNext that ran on step 0) and canProceed(cidC)
returns
+ // false. Either side mismatching this assertion catches the boundary bug.
+ val mgr = new StubLogManager(_ => ())
+ mgr.setStep(2L)
+ val q = mutable.Queue[ProcessingStep](
+ ProcessingStep(cidA, 0L),
+ ProcessingStep(cidB, 1L), // boundary entry — distinct channel
+ ProcessingStep(cidC, 2L),
+ ProcessingStep(cidA, 3L)
+ )
+ val enf = new ReplayOrderEnforcer(mgr, q, startStep = 1L, () => ())
+ assert(!enf.isCompleted)
+ val proceeded = enf.canProceed(cidC)
+ assert(
+ proceeded,
+ "boundary entry must be dropped (step <= startStep), so cidC at step 2
is the next allowed channel"
+ )
+ }
+
+ it should "advance to the next channel and fire onComplete on the
non-empty-to-empty transition" in {
+ val mgr = new StubLogManager(_ => ())
+ mgr.setStep(0L)
+ val q = mutable.Queue[ProcessingStep](
+ ProcessingStep(cidA, 0L),
+ ProcessingStep(cidB, 1L)
+ )
+ var fired = 0
+ val enf = new ReplayOrderEnforcer(mgr, q, startStep = -1L, () => fired +=
1)
+ assert(fired == 0, "onComplete must NOT fire at construction while the
queue is non-empty")
+
+ // At step 0, the head matches and is consumed; currentChannelId becomes
cidA.
+ assert(enf.canProceed(cidA))
+ assert(!enf.canProceed(cidB), "still on cidA until the next step is
observed")
+ assert(!enf.isCompleted)
+ assert(fired == 0, "onComplete must NOT fire while the queue still has
entries")
+
+ mgr.setStep(1L)
+ // The pre-advancement query: cidA is the previous channel, but the while
+ // loop in canProceed will consume step 1 (cidB) before evaluating the
+ // membership check, so cidA is rejected at step 1.
+ assert(!enf.canProceed(cidA), "step 1's channel is cidB, not cidA")
+ assert(enf.isCompleted, "queue is exhausted, replay must mark completed")
+ // Now the previously consumed cidB is the current channel — pin that
+ // a regression that drains the queue without updating the active
+ // channel would NOT just satisfy this test by silence.
+ assert(enf.canProceed(cidB), "cidB must be the active channel after step 1
is consumed")
+ // onComplete must fire exactly once, on the empty-transition.
+ assert(fired == 1, "onComplete must fire on the non-empty-to-empty
transition")
+ enf.canProceed(cidA) // further calls past completion must not refire
+ assert(fired == 1)
+ }
+
+ it should "fire onComplete exactly once even if canProceed is called
repeatedly past the end" in {
+ val mgr = new StubLogManager(_ => ())
+ var fired = 0
+ val enf = new ReplayOrderEnforcer(
+ mgr,
+ mutable.Queue.empty[ProcessingStep],
+ startStep = 0L,
+ () => fired += 1
+ )
+ assert(fired == 1)
+ enf.canProceed(cidA) // already completed → must not refire
+ enf.canProceed(cidB)
+ assert(fired == 1)
+ }
+
+ it should "consume every queue entry sharing the current step (the
duplicate-step while loop)" in {
+ // canProceed contains a `while (head.step == step) forwardNext()` loop
+ // specifically because checkpoints produce duplicate step records (the
+ // MainThreadDelegateMessage path emits an extra ProcessingStep at the
+ // same step). A regression that consumed only one entry per step would
+ // leave a stale duplicate at the head, so a subsequent canProceed at
+ // the next step would still see the old channel — not the real next one.
+ // Pin the multi-consume behavior with two adjacent same-step entries.
+ val mgr = new StubLogManager(_ => ())
+ mgr.setStep(0L)
+ val q = mutable.Queue[ProcessingStep](
+ ProcessingStep(cidA, 0L),
+ ProcessingStep(cidB, 0L), // duplicate step — must be consumed too
+ ProcessingStep(cidC, 1L)
+ )
+ val enf = new ReplayOrderEnforcer(mgr, q, startStep = -1L, () => ())
+
+ // After both step-0 entries are consumed, currentChannelId is the LAST
+ // one (cidB). cidA was the head but is no longer the active channel.
+ assert(enf.canProceed(cidB), "the second step-0 entry (cidB) must be the
active channel")
+ assert(
+ !enf.canProceed(cidA),
+ "cidA was consumed by the duplicate-step while loop and is no longer
active"
+ )
+ assert(!enf.isCompleted, "step 1 (cidC) is still queued")
+
+ // Advancing to step 1 consumes cidC, leaving the queue empty.
+ mgr.setStep(1L)
+ assert(enf.canProceed(cidC))
+ assert(enf.isCompleted)
+ }
+
+ //
---------------------------------------------------------------------------
+ // ReplayLogRecord serde
+ //
---------------------------------------------------------------------------
+
+ // Round-trip each ReplayLogRecord subtype through Pekko Serialization (the
+ // exact path SequentialRecordStorage uses in production via
+ // AmberRuntime.serde). A broken serde registration or a deserialization
+ // mismatch would fail this test, where `isInstanceOf[Serializable]` would
+ // not.
+ private def roundTrip(r: ReplayLogRecord): ReplayLogRecord = {
+ val bytes = AmberRuntime.serde.serialize(r).get
+ AmberRuntime.serde.deserialize(bytes, classOf[ReplayLogRecord]).get
+ }
+
+ // Production never writes a DataFrame to the replay log: both the
+ // controller and DP-thread paths filter for `DirectControlMessagePayload`
+ // before logging (see `Controller.scala` and `DPThread.scala` use of
+ // `_.payload.isInstanceOf[DirectControlMessagePayload]`). The trait has
+ // two concrete subtypes that production actually serializes —
+ // `ControlInvocation` (outgoing call) and `ReturnInvocation` (reply) —
+ // and `processDCM` handles both. Round-trip each so a serializer
+ // regression on either subtype fails this spec.
+
+ "ReplayLogRecord MessageContent" should "round-trip a ControlInvocation
payload through AmberRuntime.serde" in {
+ val payload = ControlInvocation(
+ methodName = "doNothing",
+ command = EmptyRequest(),
+ context = AsyncRPCContext(workerId, workerId),
+ commandId = 42L
+ )
+ val msg = WorkflowFIFOMessage(cidA, 1L, payload)
+ val original: ReplayLogRecord = MessageContent(msg)
+ val restored = roundTrip(original)
+ assert(restored == original)
+ val restoredMsg = restored.asInstanceOf[MessageContent].message
+ assert(restoredMsg == msg)
+ val restoredPayload = restoredMsg.payload.asInstanceOf[ControlInvocation]
+ assert(restoredPayload.methodName == "doNothing")
+ assert(restoredPayload.commandId == 42L)
+ }
+
+ it should "round-trip a ReturnInvocation payload through AmberRuntime.serde"
in {
+ val payload = ReturnInvocation(commandId = 42L, returnValue =
EmptyReturn())
+ val msg = WorkflowFIFOMessage(cidA, 2L, payload)
+ val original: ReplayLogRecord = MessageContent(msg)
+ val restored = roundTrip(original)
+ assert(restored == original)
+ val restoredMsg = restored.asInstanceOf[MessageContent].message
+ assert(restoredMsg == msg)
+ val restoredPayload = restoredMsg.payload.asInstanceOf[ReturnInvocation]
+ assert(restoredPayload.commandId == 42L)
+ assert(restoredPayload.returnValue == EmptyReturn())
+ }
+
+ "ReplayLogRecord ProcessingStep" should "round-trip through
AmberRuntime.serde" in {
+ val original: ReplayLogRecord = ProcessingStep(cidA, 7L)
+ val restored = roundTrip(original)
+ assert(restored == original)
+ val ps = restored.asInstanceOf[ProcessingStep]
+ assert(ps.channelId == cidA)
+ assert(ps.step == 7L)
+ }
+
+ "ReplayLogRecord ReplayDestination" should "round-trip through
AmberRuntime.serde" in {
+ val ecm = EmbeddedControlMessageIdentity("ecm-1")
+ val original: ReplayLogRecord = ReplayDestination(ecm)
+ val restored = roundTrip(original)
+ assert(restored == original)
+ assert(restored.asInstanceOf[ReplayDestination].id == ecm)
+ }
+
+ // NOTE: TerminateSignal is intentionally NOT round-tripped here. It is an
+ // in-memory shutdown sentinel for AsyncReplayLogWriter and is filtered
+ // out before records are written to storage, so a Pekko-serialization
+ // round-trip is not on a real production path. Pinning `eq`-identity
+ // post-deserialization would over-constrain the serializer (a future
+ // change that re-creates the case-object via reflection — still
+ // semantically correct — would fail). Subtype membership is already
+ // pinned by the case-object's compile-time `extends ReplayLogRecord`.
+}