This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new f46335deba test(workflow-operator): add unit test coverage for MapOpExec (#4778)
f46335deba is described below

commit f46335deba1a876cfa7dbb9a1d15ae7eee47848a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 02:05:17 2026 -0700

    test(workflow-operator): add unit test coverage for MapOpExec (#4778)
    
    ### What changes were proposed in this PR?
    
    Add `MapOpExecSpec` covering the function-delegating contract of
    `MapOpExec`:
    
    - `processTuple` emits exactly one tuple per input by applying the
    configured `mapFunc`
    - When `mapFunc` returns the input tuple, `processTuple` returns the
    same instance (reference identity)
    - Output is always a single-element iterator
    - `setMapFunc` overwrites a previously installed function
    - `processTuple` throws `NullPointerException` when invoked before
    `setMapFunc`
    
    ### Any related issues, documentation, discussions?
    
    Closes #4777
    
    ### How was this PR tested?
    
    `sbt "WorkflowOperator/testOnly
    org.apache.texera.amber.operator.map.MapOpExecSpec"` — 5/5 tests pass.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../texera/amber/operator/map/MapOpExecSpec.scala  | 125 +++++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpExecSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpExecSpec.scala
new file mode 100644
index 0000000000..dcb27a8543
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/map/MapOpExecSpec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.map
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.scalatest.flatspec.AnyFlatSpec
+
+/**
+  * Unit tests for [[MapOpExec]], exercised through a minimal concrete subclass.
+  *
+  * The cases check that `processTuple` applies whatever function was installed via
+  * `setMapFunc`, yields exactly one output per input, hands back the mapped value
+  * without copying it, and fails with a `NullPointerException` when no function has
+  * been installed yet.
+  */
+class MapOpExecSpec extends AnyFlatSpec {
+
+  // Single-column INTEGER schema shared by every input tuple in this spec.
+  private val vAttr = new Attribute("v", AttributeType.INTEGER)
+  private val schema: Schema = Schema().add(vAttr)
+
+  // Use the schema's Attribute when adding fields so the helper stays
+  // consistent with the schema under test.
+  private def tuple(v: Int): Tuple =
+    Tuple.builder(schema).add(schema.getAttribute("v"), Integer.valueOf(v)).build()
+
+  // Body-less subclass used to instantiate MapOpExec directly.
+  // NOTE(review): presumably needed because MapOpExec is abstract — confirm against MapOpExec.
+  private class TestMap extends MapOpExec
+
+  // NOTE(review): the second argument to processTuple is always 0 here; presumably the
+  // input port index — confirm against the MapOpExec / OperatorExecutor signature.
+
+  "MapOpExec.processTuple" should "emit exactly one tuple per input by applying the configured mapFunc" in {
+    val exec = new TestMap()
+    exec.setMapFunc((t: Tuple) => tuple(t.getField[Int]("v") * 2))
+
+    val out = exec.processTuple(tuple(3), 0).toList
+    assert(out == List(tuple(6)))
+  }
+
+  it should "apply a doubling map function to a stream of tuples" in {
+    val exec = new TestMap()
+    exec.setMapFunc((t: Tuple) => tuple(t.getField[Int]("v") * 2))
+    val out = (1 to 5).flatMap(v => exec.processTuple(tuple(v), 0).toList)
+    assert(out.map(_.asInstanceOf[Tuple]) == (1 to 5).map(v => tuple(v * 2)))
+  }
+
+  it should "apply a constant map function regardless of input" in {
+    val exec = new TestMap()
+    exec.setMapFunc((_: Tuple) => tuple(99))
+    val out = Seq(1, 2, 3).map(v => exec.processTuple(tuple(v), 0).toList.head.asInstanceOf[Tuple])
+    assert(out.forall(_ == tuple(99)))
+  }
+
+  it should "apply a stateful map function (closes over an external counter)" in {
+    val exec = new TestMap()
+    // Deliberately mutable: the test verifies that the installed closure is invoked
+    // once per processTuple call and observes its own side effects.
+    var counter = 0
+    exec.setMapFunc { (t: Tuple) =>
+      counter += 1
+      tuple(t.getField[Int]("v") + counter)
+    }
+    val out = (1 to 3).map(v => exec.processTuple(tuple(v), 0).toList.head.asInstanceOf[Tuple])
+    // counter goes 1, 2, 3 → outputs 1+1, 2+2, 3+3
+    assert(out == List(tuple(2), tuple(4), tuple(6)))
+    assert(counter == 3)
+  }
+
+  it should "support a map function that produces a tuple with a different schema" in {
+    val outSchema =
+      Schema().add(new Attribute("name", AttributeType.STRING))
+    val exec = new TestMap()
+    exec.setMapFunc { (t: Tuple) =>
+      Tuple
+        .builder(outSchema)
+        .add(outSchema.getAttribute("name"), s"v=${t.getField[Int]("v")}")
+        .build()
+    }
+    val out = exec.processTuple(tuple(7), 0).toList
+    assert(out.size == 1)
+    val mapped = out.head.asInstanceOf[Tuple]
+    assert(mapped.getField[String]("name") == "v=7")
+  }
+
+  it should "return the same instance when mapFunc returns the input tuple" in {
+    val exec = new TestMap()
+    exec.setMapFunc((t: Tuple) => t)
+
+    val input = tuple(7)
+    val out = exec.processTuple(input, 0).toList
+    assert(out.size == 1)
+    // Reference identity: processTuple should not copy or rebuild the tuple
+    // when mapFunc returns the same instance.
+    assert(out.head eq input)
+  }
+
+  it should "always wrap the result in a single-element iterator" in {
+    val exec = new TestMap()
+    exec.setMapFunc((_: Tuple) => tuple(0))
+
+    val it = exec.processTuple(tuple(99), 0)
+    assert(it.hasNext)
+    it.next()
+    assert(!it.hasNext)
+  }
+
+  // Grouped under the processTuple subject: processTuple is the call that fails when
+  // no function has been installed, so the reported test name should say so.
+  it should "throw NullPointerException when mapFunc is invoked before setMapFunc" in {
+    val exec = new TestMap()
+    assertThrows[NullPointerException] {
+      exec.processTuple(tuple(0), 0).toList
+    }
+  }
+
+  "MapOpExec.setMapFunc" should "overwrite a previously installed function" in {
+    val exec = new TestMap()
+    exec.setMapFunc((_: Tuple) => tuple(1))
+    exec.setMapFunc((_: Tuple) => tuple(2))
+
+    val out = exec.processTuple(tuple(0), 0).toList
+    assert(out == List(tuple(2)))
+  }
+}

Reply via email to