This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 1c2a8c533f test(amber): add unit test coverage for non-range
partitioners (#4746)
1c2a8c533f is described below
commit 1c2a8c533fdee922f93a0cfbb2c94f407df54faa
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 03:42:04 2026 -0700
test(amber): add unit test coverage for non-range partitioners (#4746)
### What changes were proposed in this PR?
Add `PartitionersSpec` covering the four send-semantics partitioners
that currently lack unit tests (only `RangeBasedShufflePartitioner` had
existing coverage in `RangeBasedShuffleSpec`):
- `OneToOnePartitioner` — `getBucketIndex` always emits `Iterator(0)`;
`allReceivers` selects the channel whose `fromWorkerId` matches the
actor id
- `BroadcastPartitioner` — `getBucketIndex` yields every receiver index
for any tuple; `allReceivers` is deduplicated
- `RoundRobinPartitioner` — `getBucketIndex` cycles bucket indices
(asserting the current contract that the first emitted index is 1, since
the implementation increments before emitting); `allReceivers` preserves
channel order while deduplicating
- `HashBasedShufflePartitioner` — `getBucketIndex` is in-range;
deterministic for the *same tuple instance* across consecutive calls;
depends only on the configured hash-attribute subset (swept across
multiple keys × multiple varying non-hash fields); falls back to the
full tuple when no hash attributes are configured (verified by spanning
>1 bucket across 50 samples that vary only the non-key field);
`allReceivers` deduplicates
### Any related issues, documentation, discussions?
Closes #4745
### How was this PR tested?
`sbt "WorkflowExecutionService/testOnly
org.apache.texera.amber.engine.architecture.sendsemantics.partitioners.PartitionersSpec"`
— 11/11 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../partitioners/PartitionersSpec.scala | 217 +++++++++++++++++++++
1 file changed, 217 insertions(+)
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/PartitionersSpec.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/PartitionersSpec.scala
new file mode 100644
index 0000000000..10d244b744
--- /dev/null
+++
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/sendsemantics/partitioners/PartitionersSpec.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.sendsemantics.partitioners
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple}
+import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity,
ChannelIdentity}
+import
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.{
+ BroadcastPartitioning,
+ HashBasedShufflePartitioning,
+ OneToOnePartitioning,
+ RoundRobinPartitioning
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PartitionersSpec extends AnyFlatSpec {
+
+ private val sender: ActorVirtualIdentity = ActorVirtualIdentity("sender")
+ private val r1: ActorVirtualIdentity = ActorVirtualIdentity("rec1")
+ private val r2: ActorVirtualIdentity = ActorVirtualIdentity("rec2")
+ private val r3: ActorVirtualIdentity = ActorVirtualIdentity("rec3")
+
+ private def channel(to: ActorVirtualIdentity): ChannelIdentity =
+ ChannelIdentity(sender, to, isControl = false)
+
+ private val intAttr: Attribute = new Attribute("v", AttributeType.INTEGER)
+ private val intSchema: Schema = Schema().add(intAttr)
+
+ private def intTuple(value: Int): Tuple =
+ Tuple.builder(intSchema).add(intAttr, value).build()
+
+ private val twoStringSchema: Schema = Schema()
+ .add(new Attribute("k", AttributeType.STRING))
+ .add(new Attribute("v", AttributeType.STRING))
+
+ private def stringTuple(k: String, v: String): Tuple =
+ Tuple
+ .builder(twoStringSchema)
+ .add(new Attribute("k", AttributeType.STRING), k)
+ .add(new Attribute("v", AttributeType.STRING), v)
+ .build()
+
+ // -- OneToOnePartitioner --------------------------------------------------
+
+ "OneToOnePartitioner.getBucketIndex" should "always return Iterator(0)" in {
+ val partitioning = OneToOnePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1))
+ )
+ val partitioner = OneToOnePartitioner(partitioning, sender)
+ assert(partitioner.getBucketIndex(intTuple(7)).toList == List(0))
+ assert(partitioner.getBucketIndex(intTuple(42)).toList == List(0))
+ }
+
+ "OneToOnePartitioner.allReceivers" should "return the receiver from the
channel matching the actor id" in {
+ val partitioning = OneToOnePartitioning(
+ batchSize = 100,
+ channels = Seq(
+ ChannelIdentity(ActorVirtualIdentity("other-sender"), r2, isControl =
false),
+ channel(r1)
+ )
+ )
+ val partitioner = OneToOnePartitioner(partitioning, sender)
+ assert(partitioner.allReceivers == Seq(r1))
+ }
+
+ // -- BroadcastPartitioner -------------------------------------------------
+
+ "BroadcastPartitioner.getBucketIndex" should "yield every receiver index for
any tuple" in {
+ val partitioning = BroadcastPartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3))
+ )
+ val partitioner = BroadcastPartitioner(partitioning)
+ assert(partitioner.getBucketIndex(intTuple(0)).toList == List(0, 1, 2))
+ }
+
+ "BroadcastPartitioner" should "deduplicate receivers when channels list a
worker twice" in {
+ val partitioning = BroadcastPartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r1), channel(r2))
+ )
+ val partitioner = BroadcastPartitioner(partitioning)
+ assert(partitioner.allReceivers == Seq(r1, r2))
+ assert(partitioner.getBucketIndex(intTuple(0)).toList == List(0, 1))
+ }
+
+ // -- RoundRobinPartitioner ------------------------------------------------
+
+ "RoundRobinPartitioner.getBucketIndex" should "cycle through bucket indices"
in {
+ val partitioning = RoundRobinPartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3))
+ )
+ val partitioner = RoundRobinPartitioner(partitioning)
+
+ val indices = (1 to 7).map(_ =>
partitioner.getBucketIndex(intTuple(0)).next()).toList
+ // Implementation increments first, then emits. Starting from 0, the first
+ // emitted index is therefore 1, then 2, then 0, repeating.
+ assert(indices == List(1, 2, 0, 1, 2, 0, 1))
+ }
+
+ "RoundRobinPartitioner.allReceivers" should "preserve channel order while
deduplicating" in {
+ val partitioning = RoundRobinPartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r2), channel(r1), channel(r2))
+ )
+ val partitioner = RoundRobinPartitioner(partitioning)
+ assert(partitioner.allReceivers == Seq(r2, r1))
+ }
+
+ // -- HashBasedShufflePartitioner ------------------------------------------
+
+ "HashBasedShufflePartitioner.getBucketIndex" should "return a non-negative
index within the receiver count" in {
+ val partitioning = HashBasedShufflePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3)),
+ hashAttributeNames = Seq("k")
+ )
+ val partitioner = HashBasedShufflePartitioner(partitioning)
+
+ (0 until 50).foreach { i =>
+ val idx = partitioner.getBucketIndex(stringTuple(s"key-$i", "v")).next()
+ assert(idx >= 0 && idx < 3, s"index $idx out of range for tuple key-$i")
+ }
+ }
+
+ it should "be deterministic for the same input tuple" in {
+ val partitioning = HashBasedShufflePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3)),
+ hashAttributeNames = Seq("k")
+ )
+ val partitioner = HashBasedShufflePartitioner(partitioning)
+
+ // Same tuple instance, two consecutive calls — the contract says the
+ // second call must produce the same bucket as the first.
+ val tuple = stringTuple("alpha", "ignored")
+ val first = partitioner.getBucketIndex(tuple).next()
+ val second = partitioner.getBucketIndex(tuple).next()
+ assert(first == second)
+ }
+
+ it should "depend only on the hash-attribute subset, not on other fields" in
{
+ val partitioning = HashBasedShufflePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3)),
+ hashAttributeNames = Seq("k")
+ )
+ val partitioner = HashBasedShufflePartitioner(partitioning)
+
+ // Sweep several (k, v) pairs so a buggy implementation that hashes the
+ // full tuple would have to collide modulo 3 on every single key — which
+ // is not realistic for any reasonable hash. For each k, vary the second
+ // field across multiple values; the bucket must be the same for all of
+ // them.
+ val keys = Seq("alpha", "beta", "gamma", "delta", "epsilon", "zeta")
+ val varyingSecondField = (0 until 8).map(i => s"v-$i")
+ keys.foreach { k =>
+ val buckets =
+ varyingSecondField.map(v => partitioner.getBucketIndex(stringTuple(k,
v)).next())
+ assert(
+ buckets.distinct.size == 1,
+ s"key=$k produced different buckets when varying the non-hash field:
$buckets"
+ )
+ }
+ }
+
+ it should "use the full tuple when no hash attributes are configured" in {
+ val partitioning = HashBasedShufflePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r3)),
+ hashAttributeNames = Seq.empty
+ )
+ val partitioner = HashBasedShufflePartitioner(partitioning)
+
+ // Hold k constant; vary the second field across many values. If the
+ // partitioner hashed only the (empty) hash-attr subset, every bucket
+ // would collapse to a single value. With the full tuple feeding the
+ // hash, varying v across enough samples must produce more than one
+ // distinct bucket among 3 receivers.
+ val sampleSize = 50
+ val buckets =
+ (0 until sampleSize).map(i =>
partitioner.getBucketIndex(stringTuple("k", s"v-$i")).next())
+ buckets.foreach(idx => assert(idx >= 0 && idx < 3))
+ assert(
+ buckets.distinct.size > 1,
+ s"empty hashAttributeNames should hash the full tuple, but $sampleSize
samples all landed in: ${buckets.distinct}"
+ )
+ }
+
+ "HashBasedShufflePartitioner.allReceivers" should "deduplicate channel
destinations" in {
+ val partitioning = HashBasedShufflePartitioning(
+ batchSize = 100,
+ channels = Seq(channel(r1), channel(r2), channel(r1)),
+ hashAttributeNames = Seq("k")
+ )
+ val partitioner = HashBasedShufflePartitioner(partitioning)
+ assert(partitioner.allReceivers == Seq(r1, r2))
+ }
+}