This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new a505475741 test(amber): add unit tests for LinkConfig.toPartitioning (#4803)
a505475741 is described below

commit a505475741c72444fd37f56ccb33d6c0c5653b77
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 10:48:16 2026 -0700

    test(amber): add unit tests for LinkConfig.toPartitioning (#4803)
    
    ### What changes were proposed in this PR?
    
    Adds ScalaTest coverage for
    `amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala`.
    The `toPartitioning` partition-routing helper had no dedicated spec.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4801.
    
    Two related bugs are pinned in the spec with explanatory comments and
    filed separately as Bug issues: (1) the `OneToOnePartition` arm uses
    `List.zip` and silently truncates on unequal-length input — the same
    hazard as the parallel `ChannelConfig` issue (#4799); (2) the
    `BroadcastPartition` arm uses `zip` instead of the cross product,
    diverging both from the broadcast contract and from `ChannelConfig`'s
    broadcast routing in the same package.
    
    ### How was this PR tested?
    
    ```
    sbt scalafmtCheckAll
    sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.config.LinkConfigSpec"
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../scheduling/config/LinkConfigSpec.scala         | 253 +++++++++++++++++++++
 1 file changed, 253 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
new file mode 100644
index 0000000000..8a5f2b8652
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling.config
+
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
+import org.apache.texera.amber.core.workflow.{
+  BroadcastPartition,
+  HashPartition,
+  OneToOnePartition,
+  RangePartition,
+  SinglePartition,
+  UnknownPartition
+}
+import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.{
+  BroadcastPartitioning,
+  HashBasedShufflePartitioning,
+  OneToOnePartitioning,
+  RangeBasedShufflePartitioning,
+  RoundRobinPartitioning
+}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class LinkConfigSpec extends AnyFlatSpec with Matchers {
+  /* Spec for LinkConfig.toPartitioning. Each test passes a list of sending workers,
+   * a list of receiving workers, a partition descriptor and a batch size, then checks
+   * the concrete partitioning type that comes back and the channels it carries.
+   *
+   * Fixtures: w1..w3 are only ever passed as the first (from) worker list and u1..u3
+   * as the second (to) worker list; batch is the batch size handed to most calls.
+   */
+  private val w1 = ActorVirtualIdentity("w1")
+  private val w2 = ActorVirtualIdentity("w2")
+  private val w3 = ActorVirtualIdentity("w3")
+  private val u1 = ActorVirtualIdentity("u1")
+  private val u2 = ActorVirtualIdentity("u2")
+  private val u3 = ActorVirtualIdentity("u3")
+  private val batch = 64
+  /* Reduces each channel to its (fromWorkerId.name, toWorkerId.name) pair, keeping the
+   * original order, so expected routes can be written as plain string tuples.
+   */
+  private def endpoints(channels: Seq[ChannelIdentity]): Seq[(String, String)] 
=
+    channels.map(c => (c.fromWorkerId.name, c.toWorkerId.name))
+
+  // ----- HashPartition -----
+  /* Hash arm: 2 senders x 3 receivers must yield all 6 pairs in sender-major order,
+   * with the hash attribute names and batch size carried through and every channel
+   * flagged as a non-control channel.
+   */
+  "toPartitioning" should "produce a HashBasedShufflePartitioning with full 
cross product channels" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2),
+      List(u1, u2, u3),
+      HashPartition(List("k1", "k2")),
+      batch
+    )
+    out shouldBe a[HashBasedShufflePartitioning]
+    val hp = out.asInstanceOf[HashBasedShufflePartitioning]
+    hp.batchSize shouldBe batch
+    hp.hashAttributeNames shouldBe Seq("k1", "k2")
+    endpoints(hp.channels) shouldBe Seq(
+      ("w1", "u1"),
+      ("w1", "u2"),
+      ("w1", "u3"),
+      ("w2", "u1"),
+      ("w2", "u2"),
+      ("w2", "u3")
+    )
+    hp.channels.foreach(_.isControl shouldBe false)
+  }
+
+  // ----- RangePartition -----
+  /* Range arm: the attribute names and the min and max bounds are expected on the
+   * result, alongside one channel per sender x receiver pair.
+   */
+  "RangePartition" should "produce a RangeBasedShufflePartitioning carrying 
the range bounds and cross-product channels" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1),
+      List(u1, u2),
+      RangePartition(List("k"), 0L, 100L),
+      batch
+    )
+    out shouldBe a[RangeBasedShufflePartitioning]
+    val rp = out.asInstanceOf[RangeBasedShufflePartitioning]
+    rp.batchSize shouldBe batch
+    rp.rangeAttributeNames shouldBe Seq("k")
+    rp.rangeMin shouldBe 0L
+    rp.rangeMax shouldBe 100L
+    endpoints(rp.channels) shouldBe Seq(("w1", "u1"), ("w1", "u2"))
+  }
+
+  // ----- SinglePartition -----
+  /* Single arm: every sender gets exactly one channel to the lone receiver, and the
+   * result is reported as a OneToOnePartitioning.
+   */
+  "SinglePartition" should "produce a OneToOnePartitioning with one channel 
per from-worker to the single to-worker" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2, w3),
+      List(u1),
+      SinglePartition(),
+      batch
+    )
+    out shouldBe a[OneToOnePartitioning]
+    val op = out.asInstanceOf[OneToOnePartitioning]
+    op.batchSize shouldBe batch
+    endpoints(op.channels) shouldBe Seq(("w1", "u1"), ("w2", "u1"), ("w3", 
"u1"))
+  }
+  /* NOTE(review): the AssertionError presumably comes from an assert inside
+   * toPartitioning; if the build elides assertions this test would stop passing.
+   * Confirm against the implementation and the compiler flags.
+   */
+  it should "raise an AssertionError when more than one to-worker is supplied" 
in {
+    assertThrows[AssertionError] {
+      LinkConfig.toPartitioning(List(w1, w2), List(u1, u2), SinglePartition(), 
batch)
+    }
+  }
+
+  // ----- OneToOnePartition -----
+  // OneToOne arm: the i-th sender is expected to be paired with the i-th receiver only.
+  "OneToOnePartition" should "produce a OneToOnePartitioning with zip pairing 
for equal-length inputs" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2, w3),
+      List(u1, u2, u3),
+      OneToOnePartition(),
+      batch
+    )
+    out shouldBe a[OneToOnePartitioning]
+    val op = out.asInstanceOf[OneToOnePartitioning]
+    endpoints(op.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", 
"u3"))
+  }
+  /* NOTE(review): no type check precedes the cast in this test, so a different
+   * partitioning type would surface as a ClassCastException rather than a matcher
+   * failure.
+   */
+  it should "silently truncate when from and to lengths differ (current 
behavior)" in {
+    // Pin: same `List.zip` truncation hazard as ChannelConfig (Bug #4799).
+    // Documenting the parallel here so a fix that aligns the two helpers
+    // surfaces this spec at the same time.
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2, w3),
+      List(u1, u2),
+      OneToOnePartition(),
+      batch
+    )
+    val op = out.asInstanceOf[OneToOnePartitioning]
+    endpoints(op.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"))
+  }
+
+  // ----- BroadcastPartition -----
+
+  "BroadcastPartition" should "produce a BroadcastPartitioning whose channels 
follow zip pairing today (current behavior)" in {
+    // Pin: BroadcastPartition currently uses `fromWorkerIds.zip(toWorkerIds)`
+    // — the SAME 1:1 pairing as OneToOnePartition. ChannelConfig in the same
+    // package emits a full cross product for the BroadcastPartition arm,
+    // which matches broadcast semantics ("each sender targets every
+    // receiver"). The two helpers diverge today; pinning this so a fix that
+    // realigns the contract surfaces here. Filed as a Bug.
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2, w3),
+      List(u1, u2, u3),
+      BroadcastPartition(),
+      batch
+    )
+    out shouldBe a[BroadcastPartitioning]
+    val bp = out.asInstanceOf[BroadcastPartitioning]
+    bp.batchSize shouldBe batch
+    endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", 
"u3"))
+  }
+  // Same pin for unequal lengths: the third sender (w3) is expected to end up with no channel.
+  it should "silently truncate broadcast pairings when sides differ in length 
(current behavior)" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2, w3),
+      List(u1, u2),
+      BroadcastPartition(),
+      batch
+    )
+    val bp = out.asInstanceOf[BroadcastPartitioning]
+    endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"))
+  }
+
+  // ----- UnknownPartition -----
+  // Unknown arm: expected to map to RoundRobinPartitioning over all sender x receiver pairs.
+  "UnknownPartition" should "produce a RoundRobinPartitioning with the full 
cross product" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2),
+      List(u1, u2),
+      UnknownPartition(),
+      batch
+    )
+    out shouldBe a[RoundRobinPartitioning]
+    val rr = out.asInstanceOf[RoundRobinPartitioning]
+    rr.batchSize shouldBe batch
+    endpoints(rr.channels) shouldBe Seq(
+      ("w1", "u1"),
+      ("w1", "u2"),
+      ("w2", "u1"),
+      ("w2", "u2")
+    )
+  }
+
+  // ----- empty inputs -----
+
+  // The previous block ended with a `"UnknownPartition" should ...` subject.
+  // Switch back to "toPartitioning" so test reports for the empty-input,
+  // batch-propagation, and unsupported-branch cases below don't get
+  // misattributed to UnknownPartition.
+  "toPartitioning" should "return empty channels when fromWorkerIds is empty 
(cross-product arm)" in {
+    val out = LinkConfig.toPartitioning(
+      Nil,
+      List(u1, u2),
+      HashPartition(),
+      batch
+    )
+    out.asInstanceOf[HashBasedShufflePartitioning].channels shouldBe empty
+  }
+  // Mirror of the previous case, with the receiver list empty instead of the sender list.
+  it should "return empty channels when toWorkerIds is empty (cross-product 
arm)" in {
+    val out = LinkConfig.toPartitioning(
+      List(w1, w2),
+      Nil,
+      HashPartition(),
+      batch
+    )
+    out.asInstanceOf[HashBasedShufflePartitioning].channels shouldBe empty
+  }
+
+  // ----- batch size propagation -----
+  /* NOTE(review): despite the test name, only the OneToOnePartition arm is exercised
+   * here, using a batch size different from the shared fixture; the other arms check
+   * batchSize against the shared batch fixture in their own tests above.
+   */
+  it should "propagate dataTransferBatchSize verbatim regardless of 
partitioning arm" in {
+    val customBatch = 1024
+    val out = LinkConfig.toPartitioning(
+      List(w1),
+      List(u1),
+      OneToOnePartition(),
+      customBatch
+    )
+    out.asInstanceOf[OneToOnePartitioning].batchSize shouldBe customBatch
+  }
+
+  // ----- unsupported branch -----
+
+  it should "throw UnsupportedOperationException when partitionInfo is 
unrecognized" in {
+    // PartitionInfo is sealed, so the only way to reach the catch-all
+    // `case _` branch from a test is to pass an off-domain value such as
+    // null. This pins the contract that an unknown PartitionInfo subtype
+    // results in UnsupportedOperationException rather than silently
+    // dropping into a default partitioning.
+    assertThrows[UnsupportedOperationException] {
+      LinkConfig.toPartitioning(
+        List(w1),
+        List(u1),
+        null,
+        batch
+      )
+    }
+  }
+}

Reply via email to