This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 21cf0fc380 test(amber): add unit test coverage for WorkerExecution
(#4768)
21cf0fc380 is described below
commit 21cf0fc3809de3c7679dfe052098968642924028
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 07:01:29 2026 -0700
test(amber): add unit test coverage for WorkerExecution (#4768)
### What changes were proposed in this PR?
Add `WorkerExecutionSpec` covering update semantics and port-execution
allocation in `WorkerExecution`:
- Default state is `UNINITIALIZED` with zeroed stats
- `update(state)` only applies when the timestamp is strictly newer;
non-newer timestamps are ignored
- `update(state, stats)` updates both atomically when newer; ignores
stale timestamps
- `update(stats)` updates only the stats portion when newer; ignores
stale timestamps
- `getInputPortExecution` / `getOutputPortExecution` lazily create and
reuse a single instance per port id, with separate input vs output
namespaces
### Any related issues, documentation, discussions?
Closes #4767
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecutionSpec"`
— 10/10 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../layer/WorkerExecutionSpec.scala | 106 +++++++++++++++++++++
1 file changed, 106 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala
new file mode 100644
index 0000000000..2c39954d04
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/deploysemantics/layer/WorkerExecutionSpec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.deploysemantics.layer
+
+import org.apache.texera.amber.core.workflow.PortIdentity
+import
org.apache.texera.amber.engine.architecture.worker.statistics.{WorkerState,
WorkerStatistics}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkerExecutionSpec extends AnyFlatSpec {
+
+ private def stats(idle: Long): WorkerStatistics =
+ WorkerStatistics(Seq.empty, Seq.empty, 0L, 0L, idle)
+
+ "WorkerExecution" should "have UNINITIALIZED state and zeroed stats by
default" in {
+ val we = WorkerExecution()
+ assert(we.getState == WorkerState.UNINITIALIZED)
+ assert(we.getStats.idleTime == 0L)
+ assert(we.getStats.dataProcessingTime == 0L)
+ assert(we.getStats.controlProcessingTime == 0L)
+ }
+
+ "WorkerExecution.update(state)" should "apply when the timestamp is newer"
in {
+ val we = WorkerExecution()
+ we.update(timeStamp = 10L, state = WorkerState.RUNNING)
+ assert(we.getState == WorkerState.RUNNING)
+ }
+
+ it should "ignore updates with a non-newer timestamp" in {
+ val we = WorkerExecution()
+ we.update(timeStamp = 10L, state = WorkerState.RUNNING)
+ we.update(timeStamp = 10L, state = WorkerState.PAUSED) // not strictly
newer
+ we.update(timeStamp = 5L, state = WorkerState.COMPLETED) // older
+ assert(we.getState == WorkerState.RUNNING)
+ }
+
+ "WorkerExecution.update(state, stats)" should "update both atomically when
newer" in {
+ val we = WorkerExecution()
+ we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle
= 7L))
+ assert(we.getState == WorkerState.RUNNING)
+ assert(we.getStats.idleTime == 7L)
+ }
+
+ it should "ignore updates with a non-newer timestamp" in {
+ val we = WorkerExecution()
+ we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle
= 7L))
+ we.update(timeStamp = 5L, state = WorkerState.COMPLETED, stats =
stats(idle = 99L))
+ assert(we.getState == WorkerState.RUNNING)
+ assert(we.getStats.idleTime == 7L)
+ }
+
+ "WorkerExecution.update(stats)" should "update only the stats when newer" in
{
+ val we = WorkerExecution()
+ we.update(timeStamp = 10L, state = WorkerState.RUNNING, stats = stats(idle
= 7L))
+ we.update(timeStamp = 20L, stats = stats(idle = 42L))
+ assert(we.getState == WorkerState.RUNNING)
+ assert(we.getStats.idleTime == 42L)
+ }
+
+ it should "ignore stats updates with a non-newer timestamp" in {
+ val we = WorkerExecution()
+ we.update(timeStamp = 20L, stats = stats(idle = 42L))
+ we.update(timeStamp = 20L, stats = stats(idle = 99L)) // not strictly newer
+ we.update(timeStamp = 5L, stats = stats(idle = 0L)) // older
+ assert(we.getStats.idleTime == 42L)
+ }
+
+ "WorkerExecution.getInputPortExecution" should "lazily create and reuse a
port execution per port id" in {
+ val we = WorkerExecution()
+ val first = we.getInputPortExecution(PortIdentity(0))
+ val same = we.getInputPortExecution(PortIdentity(0))
+ val other = we.getInputPortExecution(PortIdentity(1))
+ assert(first eq same)
+ assert(first ne other)
+ }
+
+ "WorkerExecution.getOutputPortExecution" should "lazily create and reuse a
port execution per port id" in {
+ val we = WorkerExecution()
+ val first = we.getOutputPortExecution(PortIdentity(0))
+ val same = we.getOutputPortExecution(PortIdentity(0))
+ assert(first eq same)
+ }
+
+ it should "use a separate map from getInputPortExecution" in {
+ val we = WorkerExecution()
+ val input = we.getInputPortExecution(PortIdentity(0))
+ val output = we.getOutputPortExecution(PortIdentity(0))
+ assert(input ne output)
+ }
+}