This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new f624949ece test(amber): add unit tests for ChannelConfig.generateChannelConfigs (#4800)
f624949ece is described below

commit f624949ece260e974385ad9d19d3009403affb46
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 09:59:00 2026 -0700

    test(amber): add unit tests for ChannelConfig.generateChannelConfigs (#4800)
    
    ### What changes were proposed in this PR?
    
    Adds scalatest coverage for
    `amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala`.
    The `generateChannelConfigs` partition-routing helper had no dedicated
    spec.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4798.
    
    Bug pinned in the spec with explanatory comment (filed separately as a
    Bug issue): the `OneToOnePartition` arm uses `List.zip`, which silently
    truncates to the shorter input. Other arms either compute the full cross
    product or assert exact lengths (`SinglePartition` requires
    `toWorkerIds.size == 1`); only `OneToOne` drops surplus pairings without
    warning.
    
    ### How was this PR tested?
    
    ```
    sbt scalafmtCheckAll
    sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.config.ChannelConfigSpec"
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../scheduling/config/ChannelConfigSpec.scala      | 212 +++++++++++++++++++++
 1 file changed, 212 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
new file mode 100644
index 0000000000..be5a2d064c
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling.config
+
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
+import org.apache.texera.amber.core.workflow.{
+  BroadcastPartition,
+  HashPartition,
+  OneToOnePartition,
+  PortIdentity,
+  RangePartition,
+  SinglePartition,
+  UnknownPartition
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class ChannelConfigSpec extends AnyFlatSpec with Matchers {
+
+  private val port: PortIdentity = PortIdentity(id = 0, internal = false)
+
+  private def actor(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name)
+
+  private val w1 = actor("w1")
+  private val w2 = actor("w2")
+  private val w3 = actor("w3")
+  private val u1 = actor("u1")
+  private val u2 = actor("u2")
+  private val u3 = actor("u3")
+
+  // Helper: extract the (sender, receiver) endpoint pairs from a list of
+  // ChannelConfigs to make the assertions readable.
+  private def endpoints(cs: List[ChannelConfig]): List[(String, String)] =
+    cs.map(c => (c.channelId.fromWorkerId.name, c.channelId.toWorkerId.name))
+
+  // ----- cross-product partition arms -----
+
+  "generateChannelConfigs" should "produce the full from*to cross product for HashPartition" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2),
+      List(u1, u2, u3),
+      port,
+      HashPartition()
+    )
+    endpoints(out) shouldBe List(
+      ("w1", "u1"),
+      ("w1", "u2"),
+      ("w1", "u3"),
+      ("w2", "u1"),
+      ("w2", "u2"),
+      ("w2", "u3")
+    )
+    out.foreach(_.channelId.isControl shouldBe false)
+    out.foreach(_.toPortId shouldBe port)
+  }
+
+  it should "produce the full from*to cross product for BroadcastPartition" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2),
+      List(u1, u2),
+      port,
+      BroadcastPartition()
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"), ("w2", "u1"), ("w2", "u2"))
+  }
+
+  it should "produce the full from*to cross product for RangePartition" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1),
+      List(u1, u2),
+      port,
+      RangePartition(List("k"), 0L, 100L)
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"))
+  }
+
+  it should "produce the full from*to cross product for UnknownPartition" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2),
+      List(u1),
+      port,
+      UnknownPartition()
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1"))
+  }
+
+  // ----- SinglePartition arm -----
+
+  "SinglePartition" should "produce one channel per from-worker to the single to-worker" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2, w3),
+      List(u1),
+      port,
+      SinglePartition()
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1"), ("w3", "u1"))
+  }
+
+  it should "raise an AssertionError when more than one to-worker is supplied" in {
+    // Pin: SinglePartition is only valid when collapsing onto exactly one
+    // downstream worker; passing more violates the assertion in the source.
+    assertThrows[AssertionError] {
+      ChannelConfig.generateChannelConfigs(
+        List(w1, w2),
+        List(u1, u2),
+        port,
+        SinglePartition()
+      )
+    }
+  }
+
+  // ----- OneToOnePartition arm -----
+
+  "OneToOnePartition" should "zip equal-length from and to lists pairwise" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2, w3),
+      List(u1, u2, u3),
+      port,
+      OneToOnePartition()
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
+  }
+
+  it should "truncate to the shorter list when from and to lengths differ (current behavior)" in {
+    // Pin: Scala List.zip drops the tail of the longer side. Callers are
+    // expected to enforce equal lengths upstream; an asymmetric input here
+    // silently loses pairings rather than raising. Documenting so a future
+    // tightening (e.g. require/asserting equal lengths) breaks this spec
+    // on purpose and forces the contract change to be reviewed.
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2, w3),
+      List(u1, u2),
+      port,
+      OneToOnePartition()
+    )
+    endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"))
+
+    val out2 = ChannelConfig.generateChannelConfigs(
+      List(w1),
+      List(u1, u2, u3),
+      port,
+      OneToOnePartition()
+    )
+    endpoints(out2) shouldBe List(("w1", "u1"))
+  }
+
+  // ----- empty inputs -----
+
+  // The previous block ended with `"OneToOnePartition" should ...`, so switch
+  // back to `generateChannelConfigs` here. Otherwise the empty-input cases
+  // (which exercise Hash/Broadcast arms too) and the toPortId test below
+  // would be reported as `"OneToOnePartition" should ...`.
+  "generateChannelConfigs" should "return an empty list when fromWorkerIds is empty (cross-product arm)" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      Nil,
+      List(u1, u2),
+      port,
+      HashPartition()
+    )
+    out shouldBe empty
+  }
+
+  it should "return an empty list when toWorkerIds is empty (cross-product arm)" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2),
+      Nil,
+      port,
+      HashPartition()
+    )
+    out shouldBe empty
+  }
+
+  it should "return an empty list when both inputs are empty (OneToOne)" in {
+    val out = ChannelConfig.generateChannelConfigs(
+      Nil,
+      Nil,
+      port,
+      OneToOnePartition()
+    )
+    out shouldBe empty
+  }
+
+  // ----- toPortId propagation -----
+
+  it should "propagate the same toPortId onto every produced ChannelConfig" in {
+    val customPort = PortIdentity(id = 7, internal = true)
+    val out = ChannelConfig.generateChannelConfigs(
+      List(w1, w2),
+      List(u1, u2),
+      customPort,
+      BroadcastPartition()
+    )
+    out.foreach(_.toPortId shouldBe customPort)
+  }
+}

Reply via email to