This is an automated email from the ASF dual-hosted git repository.
kunwp1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 30b666eacd fix: assert equal-length worker lists in OneToOne ChannelConfig (#5031)
30b666eacd is described below
commit 30b666eacd1b2738ee965b2384d253927243683e
Author: Matthew B. <[email protected]>
AuthorDate: Wed May 13 13:34:53 2026 -0700
fix: assert equal-length worker lists in OneToOne ChannelConfig (#5031)
### What changes were proposed in this PR?
`ChannelConfig.generateChannelConfigs` previously used
`fromWorkerIds.zip(toWorkerIds)` for the `OneToOnePartition` arm. `zip`
silently truncates to the shorter list, so surplus workers were dropped
even though the partition's contract is a strict 1:1 pairing. This PR
adds an `assert(fromWorkerIds.size == toWorkerIds.size, ...)` precondition
(mirroring the `SinglePartition` arm in the same file). Mismatched inputs
now fail loudly with a message that names both sizes.
### Any related issues, documentation, or discussions?
Closes: #4799
### How was this PR tested?
The existing `ChannelConfigSpec` case that pinned the truncation
behavior now asserts that an `AssertionError` is raised in both
asymmetric directions (`from > to` and `to > from`). The equal-length,
empty-input, and other partition-arm cases are unchanged and still pass.
The reproducer from the report (`from=3, to=2`) now throws instead of
returning a 2-element list.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
Co-authored-by: Kunwoo (Chris) <[email protected]>
---
.../scheduling/config/ChannelConfig.scala | 5 +++
.../scheduling/config/ChannelConfigSpec.scala | 41 +++++++++++-----------
2 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala
index 83d4ed985a..535a62736e 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfig.scala
@@ -44,6 +44,11 @@ case object ChannelConfig {
ChannelConfig(ChannelIdentity(fromWorkerId, toWorkerId, isControl =
false), toPortId)
)
case OneToOnePartition() =>
+ assert(
+ fromWorkerIds.size == toWorkerIds.size,
+ s"OneToOnePartition requires equal-length worker lists, " +
+ s"got from=${fromWorkerIds.size} to=${toWorkerIds.size}"
+ )
fromWorkerIds.zip(toWorkerIds).map {
case (fromWorkerId, toWorkerId) =>
ChannelConfig(ChannelIdentity(fromWorkerId, toWorkerId, isControl
= false), toPortId)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
index be5a2d064c..50f7b0fa3e 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala
@@ -138,27 +138,26 @@ class ChannelConfigSpec extends AnyFlatSpec with Matchers
{
endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
}
- it should "truncate to the shorter list when from and to lengths differ
(current behavior)" in {
- // Pin: Scala List.zip drops the tail of the longer side. Callers are
- // expected to enforce equal lengths upstream; an asymmetric input here
- // silently loses pairings rather than raising. Documenting so a future
- // tightening (e.g. require/asserting equal lengths) breaks this spec
- // on purpose and forces the contract change to be reviewed.
- val out = ChannelConfig.generateChannelConfigs(
- List(w1, w2, w3),
- List(u1, u2),
- port,
- OneToOnePartition()
- )
- endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"))
-
- val out2 = ChannelConfig.generateChannelConfigs(
- List(w1),
- List(u1, u2, u3),
- port,
- OneToOnePartition()
- )
- endpoints(out2) shouldBe List(("w1", "u1"))
+ it should "raise an AssertionError when from and to lengths differ" in {
+ // OneToOnePartition contractually pairs each sender with exactly one
+ // receiver, so mismatched lengths must fail loudly rather than silently
+ // truncating to the shorter side (which is what `List.zip` would do).
+ assertThrows[AssertionError] {
+ ChannelConfig.generateChannelConfigs(
+ List(w1, w2, w3),
+ List(u1, u2),
+ port,
+ OneToOnePartition()
+ )
+ }
+ assertThrows[AssertionError] {
+ ChannelConfig.generateChannelConfigs(
+ List(w1),
+ List(u1, u2, u3),
+ port,
+ OneToOnePartition()
+ )
+ }
}
// ----- empty inputs -----