This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new f624949ece test(amber): add unit tests for
ChannelConfig.generateChannelConfigs (#4800)
f624949ece is described below
commit f624949ece260e974385ad9d19d3009403affb46
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 09:59:00 2026 -0700
test(amber): add unit tests for ChannelConfig.generateChannelConfigs (#4800)
### What changes were proposed in this PR?
Adds scalatest coverage for
`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala`.
The `generateChannelConfigs` partition-routing helper had no dedicated
spec.
### Any related issues, documentation, discussions?
Closes #4798.
Bug pinned in the spec with explanatory comment (filed separately as a
Bug issue): the `OneToOnePartition` arm uses `List.zip`, which silently
truncates to the shorter input. Other arms either compute the full cross
product or assert exact lengths (`SinglePartition` requires
`toWorkerIds.size == 1`); only `OneToOne` drops surplus pairings without
warning.
### How was this PR tested?
```
sbt scalafmtCheckAll
sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.scheduling.config.ChannelConfigSpec"
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../scheduling/config/ChannelConfigSpec.scala | 212 +++++++++++++++++++++
1 file changed, 212 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
new file mode 100644
index 0000000000..be5a2d064c
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling.config
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.{
+ BroadcastPartition,
+ HashPartition,
+ OneToOnePartition,
+ PortIdentity,
+ RangePartition,
+ SinglePartition,
+ UnknownPartition
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class ChannelConfigSpec extends AnyFlatSpec with Matchers {
+
+ private val port: PortIdentity = PortIdentity(id = 0, internal = false)
+
+ private def actor(name: String): ActorVirtualIdentity =
ActorVirtualIdentity(name)
+
+ private val w1 = actor("w1")
+ private val w2 = actor("w2")
+ private val w3 = actor("w3")
+ private val u1 = actor("u1")
+ private val u2 = actor("u2")
+ private val u3 = actor("u3")
+
+ // Helper: extract the (sender, receiver) endpoint pairs from a list of
+ // ChannelConfigs to make the assertions readable.
+ private def endpoints(cs: List[ChannelConfig]): List[(String, String)] =
+ cs.map(c => (c.channelId.fromWorkerId.name, c.channelId.toWorkerId.name))
+
+ // ----- cross-product partition arms -----
+
+ "generateChannelConfigs" should "produce the full from*to cross product for
HashPartition" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ List(u1, u2, u3),
+ port,
+ HashPartition()
+ )
+ endpoints(out) shouldBe List(
+ ("w1", "u1"),
+ ("w1", "u2"),
+ ("w1", "u3"),
+ ("w2", "u1"),
+ ("w2", "u2"),
+ ("w2", "u3")
+ )
+ out.foreach(_.channelId.isControl shouldBe false)
+ out.foreach(_.toPortId shouldBe port)
+ }
+
+ it should "produce the full from*to cross product for BroadcastPartition" in
{
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ List(u1, u2),
+ port,
+ BroadcastPartition()
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"), ("w2", "u1"),
("w2", "u2"))
+ }
+
+ it should "produce the full from*to cross product for RangePartition" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1),
+ List(u1, u2),
+ port,
+ RangePartition(List("k"), 0L, 100L)
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"))
+ }
+
+ it should "produce the full from*to cross product for UnknownPartition" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ List(u1),
+ port,
+ UnknownPartition()
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1"))
+ }
+
+ // ----- SinglePartition arm -----
+
+ "SinglePartition" should "produce one channel per from-worker to the single
to-worker" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2, w3),
+ List(u1),
+ port,
+ SinglePartition()
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1"), ("w3", "u1"))
+ }
+
+ it should "raise an AssertionError when more than one to-worker is supplied"
in {
+ // Pin: SinglePartition is only valid when collapsing onto exactly one
+ // downstream worker; passing more violates the assertion in the source.
+ assertThrows[AssertionError] {
+ ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ List(u1, u2),
+ port,
+ SinglePartition()
+ )
+ }
+ }
+
+ // ----- OneToOnePartition arm -----
+
+ "OneToOnePartition" should "zip equal-length from and to lists pairwise" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2, w3),
+ List(u1, u2, u3),
+ port,
+ OneToOnePartition()
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
+ }
+
+ it should "truncate to the shorter list when from and to lengths differ
(current behavior)" in {
+ // Pin: Scala List.zip drops the tail of the longer side. Callers are
+ // expected to enforce equal lengths upstream; an asymmetric input here
+ // silently loses pairings rather than raising. Documenting so a future
+ // tightening (e.g. require/asserting equal lengths) breaks this spec
+ // on purpose and forces the contract change to be reviewed.
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2, w3),
+ List(u1, u2),
+ port,
+ OneToOnePartition()
+ )
+ endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"))
+
+ val out2 = ChannelConfig.generateChannelConfigs(
+ List(w1),
+ List(u1, u2, u3),
+ port,
+ OneToOnePartition()
+ )
+ endpoints(out2) shouldBe List(("w1", "u1"))
+ }
+
+ // ----- empty inputs -----
+
+ // The previous block ended with `"OneToOnePartition" should ...`, so switch
+ // back to `generateChannelConfigs` here. Otherwise the empty-input cases
+ // (which exercise Hash/Broadcast arms too) and the toPortId test below
+ // would be reported as `"OneToOnePartition" should ...`.
+ "generateChannelConfigs" should "return an empty list when fromWorkerIds is
empty (cross-product arm)" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ Nil,
+ List(u1, u2),
+ port,
+ HashPartition()
+ )
+ out shouldBe empty
+ }
+
+ it should "return an empty list when toWorkerIds is empty (cross-product
arm)" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ Nil,
+ port,
+ HashPartition()
+ )
+ out shouldBe empty
+ }
+
+ it should "return an empty list when both inputs are empty (OneToOne)" in {
+ val out = ChannelConfig.generateChannelConfigs(
+ Nil,
+ Nil,
+ port,
+ OneToOnePartition()
+ )
+ out shouldBe empty
+ }
+
+ // ----- toPortId propagation -----
+
+ it should "propagate the same toPortId onto every produced ChannelConfig" in
{
+ val customPort = PortIdentity(id = 7, internal = true)
+ val out = ChannelConfig.generateChannelConfigs(
+ List(w1, w2),
+ List(u1, u2),
+ customPort,
+ BroadcastPartition()
+ )
+ out.foreach(_.toPortId shouldBe customPort)
+ }
+}