This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new cc37ee7f99 test(amber): add unit test coverage for worker managers
(Statistics/Serialization/Pause) (#4836)
cc37ee7f99 is described below
commit cc37ee7f999b0e3cfce4c7d1b0c7880dc9317a69
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 13:23:58 2026 -0700
test(amber): add unit test coverage for worker managers
(Statistics/Serialization/Pause) (#4836)
### What changes were proposed in this PR?
Add `WorkerManagersSpec` covering the three worker-side state managers
under `amber/.../worker` and `amber/.../worker/managers`.
`StatisticsManager`:
- Defaults all counters to zero
- Per-port input/output count + size accumulation
- Negative tuple sizes rejected on both input and output paths
- Processing-time accumulation (data and control), with negative
rejection
- `idleTime = totalExecutionTime - dataProcessingTime -
controlProcessingTime`
- `updateTotalExecutionTime` rejects current time before
`workerStartTime`
`SerializationManager`:
- `applySerialization` is a no-op when no callback is registered
- Registered callback fires exactly once and is then cleared
- Latest registered callback overwrites any previous one
`PauseManager` (with a stub `InputGateway` and three real
`AmberFIFOChannel` instances):
- `isPaused` is false initially
- Global `pause` disables every data channel (control channels left
alone) and reports paused
- `resume` re-enables all data channels when no specific-input pauses
remain
- Stays paused when other global pauses are still active
- `pauseInputChannel` disables only the listed channels
- One specific-pause resume leaves still-paused channels disabled
`EmbeddedControlMessageManager` is intentionally skipped because it
requires the full `InputManager` + ECM infrastructure and is best
exercised by an integration test.
### Any related issues, documentation, discussions?
Closes #4835
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.worker.managers.WorkerManagersSpec"`
- 18/18 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../worker/managers/WorkerManagersSpec.scala | 268 +++++++++++++++++++++
1 file changed, 268 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
new file mode 100644
index 0000000000..3fbff39148
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/worker/managers/WorkerManagersSpec.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.worker.managers
+
+import org.apache.texera.amber.core.executor.OperatorExecutor
+import org.apache.texera.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkerManagersSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // StatisticsManager
+  // ---------------------------------------------------------------------------
+
+  // Bare-bones OperatorExecutor that never emits a tuple. As noted when this
+  // fixture was written, StatisticsManager.getStatistics ignores its argument
+  // today, so any concrete implementation is sufficient here.
+  private val nullExec: OperatorExecutor = new OperatorExecutor {
+    override def processTuple(t: Tuple, port: Int): Iterator[TupleLike] = {
+      Iterator.empty
+    }
+  }
+
+  "StatisticsManager" should "default all counters to zero" in {
+    // A freshly constructed manager: no tuples counted, no time accumulated.
+    val stats = new StatisticsManager()
+    assert(stats.getInputTupleCount == 0L)
+    assert(stats.getOutputTupleCount == 0L)
+
+    val snapshot = stats.getStatistics(nullExec)
+    assert(snapshot.inputTupleMetrics.isEmpty)
+    assert(snapshot.outputTupleMetrics.isEmpty)
+    assert(snapshot.dataProcessingTime == 0L)
+    assert(snapshot.controlProcessingTime == 0L)
+    // idleTime = totalExecutionTime - data - control = 0 - 0 - 0
+    assert(snapshot.idleTime == 0L)
+  }
+
+  "StatisticsManager.increaseInputStatistics" should "accumulate count and size per port" in {
+    val stats = new StatisticsManager()
+    val port0 = PortIdentity(0)
+    val port1 = PortIdentity(1)
+    // Two tuples on port 0 (100 + 50) and one tuple on port 1 (25).
+    stats.increaseInputStatistics(port0, 100)
+    stats.increaseInputStatistics(port0, 50)
+    stats.increaseInputStatistics(port1, 25)
+    assert(stats.getInputTupleCount == 3L)
+
+    // Index the reported metrics by port so each port can be checked on its own.
+    val metricsByPort = stats
+      .getStatistics(nullExec)
+      .inputTupleMetrics
+      .map { metric =>
+        val tuples = metric.tupleMetrics
+        metric.portId -> (tuples.count, tuples.size)
+      }
+      .toMap
+    assert(metricsByPort(port0) == ((2L, 150L)))
+    assert(metricsByPort(port1) == ((1L, 25L)))
+  }
+
+  it should "reject negative tuple sizes" in {
+    val stats = new StatisticsManager()
+    val port = PortIdentity(0)
+    // A negative size on the input path must surface as IllegalArgumentException.
+    assertThrows[IllegalArgumentException] {
+      stats.increaseInputStatistics(port, -1)
+    }
+  }
+
+  "StatisticsManager.increaseOutputStatistics" should "accumulate count and size per port" in {
+    val stats = new StatisticsManager()
+    val port = PortIdentity(0)
+    // Two tuples on the same output port: sizes 30 + 70.
+    stats.increaseOutputStatistics(port, 30)
+    stats.increaseOutputStatistics(port, 70)
+    assert(stats.getOutputTupleCount == 2L)
+
+    val outputs = stats.getStatistics(nullExec).outputTupleMetrics
+    assert(outputs.size == 1)
+    // Exactly one port was used, so its entry carries the full totals.
+    val onlyPort = outputs.head.tupleMetrics
+    assert(onlyPort.count == 2L)
+    assert(onlyPort.size == 100L)
+  }
+
+  it should "reject negative tuple sizes" in {
+    val stats = new StatisticsManager()
+    val port = PortIdentity(0)
+    // A negative size on the output path must surface as IllegalArgumentException.
+    assertThrows[IllegalArgumentException] {
+      stats.increaseOutputStatistics(port, -1)
+    }
+  }
+
+  "StatisticsManager.increaseDataProcessingTime" should "accumulate time and reject negatives" in {
+    val stats = new StatisticsManager()
+    // Record two increments; the reported total must be their sum.
+    Seq(100L, 50L).foreach(elapsed => stats.increaseDataProcessingTime(elapsed))
+    assert(stats.getStatistics(nullExec).dataProcessingTime == 150L)
+    // A negative increment is rejected.
+    assertThrows[IllegalArgumentException] {
+      stats.increaseDataProcessingTime(-1)
+    }
+  }
+
+  "StatisticsManager.increaseControlProcessingTime" should "accumulate time and reject negatives" in {
+    val stats = new StatisticsManager()
+    // Record two increments; the reported total must be their sum.
+    Seq(20L, 40L).foreach(elapsed => stats.increaseControlProcessingTime(elapsed))
+    assert(stats.getStatistics(nullExec).controlProcessingTime == 60L)
+    // A negative increment is rejected.
+    assertThrows[IllegalArgumentException] {
+      stats.increaseControlProcessingTime(-1)
+    }
+  }
+
+  "StatisticsManager.updateTotalExecutionTime" should "compute idleTime as total - data - control" in {
+    // Name the fixture values so the expected idle time reads as the formula.
+    val startedAt = 1000L
+    val now = 2000L
+    val dataTime = 200L
+    val controlTime = 100L
+
+    val stats = new StatisticsManager()
+    stats.initializeWorkerStartTime(startedAt)
+    stats.increaseDataProcessingTime(dataTime)
+    stats.increaseControlProcessingTime(controlTime)
+    stats.updateTotalExecutionTime(now)
+
+    val snapshot = stats.getStatistics(nullExec)
+    assert(snapshot.dataProcessingTime == dataTime)
+    assert(snapshot.controlProcessingTime == controlTime)
+    assert(snapshot.idleTime == now - startedAt - dataTime - controlTime)
+  }
+
+  it should "reject a current time before workerStartTime" in {
+    val stats = new StatisticsManager()
+    val startedAt = 1000L
+    stats.initializeWorkerStartTime(startedAt)
+    // 500 is earlier than the recorded start time, so the update must be refused.
+    assertThrows[IllegalArgumentException] {
+      stats.updateTotalExecutionTime(500L)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // SerializationManager
+  // ---------------------------------------------------------------------------
+
+  "SerializationManager.applySerialization" should "be a no-op when no callback is registered" in {
+    val serialization = new SerializationManager(ActorVirtualIdentity("worker-1"))
+    // Nothing has been registered; the call only has to return without throwing.
+    serialization.applySerialization()
+    succeed
+  }
+
+  it should "invoke the registered callback exactly once and then clear it" in {
+    val serialization = new SerializationManager(ActorVirtualIdentity("worker-1"))
+    var invocations = 0
+    serialization.registerSerialization(() => { invocations += 1 })
+    // First call runs the callback.
+    serialization.applySerialization()
+    // Second call must not run it again: the callback is expected to be cleared.
+    serialization.applySerialization()
+    assert(invocations == 1)
+  }
+
+  it should "let the latest registered callback overwrite any previous one" in {
+    val serialization = new SerializationManager(ActorVirtualIdentity("worker-1"))
+    var earlierCalls = 0
+    var laterCalls = 0
+    // Register twice before applying; only the second registration should run.
+    serialization.registerSerialization(() => { earlierCalls += 1 })
+    serialization.registerSerialization(() => { laterCalls += 1 })
+    serialization.applySerialization()
+    assert(earlierCalls == 0)
+    assert(laterCalls == 1)
+  }
+
+  // ---------------------------------------------------------------------------
+  // PauseManager (with a stub InputGateway)
+  // ---------------------------------------------------------------------------
+
+ import org.apache.texera.amber.engine.architecture.logreplay.OrderEnforcer
+ import
org.apache.texera.amber.engine.architecture.messaginglayer.{AmberFIFOChannel,
InputGateway}
+ import org.apache.texera.amber.engine.architecture.worker.{
+ BackpressurePause,
+ OperatorLogicPause,
+ PauseManager,
+ UserPause
+ }
+
+  /**
+    * In-memory InputGateway backed by a fixed channel map.
+    *
+    * The two `tryPick*` methods always return None; the original fixture notes
+    * that PauseManager does not use them. `addEnforcer` is a no-op.
+    */
+  private class StubGateway(channels: Map[ChannelIdentity, AmberFIFOChannel])
+      extends InputGateway {
+
+    // Channels whose identity has the requested `isControl` flag.
+    private def channelsWhere(control: Boolean): Iterable[AmberFIFOChannel] =
+      for ((id, channel) <- channels if id.isControl == control) yield channel
+
+    override def tryPickControlChannel: Option[AmberFIFOChannel] = None
+
+    override def tryPickChannel: Option[AmberFIFOChannel] = None
+
+    override def getAllChannels: Iterable[AmberFIFOChannel] = channels.values
+
+    override def getAllDataChannels: Iterable[AmberFIFOChannel] = channelsWhere(control = false)
+
+    // Plain map lookup: an unknown id fails the same way Map.apply does.
+    override def getChannel(channelId: ChannelIdentity): AmberFIFOChannel = channels(channelId)
+
+    override def getAllControlChannels: Iterable[AmberFIFOChannel] = channelsWhere(control = true)
+
+    override def addEnforcer(enforcer: OrderEnforcer): Unit = ()
+  }
+
+  // Identity of the worker under test; every fixture channel targets it.
+  private val workerId = ActorVirtualIdentity("w")
+
+  // Builds the identity of a channel from the named sender into `workerId`.
+  private def channelFrom(sender: String, control: Boolean): ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity(sender), workerId, isControl = control)
+
+  // Two data channels and one control channel, each from a distinct sender.
+  private val dataA = channelFrom("up1", control = false)
+  private val dataB = channelFrom("up2", control = false)
+  private val ctrl = channelFrom("ctrl", control = true)
+
+  // Builds a fresh gateway plus its three channels (dataA, dataB, ctrl, in that
+  // order) so each test starts from untouched channel state.
+  private def newGateway(): (StubGateway, AmberFIFOChannel, AmberFIFOChannel, AmberFIFOChannel) = {
+    val channelA = new AmberFIFOChannel(dataA)
+    val channelB = new AmberFIFOChannel(dataB)
+    val controlChannel = new AmberFIFOChannel(ctrl)
+    val registry = Map(dataA -> channelA, dataB -> channelB, ctrl -> controlChannel)
+    (new StubGateway(registry), channelA, channelB, controlChannel)
+  }
+
+  "PauseManager.isPaused" should "be false initially" in {
+    // Only the gateway is needed; the individual channels are not inspected.
+    val (gateway, _, _, _) = newGateway()
+    val pauseManager = new PauseManager(workerId, gateway)
+    assert(!pauseManager.isPaused)
+  }
+
+  "PauseManager.pause" should "disable every data channel and report paused" in {
+    val (gateway, channelA, channelB, controlChannel) = newGateway()
+    val pauseManager = new PauseManager(workerId, gateway)
+    pauseManager.pause(UserPause)
+    assert(pauseManager.isPaused)
+    // Both data channels are expected to be switched off by the global pause.
+    assert(!channelA.isEnabled)
+    assert(!channelB.isEnabled)
+    // The control channel is not among getAllDataChannels, so it stays enabled.
+    assert(controlChannel.isEnabled)
+  }
+
+  "PauseManager.resume" should "re-enable all data channels when no specific input pauses remain" in {
+    val (gateway, channelA, channelB, controlChannel) = newGateway()
+    val pauseManager = new PauseManager(workerId, gateway)
+    // Pause and immediately resume with the same pause type.
+    pauseManager.pause(UserPause)
+    pauseManager.resume(UserPause)
+    assert(!pauseManager.isPaused)
+    // Every channel, data and control alike, should be enabled afterwards.
+    assert(channelA.isEnabled)
+    assert(channelB.isEnabled)
+    assert(controlChannel.isEnabled)
+  }
+
+  it should "stay paused if other global pauses are still active" in {
+    val (gw, a, b, c) = newGateway()
+    val pm = new PauseManager(workerId, gw)
+    // Two independent global pauses; only one of them is lifted.
+    pm.pause(UserPause)
+    pm.pause(BackpressurePause)
+    pm.resume(UserPause)
+    // BackpressurePause is still active, so the manager must keep reporting paused
+    // and every data channel (not just the first) must stay disabled.
+    assert(pm.isPaused)
+    assert(!a.isEnabled)
+    assert(!b.isEnabled)
+    // Mirrors the "PauseManager.pause" test: the control channel is expected
+    // to remain enabled throughout.
+    assert(c.isEnabled)
+  }
+
+  "PauseManager.pauseInputChannel" should "disable only the listed channels" in {
+    val (gateway, channelA, channelB, _) = newGateway()
+    val pauseManager = new PauseManager(workerId, gateway)
+    val targeted = List(dataA)
+    pauseManager.pauseInputChannel(OperatorLogicPause, targeted)
+    // A channel-specific pause is not a global pause, so isPaused stays false.
+    assert(!pauseManager.isPaused)
+    // Only the listed channel is disabled; the other data channel is untouched.
+    assert(!channelA.isEnabled)
+    assert(channelB.isEnabled)
+  }
+
+  it should "leave still-paused specific channels disabled when only one of multiple specific pauses is resumed" in {
+    val (gateway, channelA, channelB, _) = newGateway()
+    val pauseManager = new PauseManager(workerId, gateway)
+    // Each data channel is paused for a different reason.
+    pauseManager.pauseInputChannel(OperatorLogicPause, List(dataA))
+    pauseManager.pauseInputChannel(BackpressurePause, List(dataB))
+    // Lift only the reason that was holding dataA.
+    pauseManager.resume(OperatorLogicPause)
+    // dataA had no other pause, so it is enabled again.
+    assert(channelA.isEnabled)
+    // dataB is still held by BackpressurePause.
+    assert(!channelB.isEnabled)
+  }
+}