This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 1872ca0a91 test(workflow-operator): add unit test coverage for 
ProgressiveUtils (#4912)
1872ca0a91 is described below

commit 1872ca0a91b7acc3140cedb4b103bdaa71883ca2
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon May 4 03:03:20 2026 -0700

    test(workflow-operator): add unit test coverage for ProgressiveUtils (#4912)
    
    ### What changes were proposed in this PR?
    
    Adds `ProgressiveUtilsSpec` covering `ProgressiveUtils`
    
(common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sink/ProgressiveUtils.scala),
    which defines the insertion / retraction flag machinery used by
    progressive sinks but has no test coverage today.
    
    The new spec pins:
    - `insertRetractFlagAttr`: the internal flag column is a `BOOLEAN`
    attribute named `__internal_is_insertion`.
    - `addInsertionFlag` / `addRetractionFlag`: prepend the flag column with
    `true` / `false` while preserving the rest of the tuple, and fail an
    assertion if the input tuple already carries the flag column.
    - `isInsertion`: returns `true` for unflagged tuples (the engine default
    for "+"), and reads the flag column when present.
    - `getTupleFlagAndValue`: splits a flagged tuple into `(Boolean, base
    tuple)` with the flag column stripped, and passes unflagged tuples
    through unchanged with the original schema.
    
    No production code changed; this is test-only.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4911
    
    ### How was this PR tested?
    
    Added 10 new unit tests in `ProgressiveUtilsSpec`. Verified locally:
    
    ```
    sbt 'WorkflowOperator/Test/testOnly 
org.apache.texera.amber.operator.sink.ProgressiveUtilsSpec'
    # → Tests: succeeded 10, failed 0
    
    sbt 'WorkflowOperator/Test/scalafmtCheck'
    # → clean
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../amber/operator/sink/ProgressiveUtilsSpec.scala | 219 +++++++++++++++++++++
 1 file changed, 219 insertions(+)

diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sink/ProgressiveUtilsSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sink/ProgressiveUtilsSpec.scala
new file mode 100644
index 0000000000..e2a5674058
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/sink/ProgressiveUtilsSpec.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sink
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class ProgressiveUtilsSpec extends AnyFlatSpec {
+
+  // --- helpers 
---------------------------------------------------------------
+
+  private val baseSchema: Schema = new Schema(
+    new Attribute("id", AttributeType.INTEGER),
+    new Attribute("name", AttributeType.STRING)
+  )
+
+  // outputSchema = flag column prepended to baseSchema
+  private val outputSchema: Schema = new Schema(
+    ProgressiveUtils.insertRetractFlagAttr,
+    new Attribute("id", AttributeType.INTEGER),
+    new Attribute("name", AttributeType.STRING)
+  )
+
+  private def baseTuple(id: Int, name: String): Tuple =
+    Tuple
+      .builder(baseSchema)
+      .add(new Attribute("id", AttributeType.INTEGER), Int.box(id))
+      .add(new Attribute("name", AttributeType.STRING), name)
+      .build()
+
+  // --- insertRetractFlagAttr 
-------------------------------------------------
+
+  "ProgressiveUtils.insertRetractFlagAttr" should "be a BOOLEAN attribute 
named __internal_is_insertion" in {
+    val attr = ProgressiveUtils.insertRetractFlagAttr
+    assert(attr.getName == "__internal_is_insertion")
+    assert(attr.getType == AttributeType.BOOLEAN)
+  }
+
+  // --- addInsertionFlag / addRetractionFlag 
----------------------------------
+
+  "ProgressiveUtils.addInsertionFlag" should "prepend the flag column with 
value true" in {
+    val flagged = ProgressiveUtils.addInsertionFlag(baseTuple(1, "alice"), 
outputSchema)
+    assert(flagged.getSchema == outputSchema)
+    
assert(flagged.getField[Boolean](ProgressiveUtils.insertRetractFlagAttr.getName)
 == true)
+    assert(flagged.getField[Integer]("id") == 1)
+    assert(flagged.getField[String]("name") == "alice")
+  }
+
+  "ProgressiveUtils.addRetractionFlag" should "prepend the flag column with 
value false" in {
+    val flagged = ProgressiveUtils.addRetractionFlag(baseTuple(2, "bob"), 
outputSchema)
+    
assert(flagged.getField[Boolean](ProgressiveUtils.insertRetractFlagAttr.getName)
 == false)
+    assert(flagged.getField[Integer]("id") == 2)
+    assert(flagged.getField[String]("name") == "bob")
+  }
+
+  it should "fail an assertion if addRetractionFlag is called on an 
already-flagged tuple" in {
+    val alreadyFlagged = ProgressiveUtils.addInsertionFlag(baseTuple(3, "x"), 
outputSchema)
+    intercept[AssertionError] {
+      ProgressiveUtils.addRetractionFlag(alreadyFlagged, outputSchema)
+    }
+  }
+
+  it should "fail an assertion if addInsertionFlag is called on an 
already-flagged tuple" in {
+    // Symmetric guard: both addInsertionFlag and addRetractionFlag carry the
+    // same `assert(!containsAttribute(flagAttr))` precondition, and either
+    // one may be called on already-flagged data, so each path should fail.
+    val alreadyFlagged = ProgressiveUtils.addRetractionFlag(baseTuple(4, "y"), 
outputSchema)
+    intercept[AssertionError] {
+      ProgressiveUtils.addInsertionFlag(alreadyFlagged, outputSchema)
+    }
+  }
+
+  // --- isInsertion 
-----------------------------------------------------------
+
+  "ProgressiveUtils.isInsertion" should "return true for an unflagged tuple" 
in {
+    // Tuples without the flag column default to insertion (the unflagged
+    // default in the engine is "+").
+    assert(ProgressiveUtils.isInsertion(baseTuple(1, "x")))
+  }
+
+  it should "return true when the flag column is present and true" in {
+    val flagged = ProgressiveUtils.addInsertionFlag(baseTuple(1, "x"), 
outputSchema)
+    assert(ProgressiveUtils.isInsertion(flagged))
+  }
+
+  it should "return false when the flag column is present and false" in {
+    val flagged = ProgressiveUtils.addRetractionFlag(baseTuple(1, "x"), 
outputSchema)
+    assert(!ProgressiveUtils.isInsertion(flagged))
+  }
+
+  // --- getTupleFlagAndValue 
--------------------------------------------------
+
+  "ProgressiveUtils.getTupleFlagAndValue" should "split an insertion-flagged 
tuple into (true, base tuple)" in {
+    val flagged = ProgressiveUtils.addInsertionFlag(baseTuple(1, "alice"), 
outputSchema)
+    val (flag, stripped) = ProgressiveUtils.getTupleFlagAndValue(flagged)
+    assert(flag)
+    // Full schema equality (names + types + order) — name-only would let a
+    // type drift on the payload columns slip through.
+    assert(stripped.getSchema == baseSchema)
+    assert(stripped.getField[Integer]("id") == 1)
+    assert(stripped.getField[String]("name") == "alice")
+  }
+
+  it should "split a retraction-flagged tuple into (false, base tuple)" in {
+    val flagged = ProgressiveUtils.addRetractionFlag(baseTuple(2, "bob"), 
outputSchema)
+    val (flag, stripped) = ProgressiveUtils.getTupleFlagAndValue(flagged)
+    assert(!flag)
+    assert(stripped.getSchema == baseSchema)
+    assert(stripped.getField[Integer]("id") == 2)
+    assert(stripped.getField[String]("name") == "bob")
+  }
+
+  it should "treat an unflagged tuple as insertion and pass an equivalent 
schema through unchanged" in {
+    // For a tuple that doesn't carry the flag column, isInsertion returns
+    // true and getPartialSchema returns an equivalent Schema — same attributes
+    // in the same order (filterNot removes nothing). Note that
+    // Schema.getPartialSchema constructs a new instance every time, so this
+    // is structural equality, not reference identity.
+    val raw = baseTuple(3, "carol")
+    val (flag, stripped) = ProgressiveUtils.getTupleFlagAndValue(raw)
+    assert(flag)
+    assert(stripped.getSchema == raw.getSchema)
+    assert(stripped.getField[Integer]("id") == 3)
+    assert(stripped.getField[String]("name") == "carol")
+  }
+
+  // --- typed payload round-trips --------------------------------------------
+  // Nothing in `addInsertionFlag` / `getTupleFlagAndValue` is type-specific —
+  // they only care about the BOOLEAN flag column they prepend / strip — but
+  // it is worth pinning that arbitrary AttributeType payload columns survive
+  // the flag → strip → unflag round-trip across the engine's value types.
+
+  private def flagRoundTrip(payloadAttr: Attribute, payloadValue: AnyRef): 
(Boolean, AnyRef) = {
+    val payloadSchema = new Schema(payloadAttr)
+    val flaggedSchema = new Schema(ProgressiveUtils.insertRetractFlagAttr, 
payloadAttr)
+    val raw = Tuple.builder(payloadSchema).add(payloadAttr, 
payloadValue).build()
+    val flagged = ProgressiveUtils.addInsertionFlag(raw, flaggedSchema)
+    val (flag, stripped) = ProgressiveUtils.getTupleFlagAndValue(flagged)
+    (flag, stripped.getField[AnyRef](payloadAttr.getName))
+  }
+
+  "Flag round-trip" should "preserve INTEGER payload values" in {
+    val (flag, value) =
+      flagRoundTrip(new Attribute("v", AttributeType.INTEGER), Int.box(42))
+    assert(flag)
+    assert(value == Int.box(42))
+  }
+
+  it should "preserve LONG payload values" in {
+    val (flag, value) =
+      flagRoundTrip(new Attribute("v", AttributeType.LONG), 
Long.box(9876543210L))
+    assert(flag)
+    assert(value == Long.box(9876543210L))
+  }
+
+  it should "preserve DOUBLE payload values" in {
+    val (flag, value) =
+      flagRoundTrip(new Attribute("v", AttributeType.DOUBLE), 
Double.box(3.14159))
+    assert(flag)
+    assert(value == Double.box(3.14159))
+  }
+
+  it should "preserve BOOLEAN payload values (distinct from the flag column)" 
in {
+    // The flag column is also BOOLEAN; this verifies the implementation
+    // selects the correct attribute by name, not by type.
+    val (flag, value) =
+      flagRoundTrip(new Attribute("active", AttributeType.BOOLEAN), 
Boolean.box(false))
+    assert(flag, "outer flag must still be insertion")
+    assert(value == Boolean.box(false), "inner BOOLEAN payload must be 
preserved")
+  }
+
+  it should "preserve TIMESTAMP payload values" in {
+    val ts = new java.sql.Timestamp(1_700_000_000_000L)
+    val (flag, value) =
+      flagRoundTrip(new Attribute("ts", AttributeType.TIMESTAMP), ts)
+    assert(flag)
+    assert(value == ts)
+  }
+
+  it should "preserve BINARY payload values" in {
+    val bytes = Array[Byte](0, 1, 2, 3, -1)
+    val (flag, value) =
+      flagRoundTrip(new Attribute("blob", AttributeType.BINARY), bytes)
+    assert(flag)
+    // Use value-based equality (the Tuple contract elsewhere uses
+    // `sameElements` for Array[Byte]); requiring the *same* array instance
+    // would over-constrain the flag/strip path against future copy-on-write
+    // changes.
+    assert(value.asInstanceOf[Array[Byte]].sameElements(bytes))
+  }
+
+  it should "preserve null payload values for every AttributeType" in {
+    // Cover every member of `AttributeType` (Java enum). Avoid hand-listing —
+    // a future addition to the enum would still be tested.
+    AttributeType.values.foreach { tpe =>
+      val attr = new Attribute(s"v_${tpe.name().toLowerCase}", tpe)
+      val (flag, value) = flagRoundTrip(attr, null)
+      assert(flag)
+      assert(value == null, s"null payload must survive round-trip for $tpe")
+    }
+  }
+}

Reply via email to