This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 57640a74a3 test(workflow-core): add unit test coverage for core executor traits + factory (#5400)
57640a74a3 is described below

commit 57640a74a333d0ba01b723704becac5d7e73238d
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Jun 6 15:40:08 2026 -0700

    test(workflow-core): add unit test coverage for core executor traits + factory (#5400)
    
    ### What changes were proposed in this PR?
    
    `CoreExecutorReflectionSpec` (same package) already covered the basic
    defaults of `OperatorExecutor` / `SourceOperatorExecutor` and the
    `ExecFactory.newExecFromJavaClassName` happy paths +
    `JavaRuntimeCompilation` failure path. This PR extends the existing spec
    with five additional regression pins, without duplicating any existing
    assertion.
    
    | Surface | Pinned (new in this PR) |
    | --- | --- |
    | `OperatorExecutor.open` / `close` | A subclass override is reachable
    from every call site that invokes `open` / `close` on the trait
    (counter-based pin; a direct no-op-stub regression that bypassed
    dispatch wouldn't fail the existing default-stays-no-op tests). |
    | `OperatorExecutor.processTupleMultiPort` | The default wrapper hands
    the input `port` verbatim to the subclass override (a regression that
    hard-coded `port = 0` would be invisible to the basic delegation test).
    Zero-emission and many-emission fan-out from the underlying
    `processTuple` both pass through with `Option.empty` port. |
    | `SourceOperatorExecutor.processTupleMultiPort` | The input-side path
    never invokes `produceTuple` (counter-based pin; an accidental drain
    that discarded results wouldn't fail an empty-iterator check). |
    | `ExecFactory.newExecFromJavaClassName` | `NoSuchMethodException`
    propagates when neither factory branch matches a fixture with only a
    `(Long)` constructor (the existing tests pin the success paths and
    `ClassNotFoundException`, not the "no constructor matches" branch). |
    
    The PR body's earlier claim that "this package has no unit tests today"
    was inaccurate; the existing spec was missed during my initial coverage
    check because its `Reflection` suffix doesn't match any source-class
    name pattern. Consolidated per Copilot review.
    
    No production code changed; this is test-only.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5396
    
    ### How was this PR tested?
    
    ```
    sbt "WorkflowCore/Test/testOnly org.apache.texera.amber.core.executor.CoreExecutorReflectionSpec"
    # → 23 tests (18 pre-existing + 5 new), all pass
    
    sbt "WorkflowCore/Test/scalafmtCheck"
    # → clean
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.7)
---
 .../core/executor/CoreExecutorReflectionSpec.scala | 86 ++++++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
index 6357e336c6..3cec8b0e97 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
@@ -51,6 +51,26 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
     succeed
   }
 
+  "OperatorExecutor.open / close" should
+    "honor a subclass override (counting invocations across the lifecycle)" in {
+    // Pin that the defaults are real method dispatches, not direct calls
+    // to no-op stubs — a subclass override must be reachable from any
+    // call site that invokes open / close on the trait.
+    class CountingExec extends OperatorExecutor {
+      var opens = 0
+      var closes = 0
+      override def open(): Unit = opens += 1
+      override def close(): Unit = closes += 1
+      override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = Iterator.empty
+    }
+    val exec = new CountingExec
+    exec.open()
+    exec.open()
+    exec.close()
+    assert(exec.opens == 2)
+    assert(exec.closes == 1)
+  }
+
   "OperatorExecutor.produceStateOnStart" should "default to None for any port" in {
     val exec = new IdentityExec
     assert(exec.produceStateOnStart(0).isEmpty)
@@ -71,6 +91,36 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
     assert(out.head._2.isEmpty)
   }
 
+  it should "forward the input port id to the underlying processTuple verbatim" in {
+    // Pin: the default wrapper must hand the same `port` to the subclass
+    // override, not substitute a constant. A regression that hard-codes
+    // port = 0 would be invisible to the basic delegation test above.
+    var seenPort = -1
+    val exec = new OperatorExecutor {
+      override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = {
+        seenPort = p
+        Iterator.single(t)
+      }
+    }
+    exec.processTupleMultiPort(tuple(0), port = 9).toList
+    assert(seenPort == 9)
+  }
+
+  it should "produce as many output pairs as the underlying processTuple emits (zero / many fan-out)" in {
+    val empty = new OperatorExecutor {
+      override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] = Iterator.empty
+    }
+    assert(empty.processTupleMultiPort(tuple(0), 0).isEmpty)
+
+    val fanOut = new OperatorExecutor {
+      override def processTuple(t: Tuple, p: Int): Iterator[TupleLike] =
+        Iterator(tuple(1), tuple(2), tuple(3))
+    }
+    val outs = fanOut.processTupleMultiPort(tuple(0), 0).toList
+    assert(outs.size == 3)
+    assert(outs.forall(_._2.isEmpty), "every emitted pair must have port = None under the default")
+  }
+
   "OperatorExecutor.produceStateOnFinish" should "default to None for any port" in {
     val exec = new IdentityExec
     assert(exec.produceStateOnFinish(0).isEmpty)
@@ -105,6 +155,22 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
     assert(exec.processTupleMultiPort(tuple(99), 0).isEmpty)
   }
 
+  it should "never invoke produceTuple on the input-side path" in {
+    // Sources only emit through onFinishMultiPort. A regression in
+    // processTupleMultiPort that accidentally drained produceTuple
+    // (discarding the output) wouldn't be caught by an empty-iterator
+    // check alone — pin via a counter.
+    var producedCalls = 0
+    val src = new SourceOperatorExecutor {
+      override def produceTuple(): Iterator[TupleLike] = {
+        producedCalls += 1
+        Iterator.empty
+      }
+    }
+    src.processTupleMultiPort(tuple(0), 0).toList
+    assert(producedCalls == 0, "input-side path must not call produceTuple")
+  }
+
   "SourceOperatorExecutor.onFinishMultiPort" should "delegate to produceTuple with no port routing" in {
     val exec = new CountingSource
     val out = exec.onFinishMultiPort(0).toList
@@ -163,6 +229,18 @@ class CoreExecutorReflectionSpec extends AnyFlatSpec {
     }
   }
 
+  it should "propagate NoSuchMethodException when no constructor matches either factory branch" in {
+    // A fixture with only a `(Long)` constructor: neither the no-arg
+    // branch (which retries with (Int, Int) on catch) nor the (String)
+    // branch (which falls back to (String, Int, Int)) can match it, so
+    // the final constructor lookup fails with NoSuchMethodException.
+    assertThrows[NoSuchMethodException] {
+      ExecFactory.newExecFromJavaClassName(
+        classOf[CoreExecutorReflectionSpec.LongArgExec].getName
+      )
+    }
+  }
+
   // ---------------------------------------------------------------------------
   // JavaRuntimeCompilation.compileCode
   //
@@ -226,4 +304,12 @@ private object CoreExecutorReflectionSpec {
         port: Int
     ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
   }
+
+  /** Only a `(Long)` constructor — neither factory branch matches. */
+  class LongArgExec(val n: Long) extends OperatorExecutor {
+    override def processTuple(
+        tuple: org.apache.texera.amber.core.tuple.Tuple,
+        port: Int
+    ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+  }
 }

Reply via email to