This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 99b0963b47 test(workflow-core): add unit test coverage for PartitionInfo (#4755)
99b0963b47 is described below

commit 99b0963b472121b41d87622d1bd3659a73d7f7d5
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 05:40:41 2026 -0700

    test(workflow-core): add unit test coverage for PartitionInfo (#4755)
    
    ### What changes were proposed in this PR?
    
    Add `PartitionInfoSpec` covering the compiler-layer partition lattice in
    `PartitionInfo`:
    
    - `satisfies` reflexivity, the `UnknownPartition` absorption rule,
    cross-class refusal, and `HashPartition` attribute-list sensitivity
    - `merge` self-preservation, fallback to `UnknownPartition` across
    classes, and the `RangePartition` override that always merges to
    `UnknownPartition`
    - `RangePartition.apply` empty-attributes shortcut to `UnknownPartition`
    
    ### Any related issues, documentation, discussions?
    
    Closes #4754
    
    ### How was this PR tested?
    
    `sbt "WorkflowCore/testOnly
    org.apache.texera.amber.core.workflow.PartitionInfoSpec"` — 9/9 tests
    pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../amber/core/workflow/PartitionInfoSpec.scala    | 159 +++++++++++++++++++++
 1 file changed, 159 insertions(+)

diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
new file mode 100644
index 0000000000..ad7c7d65dc
--- /dev/null
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/workflow/PartitionInfoSpec.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.workflow
+
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Behavioural tests for [[PartitionInfo]] and its concrete partition kinds.
+  *
+  * Three groups of expectations are pinned down:
+  *  - `satisfies`: every partition satisfies itself and `UnknownPartition`; no primary kind
+  *    satisfies a different primary kind, and `UnknownPartition` satisfies no primary kind.
+  *  - `merge`: merging a partition with an equal one keeps it, any other combination yields
+  *    `UnknownPartition`; `RangePartition` is expected to merge to `UnknownPartition` even
+  *    with itself.
+  *  - `RangePartition.apply`: an empty attribute list yields `UnknownPartition`, a non-empty
+  *    one yields a `RangePartition` carrying the given attributes and bounds.
+  */
+class PartitionInfoSpec extends AnyFlatSpec {
+
+  // The full set of "named" partition kinds we care about cross-checking.
+  // Two HashPartitions with different attribute lists count as a "different
+  // partition" too, so we include both shapes.
+  private val hashA: PartitionInfo = HashPartition(List("a"))
+  private val hashB: PartitionInfo = HashPartition(List("b"))
+  private val rangeA: PartitionInfo = new RangePartition(List("a"), 0L, 10L)
+  private val single: PartitionInfo = SinglePartition()
+  private val broadcast: PartitionInfo = BroadcastPartition()
+  private val oneToOne: PartitionInfo = OneToOnePartition()
+  private val unknown: PartitionInfo = UnknownPartition()
+
+  // Five "primary" partition kinds (excluding the duplicate Hash and the
+  // catch-all Unknown — both handled separately) used for the cross product.
+  private val primaryKinds: List[(String, PartitionInfo)] = List(
+    "HashPartition" -> hashA,
+    "RangePartition" -> rangeA,
+    "SinglePartition" -> single,
+    "BroadcastPartition" -> broadcast,
+    "OneToOnePartition" -> oneToOne
+  )
+
+  // Every ordered pair of differently-labelled primary kinds, i.e. the 20
+  // off-diagonal cells of the 5x5 matrix. Pairs are selected by label rather
+  // than by `!=` on the values, so the selection does not depend on the
+  // equality semantics of the classes under test.
+  private val distinctPrimaryPairs: List[((String, PartitionInfo), (String, PartitionInfo))] =
+    for {
+      lhs <- primaryKinds
+      rhs <- primaryKinds
+      if lhs._1 != rhs._1
+    } yield (lhs, rhs)
+
+  "PartitionInfo.satisfies" should "hold reflexively (each partition satisfies itself)" in {
+    primaryKinds.foreach {
+      case (name, p) =>
+        assert(p.satisfies(p), s"$name should satisfy itself")
+    }
+    // UnknownPartition reflexively satisfies itself too.
+    assert(unknown.satisfies(unknown))
+    // HashPartition with the same attribute list satisfies itself even
+    // across distinct instances.
+    assert(HashPartition(List("a")).satisfies(HashPartition(List("a"))))
+  }
+
+  it should "fail across the full 5x5 cross-product of distinct primary kinds" in {
+    // For every ordered pair of distinct primary partition kinds, satisfies
+    // must be false. This checks the 20 off-diagonal cells of the 5x5 matrix;
+    // the 5 diagonal cells are covered by the reflexivity test above.
+    distinctPrimaryPairs.foreach {
+      case ((lname, lhs), (rname, rhs)) =>
+        assert(!lhs.satisfies(rhs), s"$lname must not satisfy $rname")
+    }
+  }
+
+  it should "hold for any primary partition against UnknownPartition" in {
+    primaryKinds.foreach {
+      case (name, p) =>
+        assert(p.satisfies(unknown), s"$name should satisfy UnknownPartition")
+    }
+    // And UnknownPartition satisfies itself.
+    assert(unknown.satisfies(unknown))
+  }
+
+  it should "fail when UnknownPartition is on the LHS against any primary kind" in {
+    primaryKinds.foreach {
+      case (name, p) =>
+        assert(!unknown.satisfies(p), s"UnknownPartition must not satisfy $name")
+    }
+  }
+
+  it should "fail for HashPartition with different attribute lists (and otherwise-equal shape)" in {
+    assert(!hashA.satisfies(hashB))
+    assert(!hashB.satisfies(hashA))
+    // But both still satisfy UnknownPartition.
+    assert(hashA.satisfies(unknown))
+    assert(hashB.satisfies(unknown))
+  }
+
+  "PartitionInfo.merge" should "preserve the partition when merged with itself across every kind" in {
+    primaryKinds
+      .filter {
+        // RangePartition is expected to always merge to UnknownPartition
+        // (covered separately below); skip it here.
+        case (_, _: RangePartition) => false
+        case _ => true
+      }
+      .foreach {
+        case (name, p) =>
+          assert(p.merge(p) == p, s"$name should merge with itself to itself")
+      }
+    // UnknownPartition merges with itself to itself.
+    assert(unknown.merge(unknown) == unknown)
+    // HashPartition with same attributes merges to itself.
+    assert(HashPartition(List("a")).merge(HashPartition(List("a"))) == HashPartition(List("a")))
+  }
+
+  it should "fall back to UnknownPartition for the full 5x5 cross-product of distinct primary kinds" in {
+    // Every ordered pair of distinct primary kinds merges to UnknownPartition.
+    distinctPrimaryPairs.foreach {
+      case ((lname, lhs), (rname, rhs)) =>
+        assert(
+          lhs.merge(rhs) == unknown,
+          s"$lname.merge($rname) must be UnknownPartition"
+        )
+    }
+  }
+
+  it should "fall back to UnknownPartition when either side is UnknownPartition (excluding self-merge)" in {
+    primaryKinds.foreach {
+      case (name, p) =>
+        assert(p.merge(unknown) == unknown, s"$name.merge(Unknown) must be Unknown")
+        assert(unknown.merge(p) == unknown, s"Unknown.merge($name) must be Unknown")
+    }
+  }
+
+  it should "always return UnknownPartition for RangePartition merges, including with itself" in {
+    val r = new RangePartition(List("a"), 0L, 10L)
+    assert(r.merge(r) == unknown, "RangePartition self-merge is overridden to Unknown")
+    primaryKinds.foreach {
+      case (name, p) =>
+        assert(r.merge(p) == unknown, s"RangePartition.merge($name) must be Unknown")
+    }
+  }
+
+  it should "treat HashPartitions with different attribute lists as distinct (merge → Unknown)" in {
+    assert(hashA.merge(hashB) == unknown)
+    assert(hashB.merge(hashA) == unknown)
+  }
+
+  "RangePartition.apply" should "return an UnknownPartition when no range attributes are provided" in {
+    assert(RangePartition(List.empty, 0L, 10L) == UnknownPartition())
+  }
+
+  it should "return a RangePartition when at least one range attribute is provided" in {
+    // Match on the runtime type instead of isInstanceOf/asInstanceOf so that a
+    // wrong result type fails with a descriptive message rather than a cast error.
+    RangePartition(List("a"), 0L, 10L) match {
+      case rp: RangePartition =>
+        assert(rp.rangeAttributeNames == List("a"))
+        assert(rp.rangeMin == 0L)
+        assert(rp.rangeMax == 10L)
+      case other =>
+        fail(s"expected a RangePartition but got $other")
+    }
+  }
+}

Reply via email to