This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c895ccba5 test(amber): add unit test coverage for resourcePolicies (#4824)
1c895ccba5 is described below

commit 1c895ccba5a63d6ef693a766049ab37e9d417dd3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 12:34:26 2026 -0700

    test(amber): add unit test coverage for resourcePolicies (#4824)
    
    ### What changes were proposed in this PR?
    
    Add `ResourcePoliciesSpec` covering `DefaultResourceAllocator` and
    `ExecutionClusterInfo`:
    
    - `ExecutionClusterInfo` no-arg construction
    - `DefaultResourceAllocator.allocate` returns zero cost (current
    placeholder)
    - Produces an `OperatorConfig` entry for every operator in the region
    - Respects `parallelizable` / `suggestedWorkerNum` on each `PhysicalOp`
    - Emits distinct worker ids
    - Produces a `LinkConfig` entry for every link with a non-empty channel
    layout and a non-null `Partitioning`
    - `portConfigs` is empty when the region has no prior `resourceConfig`
    
    ### Any related issues, documentation, discussions?
    
    Closes #4822
    
    ### How was this PR tested?
    
    `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies.ResourcePoliciesSpec"`
    — 8/8 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../resourcePolicies/ResourcePoliciesSpec.scala    | 174 +++++++++++++++++++++
 1 file changed, 174 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourcePoliciesSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourcePoliciesSpec.scala
new file mode 100644
index 0000000000..63a08e899f
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/resourcePolicies/ResourcePoliciesSpec.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.scheduling.resourcePolicies
+
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.scheduling.{Region, 
RegionIdentity}
+import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.{
+  BroadcastPartitioning,
+  HashBasedShufflePartitioning,
+  OneToOnePartitioning,
+  Partitioning,
+  RangeBasedShufflePartitioning,
+  RoundRobinPartitioning
+}
+import org.apache.texera.amber.engine.e2e.TestUtils.buildWorkflow
+import org.apache.texera.amber.operator.TestOperators
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Unit tests for the `resourcePolicies` package: [[ExecutionClusterInfo]] and
+  * [[DefaultResourceAllocator]].
+  *
+  * Allocator tests run against a small linear `csv -> keyword` workflow, with
+  * every physical operator and physical link of its plan placed in one region.
+  */
+class ResourcePoliciesSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // ExecutionClusterInfo
+  // ---------------------------------------------------------------------------
+
+  "ExecutionClusterInfo" should "construct without arguments" in {
+    // No-arg constructor must not throw; the type currently has no observable
+    // state to assert beyond that.
+    new ExecutionClusterInfo()
+  }
+
+  // ---------------------------------------------------------------------------
+  // DefaultResourceAllocator (helpers)
+  // ---------------------------------------------------------------------------
+
+  /** Build a small linear `csv -> keyword` workflow to feed the allocator. */
+  private def buildLinearWorkflow() = {
+    val csv = TestOperators.headerlessSmallCsvScanOpDesc()
+    val keyword = TestOperators.keywordSearchOpDesc("column-1", "Asia")
+    buildWorkflow(
+      List(csv, keyword),
+      List(
+        LogicalLink(
+          csv.operatorIdentifier,
+          PortIdentity(0),
+          keyword.operatorIdentifier,
+          PortIdentity(0)
+        )
+      ),
+      new WorkflowContext()
+    )
+  }
+
+  /**
+    * Create an allocator over the linear workflow's physical plan, together with
+    * a region that contains every physical operator and link of that plan.
+    */
+  private def newAllocator(): (DefaultResourceAllocator, Region) = {
+    val workflow = buildLinearWorkflow()
+    val allocator = new DefaultResourceAllocator(
+      workflow.physicalPlan,
+      new ExecutionClusterInfo(),
+      workflow.context.workflowSettings
+    )
+    val region = Region(
+      id = RegionIdentity(0),
+      physicalOps = workflow.physicalPlan.operators,
+      physicalLinks = workflow.physicalPlan.links
+    )
+    (allocator, region)
+  }
+
+  /**
+    * Extract the channel list carried by a `Partitioning`.
+    *
+    * Fails the test for any subtype not listed here. A `null` partitioning
+    * matches none of the type patterns and therefore also fails.
+    */
+  private def partitioningOf(p: Partitioning) =
+    p match {
+      case x: OneToOnePartitioning          => x.channels
+      case x: RoundRobinPartitioning        => x.channels
+      case x: HashBasedShufflePartitioning  => x.channels
+      case x: RangeBasedShufflePartitioning => x.channels
+      case x: BroadcastPartitioning         => x.channels
+      case other                            => fail(s"allocator emitted unexpected Partitioning: $other")
+    }
+
+  // ---------------------------------------------------------------------------
+  // DefaultResourceAllocator (tests)
+  // ---------------------------------------------------------------------------
+
+  "DefaultResourceAllocator.allocate" should "return zero cost (placeholder)" in {
+    val (allocator, region) = newAllocator()
+    val (_, cost) = allocator.allocate(region)
+    assert(cost == 0d)
+  }
+
+  it should "produce an OperatorConfig entry for every operator in the region" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    val opIds = region.getOperators.map(_.id)
+    // Guard against a vacuous pass: two empty key sets would compare equal.
+    assert(opIds.nonEmpty, "test workflow unexpectedly produced no operators")
+    assert(resourceConfig.operatorConfigs.keySet == opIds)
+  }
+
+  it should "respect parallelizable / suggested-worker settings on each PhysicalOp" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    // Guard against a vacuous pass: `foreach` over an empty set asserts nothing.
+    assert(region.getOperators.nonEmpty, "test workflow unexpectedly produced no operators")
+    region.getOperators.foreach { op =>
+      val workers = resourceConfig.operatorConfigs(op.id).workerConfigs.size
+      val expected =
+        if (!op.parallelizable) 1
+        else
+          op.suggestedWorkerNum.getOrElse(
+            org.apache.texera.amber.config.ApplicationConfig.numWorkerPerOperatorByDefault
+          )
+      assert(workers == expected, s"unexpected worker count for ${op.id}")
+    }
+  }
+
+  it should "honor an explicit suggestedWorkerNum on a parallelizable op" in {
+    val workflow = buildLinearWorkflow()
+    // Override a parallelizable op. Per the expectation in the previous test,
+    // non-parallelizable ops always get exactly one worker, so overriding one of
+    // those would not exercise `suggestedWorkerNum`.
+    val targetOpId =
+      workflow.physicalPlan.operators
+        .find(_.parallelizable)
+        .map(_.id)
+        .getOrElse(fail("expected at least one parallelizable operator in the test workflow"))
+    val rebuiltOps = workflow.physicalPlan.operators.map { op =>
+      if (op.id == targetOpId) op.withSuggestedWorkerNum(7) else op
+    }
+    val rebuiltPlan = workflow.physicalPlan.copy(operators = rebuiltOps)
+    val allocator = new DefaultResourceAllocator(
+      rebuiltPlan,
+      new ExecutionClusterInfo(),
+      workflow.context.workflowSettings
+    )
+    val region = Region(
+      id = RegionIdentity(0),
+      physicalOps = rebuiltOps,
+      physicalLinks = rebuiltPlan.links
+    )
+    val (resourceConfig, _) = allocator.allocate(region)
+    assert(resourceConfig.operatorConfigs(targetOpId).workerConfigs.size == 7)
+  }
+
+  it should "emit distinct worker ids across all operators" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    // Collect ids from every operator so collisions between operators are caught
+    // as well as collisions within one operator.
+    val ids =
+      resourceConfig.operatorConfigs.values.flatMap(_.workerConfigs.map(_.workerId)).toList
+    assert(ids.nonEmpty, "allocator produced no workers")
+    assert(ids.distinct.size == ids.size, s"duplicate worker ids in $ids")
+  }
+
+  it should "produce a LinkConfig entry for every physical link in the region" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    // Guard against a vacuous pass: two empty key sets would compare equal.
+    assert(region.getLinks.nonEmpty, "test workflow unexpectedly produced no links")
+    assert(resourceConfig.linkConfigs.keySet == region.getLinks)
+  }
+
+  it should "wire each LinkConfig so its Partitioning channels match its channelConfigs" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    assert(resourceConfig.linkConfigs.nonEmpty, "allocator produced no LinkConfigs")
+    resourceConfig.linkConfigs.values.foreach { link =>
+      assert(link.channelConfigs.nonEmpty)
+      val partitioningChannels = partitioningOf(link.partitioning)
+      assert(partitioningChannels == link.channelConfigs.map(_.channelId))
+    }
+  }
+
+  it should "leave portConfigs empty when the region has no prior resourceConfig" in {
+    val (allocator, region) = newAllocator()
+    val (resourceConfig, _) = allocator.allocate(region)
+    assert(resourceConfig.portConfigs.isEmpty)
+  }
+}

Reply via email to