This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new f33baafd1a test(amber): add unit test coverage for scheduling configs (#4821)
f33baafd1a is described below

commit f33baafd1a353f542ea8aac7587d75ac5170b2fa
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:13:23 2026 -0700

    test(amber): add unit test coverage for scheduling configs (#4821)
    
    ### What changes were proposed in this PR?
    
    Add `SchedulingConfigsSpec` covering the six value/builder classes in
    `amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config`:
    
    - `ChannelConfig.generateChannelConfigs` across every `PartitionInfo`
    branch (cross-product, fan-in with size-1 assertion, zip, catch-all
    empty)
    - `LinkConfig.toPartitioning` across every `PartitionInfo` branch with
    parameter propagation (hash attrs, range bounds, batch size)
    - `PortConfig` hierarchy (`OutputPortConfig` /
    `IntermediateInputPortConfig` / `InputPortConfig`) and the `storageURIs`
    projection
    - `OperatorConfig.empty` and case-class round-trip
    - `ResourceConfig` three-empty-maps default
    - `WorkerConfig.generateWorkerConfigs` non-parallelizable / suggested /
    configured-default branches
    
    ### Any related issues, documentation, discussions?
    
    Closes #4820
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.config.SchedulingConfigsSpec"`
    — 25/25 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../scheduling/config/SchedulingConfigsSpec.scala  | 311 +++++++++++++++++++++
 1 file changed, 311 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
new file mode 100644
index 0000000000..25ef739251
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/SchedulingConfigsSpec.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling.config
+
+import org.apache.texera.amber.config.ApplicationConfig
+import org.apache.texera.amber.core.executor.OpExecInitInfo
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  ExecutionIdentity,
+  OperatorIdentity,
+  PhysicalOpIdentity,
+  WorkflowIdentity
+}
+import org.apache.texera.amber.core.workflow._
+import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.net.URI
+
+/**
+ * Unit tests for the value and builder classes of the scheduling `config` package:
+ * `ChannelConfig`, `LinkConfig`, the `PortConfig` hierarchy, `OperatorConfig`,
+ * `ResourceConfig` and `WorkerConfig`.
+ *
+ * Every FlatSpec subject names the class or method it exercises, so the reported
+ * test names read as "<thing under test> should <behavior>".
+ */
+class SchedulingConfigsSpec extends AnyFlatSpec {
+
+  /** Builds a worker identity with the given name. */
+  private def actor(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name)
+
+  /** Builds the data (non-control) channel identity from one worker to another. */
+  private def chan(from: ActorVirtualIdentity, to: ActorVirtualIdentity): ChannelIdentity =
+    ChannelIdentity(from, to, isControl = false)
+
+  /**
+   * Narrows `value` to `T`, failing the current test with a descriptive message when the
+   * runtime type differs, instead of surfacing a bare `ClassCastException`.
+   */
+  private def narrowTo[T](value: Any)(implicit tag: scala.reflect.ClassTag[T]): T =
+    value match {
+      case typed: T => typed
+      case other    => fail(s"expected ${tag.runtimeClass.getSimpleName} but got $other")
+    }
+
+  // ---------------------------------------------------------------------------
+  // ChannelConfig.generateChannelConfigs
+  // ---------------------------------------------------------------------------
+
+  "ChannelConfig.generateChannelConfigs" should "produce a full cross-product for HashPartition" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("t1"), actor("t2"), actor("t3"))
+    val configs =
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), HashPartition(List("k")))
+    assert(configs.size == 6)
+    assert(configs.map(_.channelId).toSet == (for (f <- from; t <- to) yield chan(f, t)).toSet)
+    configs.foreach(c => assert(c.toPortId == PortIdentity(0)))
+  }
+
+  it should "produce a full cross-product for RangePartition" in {
+    val from = List(actor("f1"))
+    val to = List(actor("t1"), actor("t2"))
+    val configs = ChannelConfig.generateChannelConfigs(
+      from,
+      to,
+      PortIdentity(1),
+      new RangePartition(List("k"), 0L, 10L)
+    )
+    assert(configs.size == 2)
+  }
+
+  it should "produce a full cross-product for BroadcastPartition" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("t1"), actor("t2"))
+    val configs =
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), BroadcastPartition())
+    assert(configs.size == 4)
+  }
+
+  it should "produce a full cross-product for UnknownPartition" in {
+    val from = List(actor("f1"))
+    val to = List(actor("t1"), actor("t2"))
+    val configs =
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), UnknownPartition())
+    assert(configs.size == 2)
+  }
+
+  it should "fan-in to a single receiver for SinglePartition" in {
+    val from = List(actor("f1"), actor("f2"), actor("f3"))
+    val to = List(actor("only-receiver"))
+    val configs =
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), SinglePartition())
+    assert(configs.size == 3)
+    assert(configs.forall(_.channelId.toWorkerId == actor("only-receiver")))
+  }
+
+  it should "fail the SinglePartition assertion when toWorkerIds has more than one entry" in {
+    val from = List(actor("f1"))
+    val to = List(actor("t1"), actor("t2"))
+    assertThrows[AssertionError] {
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), SinglePartition())
+    }
+  }
+
+  it should "zip from/to in OneToOnePartition" in {
+    val from = List(actor("f1"), actor("f2"), actor("f3"))
+    val to = List(actor("t1"), actor("t2"), actor("t3"))
+    val configs =
+      ChannelConfig.generateChannelConfigs(from, to, PortIdentity(0), OneToOnePartition())
+    assert(configs.size == 3)
+    val pairs = configs.map(c => (c.channelId.fromWorkerId, c.channelId.toWorkerId))
+    assert(
+      pairs == List(
+        (actor("f1"), actor("t1")),
+        (actor("f2"), actor("t2")),
+        (actor("f3"), actor("t3"))
+      )
+    )
+  }
+
+  it should "produce empty list for unhandled partition cases" in {
+    // PartitionInfo is sealed, so `null` is the only value that falls through
+    // the named cases without adding a new subtype. This pins the catch-all
+    // `case _ => List()` branch.
+    val configs = ChannelConfig.generateChannelConfigs(
+      List(actor("f")),
+      List(actor("t")),
+      PortIdentity(0),
+      null.asInstanceOf[PartitionInfo]
+    )
+    assert(configs.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // LinkConfig.toPartitioning
+  // ---------------------------------------------------------------------------
+
+  "LinkConfig.toPartitioning" should "map HashPartition to HashBasedShufflePartitioning carrying its hash attributes" in {
+    val from = List(actor("f"))
+    val to = List(actor("t1"), actor("t2"))
+    val partitioning =
+      LinkConfig.toPartitioning(from, to, HashPartition(List("a", "b")), dataTransferBatchSize = 50)
+    val hashed = narrowTo[HashBasedShufflePartitioning](partitioning)
+    assert(hashed.batchSize == 50)
+    assert(hashed.hashAttributeNames == List("a", "b"))
+    assert(hashed.channels.size == 2)
+  }
+
+  it should "map RangePartition to RangeBasedShufflePartitioning carrying its range bounds" in {
+    val from = List(actor("f"))
+    val to = List(actor("t1"))
+    val partitioning = LinkConfig.toPartitioning(
+      from,
+      to,
+      new RangePartition(List("a"), 0L, 99L),
+      dataTransferBatchSize = 10
+    )
+    val ranged = narrowTo[RangeBasedShufflePartitioning](partitioning)
+    assert(ranged.batchSize == 10)
+    assert(ranged.rangeMin == 0L)
+    assert(ranged.rangeMax == 99L)
+    assert(ranged.rangeAttributeNames == List("a"))
+  }
+
+  it should "map SinglePartition to OneToOnePartitioning fanned in to the single receiver" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("only"))
+    val partitioning =
+      LinkConfig.toPartitioning(from, to, SinglePartition(), dataTransferBatchSize = 1)
+    val one = narrowTo[OneToOnePartitioning](partitioning)
+    assert(one.channels.forall(_.toWorkerId == actor("only")))
+    assert(one.channels.size == 2)
+  }
+
+  it should "fail the SinglePartition assertion when toWorkerIds has more than one entry" in {
+    val from = List(actor("f"))
+    val to = List(actor("t1"), actor("t2"))
+    assertThrows[AssertionError] {
+      LinkConfig.toPartitioning(from, to, SinglePartition(), dataTransferBatchSize = 1)
+    }
+  }
+
+  it should "map OneToOnePartition to OneToOnePartitioning over zipped pairs" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("t1"), actor("t2"))
+    val partitioning =
+      LinkConfig.toPartitioning(from, to, OneToOnePartition(), dataTransferBatchSize = 1)
+    val one = narrowTo[OneToOnePartitioning](partitioning)
+    assert(one.channels.size == 2)
+    assert(one.channels.head == chan(actor("f1"), actor("t1")))
+  }
+
+  it should "map BroadcastPartition to BroadcastPartitioning over zipped pairs" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("t1"), actor("t2"))
+    val partitioning =
+      LinkConfig.toPartitioning(from, to, BroadcastPartition(), dataTransferBatchSize = 1)
+    // Only the resulting type is pinned here; the channel layout is not asserted.
+    assert(partitioning.isInstanceOf[BroadcastPartitioning])
+  }
+
+  it should "map UnknownPartition to RoundRobinPartitioning across the cross-product" in {
+    val from = List(actor("f1"), actor("f2"))
+    val to = List(actor("t1"), actor("t2"))
+    val partitioning =
+      LinkConfig.toPartitioning(from, to, UnknownPartition(), dataTransferBatchSize = 1)
+    val rr = narrowTo[RoundRobinPartitioning](partitioning)
+    assert(rr.channels.size == 4)
+  }
+
+  it should "throw UnsupportedOperationException for unhandled partition cases" in {
+    // PartitionInfo is sealed; `null` is the only value that falls through
+    // the named cases without adding a new subtype. This pins the catch-all
+    // `case _ => throw new UnsupportedOperationException()` branch.
+    assertThrows[UnsupportedOperationException] {
+      LinkConfig.toPartitioning(
+        List(actor("f")),
+        List(actor("t")),
+        null.asInstanceOf[PartitionInfo],
+        dataTransferBatchSize = 1
+      )
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // PortConfig hierarchy
+  // ---------------------------------------------------------------------------
+
+  "OutputPortConfig" should "expose its single storage URI via storageURIs" in {
+    val uri = new URI("vfs:///wid/1/eid/1/result")
+    val cfg = OutputPortConfig(uri)
+    assert(cfg.storageURIs == List(uri))
+  }
+
+  "IntermediateInputPortConfig" should "expose every URI it was constructed with" in {
+    val uris = List(new URI("vfs:///a"), new URI("vfs:///b"))
+    val cfg = IntermediateInputPortConfig(uris)
+    assert(cfg.storageURIs == uris)
+  }
+
+  "InputPortConfig" should "expose the URI projection of its storage pairs in order" in {
+    val a = new URI("vfs:///a")
+    val b = new URI("vfs:///b")
+    val partitioningA = OneToOnePartitioning(1, Seq.empty)
+    val partitioningB = OneToOnePartitioning(2, Seq.empty)
+    val cfg = InputPortConfig(List((a, partitioningA), (b, partitioningB)))
+    assert(cfg.storageURIs == List(a, b))
+  }
+
+  // ---------------------------------------------------------------------------
+  // OperatorConfig
+  // ---------------------------------------------------------------------------
+
+  "OperatorConfig.empty" should "have no worker configs" in {
+    assert(OperatorConfig.empty.workerConfigs.isEmpty)
+  }
+
+  // Separate subject: this exercises the constructor, not `OperatorConfig.empty`.
+  "OperatorConfig" should "preserve the workerConfigs given at construction" in {
+    val configs = List(WorkerConfig(actor("w1")), WorkerConfig(actor("w2")))
+    val op = OperatorConfig(configs)
+    assert(op.workerConfigs == configs)
+  }
+
+  // ---------------------------------------------------------------------------
+  // ResourceConfig defaults
+  // ---------------------------------------------------------------------------
+
+  "ResourceConfig" should "default all three maps to empty" in {
+    val rc = ResourceConfig()
+    assert(rc.operatorConfigs.isEmpty)
+    assert(rc.linkConfigs.isEmpty)
+    assert(rc.portConfigs.isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // WorkerConfig.generateWorkerConfigs
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Builds a `PhysicalOp` fixture whose identity fields are fixed placeholders and whose
+   * `parallelizable` flag and `suggestedWorkerNum` come from the arguments.
+   */
+  private def physicalOp(parallelizable: Boolean, suggested: Option[Int]): PhysicalOp =
+    PhysicalOp(
+      PhysicalOpIdentity(OperatorIdentity("op"), "main"),
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      OpExecInitInfo.Empty,
+      parallelizable = parallelizable,
+      suggestedWorkerNum = suggested
+    )
+
+  "WorkerConfig.generateWorkerConfigs" should "produce exactly one WorkerConfig for non-parallelizable ops" in {
+    val configs =
+      WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = false, suggested = None))
+    assert(configs.size == 1)
+  }
+
+  it should "ignore a suggested worker count for non-parallelizable ops" in {
+    val configs =
+      WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = false, suggested = Some(8)))
+    assert(configs.size == 1)
+  }
+
+  it should "honor the suggested worker count for parallelizable ops" in {
+    val configs =
+      WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true, suggested = Some(5)))
+    assert(configs.size == 5)
+    // distinct worker ids
+    assert(configs.map(_.workerId).distinct.size == 5)
+  }
+
+  it should "fall back to the configured default when no suggested count is given for a parallelizable op" in {
+    val configs =
+      WorkerConfig.generateWorkerConfigs(physicalOp(parallelizable = true, suggested = None))
+    assert(configs.size == ApplicationConfig.numWorkerPerOperatorByDefault)
+  }
+}

Reply via email to