This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new e448db30a0 test(workflow-core): add unit test coverage for workflow
core types (#4842)
e448db30a0 is described below
commit e448db30a01688d2f01286d2be63e26ebf12b862
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 14:10:05 2026 -0700
test(workflow-core): add unit test coverage for workflow core types (#4842)
### What changes were proposed in this PR?
Add `WorkflowCoreTypesSpec` covering five workflow-core domain types in
`common/workflow-core/.../core/workflow`:
- `LocationPreference`: `PreferController` / `RoundRobinPreference` are
distinct singletons; both Serializable
- `WorkflowSettings`: defaults (`dataTransferBatchSize = 400`,
`outputPortsNeedingStorage = Set.empty`); custom-value round-trip
- `WorkflowContext`: defaults equal documented constants; `var` fields
can be reassigned
- `PhysicalOp`:
- `isSourceOperator` true on no inputs / false once an input port is
added
- `withLocationPreference`, `withParallelizable`,
`withSuggestedWorkerNum` round-trip through `copy`
- `addInputLink` succeeds; assertion failures on wrong target id and on
unknown port
- `addOutputLink` succeeds
- `removeInputLink` / `removeOutputLink` drop only the matching link
- `propagateSchema` fills outputs once all inputs are known; throws
`IllegalArgumentException` on conflicting input schema; leaves outputs
as `Left` when the propagation function throws
- `isOutputLinkBlocking` reflects the source ports `blocking` flag
- `PhysicalPlan`:
- `getOperator`, `getSourceOperatorIds` with and without links
- `topologicalIterator` ordering
- `getUpstreamPhysicalOpIds`, `getUpstreamPhysicalLinks`,
`getDownstreamPhysicalLinks`
- `getSubPlan` only includes the requested operators and their internal
links
- `getPhysicalOpsOfLogicalOp` returns the matching physical ops
### Any related issues, documentation, discussions?
Closes #4841
### How was this PR tested?
`sbt "WorkflowCore/testOnly
org.apache.texera.amber.core.workflow.WorkflowCoreTypesSpec"` - 31/31
tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../core/workflow/WorkflowCoreTypesSpec.scala | 340 +++++++++++++++++++++
1 file changed, 340 insertions(+)
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
new file mode 100644
index 0000000000..11f73013bf
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/WorkflowCoreTypesSpec.scala
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.workflow
+
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema}
+import org.apache.texera.amber.core.virtualidentity.{
+ ExecutionIdentity,
+ OperatorIdentity,
+ PhysicalOpIdentity,
+ WorkflowIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class WorkflowCoreTypesSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // LocationPreference
+ //
---------------------------------------------------------------------------
+
+ "LocationPreference" should "have PreferController and RoundRobinPreference
as singleton subtypes" in {
+ val a: LocationPreference = PreferController
+ val b: LocationPreference = RoundRobinPreference
+ assert(a eq PreferController)
+ assert(b eq RoundRobinPreference)
+ assert(a != b)
+ }
+
+ it should "be Serializable on every subtype" in {
+ val all: Seq[LocationPreference] = Seq(PreferController,
RoundRobinPreference)
+ all.foreach(p => assert(p.isInstanceOf[Serializable]))
+ }
+
+ //
---------------------------------------------------------------------------
+ // WorkflowSettings
+ //
---------------------------------------------------------------------------
+
+ "WorkflowSettings" should "default dataTransferBatchSize to 400 and
outputPortsNeedingStorage to empty" in {
+ val s = WorkflowSettings()
+ assert(s.dataTransferBatchSize == 400)
+ assert(s.outputPortsNeedingStorage.isEmpty)
+ }
+
+ it should "carry custom values constructed via case-class apply" in {
+ val portId = GlobalPortIdentity(
+ PhysicalOpIdentity(OperatorIdentity("op"), "main"),
+ PortIdentity(0),
+ input = false
+ )
+ val s = WorkflowSettings(
+ dataTransferBatchSize = 50,
+ outputPortsNeedingStorage = Set(portId)
+ )
+ assert(s.dataTransferBatchSize == 50)
+ assert(s.outputPortsNeedingStorage == Set(portId))
+ }
+
+ //
---------------------------------------------------------------------------
+ // WorkflowContext
+ //
---------------------------------------------------------------------------
+
+ "WorkflowContext" should "default to DEFAULT_WORKFLOW_ID,
DEFAULT_EXECUTION_ID, and DEFAULT_WORKFLOW_SETTINGS" in {
+ val ctx = new WorkflowContext()
+ assert(ctx.workflowId == WorkflowContext.DEFAULT_WORKFLOW_ID)
+ assert(ctx.executionId == WorkflowContext.DEFAULT_EXECUTION_ID)
+ assert(ctx.workflowSettings == WorkflowContext.DEFAULT_WORKFLOW_SETTINGS)
+ }
+
+ it should "expose the documented default constants" in {
+ assert(WorkflowContext.DEFAULT_WORKFLOW_ID == WorkflowIdentity(1L))
+ assert(WorkflowContext.DEFAULT_EXECUTION_ID == ExecutionIdentity(1L))
+ }
+
+ it should "allow workflowId / executionId / workflowSettings to be
reassigned" in {
+ val ctx = new WorkflowContext()
+ ctx.workflowId = WorkflowIdentity(7L)
+ ctx.executionId = ExecutionIdentity(11L)
+ val custom = WorkflowSettings(dataTransferBatchSize = 1)
+ ctx.workflowSettings = custom
+ assert(ctx.workflowId == WorkflowIdentity(7L))
+ assert(ctx.executionId == ExecutionIdentity(11L))
+ assert(ctx.workflowSettings eq custom)
+ }
+
+ //
---------------------------------------------------------------------------
+ // PhysicalOp helpers
+ //
---------------------------------------------------------------------------
+
+ private val workflowId = WorkflowIdentity(0L)
+ private val executionId = ExecutionIdentity(0L)
+ private def opId(name: String): PhysicalOpIdentity =
+ PhysicalOpIdentity(OperatorIdentity(name), "main")
+ private val intSchema: Schema = Schema().add(new Attribute("v",
AttributeType.INTEGER))
+
+ private def newPhysicalOp(name: String, parallelizable: Boolean = true):
PhysicalOp =
+ PhysicalOp
+ .oneToOnePhysicalOp(
+ opId(name),
+ workflowId,
+ executionId,
+ OpExecInitInfo.Empty
+ )
+ .copy(parallelizable = parallelizable)
+
+ "PhysicalOp.isSourceOperator" should "be true when there are no input ports"
in {
+ val op = newPhysicalOp("a")
+ assert(op.inputPorts.isEmpty)
+ assert(op.isSourceOperator)
+ }
+
+ it should "be false once an input port is added" in {
+ val op =
newPhysicalOp("a").withInputPorts(List(InputPort(PortIdentity(0))))
+ assert(!op.isSourceOperator)
+ }
+
+ "PhysicalOp.withLocationPreference" should "store the location preference"
in {
+ val op = newPhysicalOp("a").withLocationPreference(Some(PreferController))
+ assert(op.locationPreference.contains(PreferController))
+ }
+
+ "PhysicalOp.withParallelizable" should "set the parallelizable flag and
round-trip through copy" in {
+ val op = newPhysicalOp("a", parallelizable = true)
+ val flipped = op.withParallelizable(false)
+ assert(!flipped.parallelizable)
+ assert(op.parallelizable, "the original instance is immutable")
+ }
+
+ "PhysicalOp.withSuggestedWorkerNum" should "set the suggested worker count"
in {
+ val op = newPhysicalOp("a").withSuggestedWorkerNum(7)
+ assert(op.suggestedWorkerNum.contains(7))
+ }
+
+ "PhysicalOp.addInputLink" should "append the link to the matching input
port" in {
+ val op =
newPhysicalOp("a").withInputPorts(List(InputPort(PortIdentity(0))))
+ val link = PhysicalLink(opId("up"), PortIdentity(0), opId("a"),
PortIdentity(0))
+ val updated = op.addInputLink(link)
+ assert(updated.getInputLinks(Some(PortIdentity(0))) == List(link))
+ assert(updated.getInputLinks() == List(link))
+ }
+
+ it should "fail the assertion when the link does not target this op id" in {
+ val op =
newPhysicalOp("a").withInputPorts(List(InputPort(PortIdentity(0))))
+ val mismatched = PhysicalLink(opId("up"), PortIdentity(0), opId("other"),
PortIdentity(0))
+ assertThrows[AssertionError] {
+ op.addInputLink(mismatched)
+ }
+ }
+
+ it should "fail the assertion when the target port is not declared" in {
+ val op =
newPhysicalOp("a").withInputPorts(List(InputPort(PortIdentity(0))))
+ val unknownPort = PhysicalLink(opId("up"), PortIdentity(0), opId("a"),
PortIdentity(99))
+ assertThrows[AssertionError] {
+ op.addInputLink(unknownPort)
+ }
+ }
+
+ "PhysicalOp.addOutputLink" should "append the link to the matching output
port" in {
+ val op =
newPhysicalOp("a").withOutputPorts(List(OutputPort(PortIdentity(0))))
+ val link = PhysicalLink(opId("a"), PortIdentity(0), opId("dn"),
PortIdentity(0))
+ val updated = op.addOutputLink(link)
+ assert(updated.getOutputLinks(PortIdentity(0)) == List(link))
+ }
+
+ "PhysicalOp.removeInputLink" should "drop the matching link, leaving others
intact" in {
+ val op =
newPhysicalOp("a").withInputPorts(List(InputPort(PortIdentity(0))))
+ val l1 = PhysicalLink(opId("u1"), PortIdentity(0), opId("a"),
PortIdentity(0))
+ val l2 = PhysicalLink(opId("u2"), PortIdentity(0), opId("a"),
PortIdentity(0))
+ val updated = op.addInputLink(l1).addInputLink(l2).removeInputLink(l1)
+ assert(updated.getInputLinks() == List(l2))
+ }
+
+ "PhysicalOp.removeOutputLink" should "drop the matching link, leaving others
intact" in {
+ val op =
newPhysicalOp("a").withOutputPorts(List(OutputPort(PortIdentity(0))))
+ val l1 = PhysicalLink(opId("a"), PortIdentity(0), opId("d1"),
PortIdentity(0))
+ val l2 = PhysicalLink(opId("a"), PortIdentity(0), opId("d2"),
PortIdentity(0))
+ val updated = op.addOutputLink(l1).addOutputLink(l2).removeOutputLink(l1)
+ assert(updated.getOutputLinks(PortIdentity(0)) == List(l2))
+ }
+
+ "PhysicalOp.propagateSchema" should "fill in output schemas once every input
schema is known" in {
+ val out = OutputPort(PortIdentity(0))
+ val in = InputPort(PortIdentity(0))
+ val op = newPhysicalOp("a")
+ .withInputPorts(List(in))
+ .withOutputPorts(List(out))
+ .withPropagateSchema(SchemaPropagationFunc(inputs => Map(out.id ->
inputs(in.id))))
+ val updated = op.propagateSchema(Some((in.id, intSchema)))
+ val outSchema = updated.outputPorts(out.id)._3
+ assert(outSchema.toOption.contains(intSchema))
+ }
+
+ it should "raise IllegalArgumentException when a conflicting schema arrives
on an already-known port" in {
+ val out = OutputPort(PortIdentity(0))
+ val in = InputPort(PortIdentity(0))
+ val op = newPhysicalOp("a")
+ .withInputPorts(List(in))
+ .withOutputPorts(List(out))
+ .withPropagateSchema(SchemaPropagationFunc(inputs => Map(out.id ->
inputs(in.id))))
+ .propagateSchema(Some((in.id, intSchema)))
+ val different = Schema().add(new Attribute("w", AttributeType.STRING))
+ assertThrows[IllegalArgumentException] {
+ op.propagateSchema(Some((in.id, different)))
+ }
+ }
+
+ it should "leave output schemas as a Left when the propagation function
throws" in {
+ val out = OutputPort(PortIdentity(0))
+ val in = InputPort(PortIdentity(0))
+ val op = newPhysicalOp("a")
+ .withInputPorts(List(in))
+ .withOutputPorts(List(out))
+ .withPropagateSchema(SchemaPropagationFunc(_ => throw new
RuntimeException("boom")))
+ val updated = op.propagateSchema(Some((in.id, intSchema)))
+ assert(updated.outputPorts(out.id)._3.isLeft)
+ }
+
+ "PhysicalOp.isOutputLinkBlocking" should "reflect the configured blocking
flag on the source port" in {
+ val opBlocking =
+ newPhysicalOp("a").withOutputPorts(List(OutputPort(PortIdentity(0),
blocking = true)))
+ val opOpen =
+ newPhysicalOp("b").withOutputPorts(List(OutputPort(PortIdentity(0),
blocking = false)))
+ // Each link's `fromOpId` is set to the operator under test, so the test
+ // remains correct if `isOutputLinkBlocking` is later tightened to
+ // validate `fromOpId == this.id`.
+ val blockingLink =
+ PhysicalLink(opId("a"), PortIdentity(0), opId("downstream"),
PortIdentity(0))
+ val openLink =
+ PhysicalLink(opId("b"), PortIdentity(0), opId("downstream"),
PortIdentity(0))
+ assert(opBlocking.isOutputLinkBlocking(blockingLink))
+ assert(!opOpen.isOutputLinkBlocking(openLink))
+ }
+
+ //
---------------------------------------------------------------------------
+ // PhysicalPlan
+ //
---------------------------------------------------------------------------
+
+ private def physicalOp(name: String): PhysicalOp =
+ newPhysicalOp(name)
+ .withInputPorts(List(InputPort(PortIdentity(0))))
+ .withOutputPorts(List(OutputPort(PortIdentity(0))))
+
+ private def link(from: String, to: String): PhysicalLink =
+ PhysicalLink(opId(from), PortIdentity(0), opId(to), PortIdentity(0))
+
+ "PhysicalPlan.getOperator" should "look up by physical id" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val plan = PhysicalPlan(Set(a, b), Set.empty)
+ assert(plan.getOperator(a.id) == a)
+ }
+
+ "PhysicalPlan.getSourceOperatorIds" should "return operators with no
incoming links in the DAG" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(link("a", "b"), link("b", "c")))
+ assert(plan.getSourceOperatorIds == Set(a.id))
+ }
+
+ it should "return all operators when there are no links" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val plan = PhysicalPlan(Set(a, b), Set.empty)
+ assert(plan.getSourceOperatorIds == Set(a.id, b.id))
+ }
+
+ "PhysicalPlan.topologicalIterator" should "produce a topological ordering
across the DAG" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(link("a", "b"), link("b", "c")))
+ assert(plan.topologicalIterator().toList == List(a.id, b.id, c.id))
+ }
+
+ "PhysicalPlan.getUpstreamPhysicalOpIds" should "return direct predecessors
only" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(link("a", "b"), link("a", "c"),
link("b", "c")))
+ assert(plan.getUpstreamPhysicalOpIds(c.id) == Set(a.id, b.id))
+ assert(plan.getUpstreamPhysicalOpIds(a.id).isEmpty)
+ }
+
+ "PhysicalPlan.getUpstreamPhysicalLinks" should "return only links targeting
the operator" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val l1 = link("a", "c")
+ val l2 = link("b", "c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(l1, l2))
+ assert(plan.getUpstreamPhysicalLinks(c.id) == Set(l1, l2))
+ assert(plan.getUpstreamPhysicalLinks(a.id).isEmpty)
+ }
+
+ "PhysicalPlan.getDownstreamPhysicalLinks" should "return only links sourcing
from the operator" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val l1 = link("a", "b")
+ val l2 = link("a", "c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(l1, l2))
+ assert(plan.getDownstreamPhysicalLinks(a.id) == Set(l1, l2))
+ assert(plan.getDownstreamPhysicalLinks(c.id).isEmpty)
+ }
+
+ "PhysicalPlan.getSubPlan" should "include only the requested operators and
the links between them" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val c = physicalOp("c")
+ val plan = PhysicalPlan(Set(a, b, c), Set(link("a", "b"), link("b", "c"),
link("a", "c")))
+ val sub = plan.getSubPlan(Set(a.id, b.id))
+ assert(sub.operators.map(_.id) == Set(a.id, b.id))
+ assert(sub.links == Set(link("a", "b")))
+ }
+
+ "PhysicalPlan.getPhysicalOpsOfLogicalOp" should "return every physical op
sharing a logical id, in topological order" in {
+ val a = physicalOp("a")
+ val b = physicalOp("b")
+ val plan = PhysicalPlan(Set(a, b), Set(link("a", "b")))
+ val onlyB = plan.getPhysicalOpsOfLogicalOp(OperatorIdentity("b"))
+ assert(onlyB.map(_.id) == List(b.id))
+ }
+}