This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 5bff1b4e1f fix: align BroadcastPartition routing in LinkConfig with broadcast semantics (#5032)
5bff1b4e1f is described below
commit 5bff1b4e1f2bb4dddcf5febb19a8c00cafaef0e0
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 12 19:53:31 2026 -0700
fix: align BroadcastPartition routing in LinkConfig with broadcast semantics (#5032)
---
.../scheduling/config/LinkConfig.scala | 7 +++--
.../scheduling/config/LinkConfigSpec.scala | 31 +++++++++++++++-------
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
index ef0117834c..ef92afef59 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala
@@ -72,10 +72,9 @@ case object LinkConfig {
case BroadcastPartition() =>
BroadcastPartitioning(
dataTransferBatchSize,
- fromWorkerIds.zip(toWorkerIds).map {
- case (fromWorkerId, toWorkerId) =>
- ChannelIdentity(fromWorkerId, toWorkerId, isControl = false)
- }
+ fromWorkerIds.flatMap(fromId =>
+ toWorkerIds.map(toId => ChannelIdentity(fromId, toId, isControl = false))
+ )
)
case UnknownPartition() =>
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
index 8a5f2b8652..7dd8a3de08 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala
@@ -144,13 +144,7 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
// ----- BroadcastPartition -----
- "BroadcastPartition" should "produce a BroadcastPartitioning whose channels follow zip pairing today (current behavior)" in {
- // Pin: BroadcastPartition currently uses `fromWorkerIds.zip(toWorkerIds)`
- // — the SAME 1:1 pairing as OneToOnePartition. ChannelConfig in the same
- // package emits a full cross product for the BroadcastPartition arm,
- // which matches broadcast semantics ("each sender targets every
- // receiver"). The two helpers diverge today; pinning this so a fix that
- // realigns the contract surfaces here. Filed as a Bug.
+ "BroadcastPartition" should "produce a BroadcastPartitioning with the full sender x receiver cross product" in {
val out = LinkConfig.toPartitioning(
List(w1, w2, w3),
List(u1, u2, u3),
@@ -160,10 +154,20 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
out shouldBe a[BroadcastPartitioning]
val bp = out.asInstanceOf[BroadcastPartitioning]
bp.batchSize shouldBe batch
- endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
+ endpoints(bp.channels) shouldBe Seq(
+ ("w1", "u1"),
+ ("w1", "u2"),
+ ("w1", "u3"),
+ ("w2", "u1"),
+ ("w2", "u2"),
+ ("w2", "u3"),
+ ("w3", "u1"),
+ ("w3", "u2"),
+ ("w3", "u3")
+ )
}
- it should "silently truncate broadcast pairings when sides differ in length (current behavior)" in {
+ it should "emit the full cross product even when sender and receiver counts differ" in {
val out = LinkConfig.toPartitioning(
List(w1, w2, w3),
List(u1, u2),
@@ -171,7 +175,14 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
batch
)
val bp = out.asInstanceOf[BroadcastPartitioning]
- endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"))
+ endpoints(bp.channels) shouldBe Seq(
+ ("w1", "u1"),
+ ("w1", "u2"),
+ ("w2", "u1"),
+ ("w2", "u2"),
+ ("w3", "u1"),
+ ("w3", "u2")
+ )
}
// ----- UnknownPartition -----