This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 316a9c2d76 test(workflow-core): add unit test coverage for
core/executor reflection (#4834)
316a9c2d76 is described below
commit 316a9c2d76d85ea0b936a54bcc24d81a3cfe79b6
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun May 3 13:11:03 2026 -0700
test(workflow-core): add unit test coverage for core/executor reflection
(#4834)
### What changes were proposed in this PR?
Add `CoreExecutorReflectionSpec` covering the four classes/traits in
`common/workflow-core/.../core/executor`:
- `OperatorExecutor` trait defaults (open/close no-ops,
produceStateOnStart/Finish None, processState passthrough,
processTupleMultiPort delegation to processTuple, onFinish empty,
onFinishMultiPort delegation to onFinish)
- `SourceOperatorExecutor` trait defaults (processTuple /
processTupleMultiPort always empty; onFinishMultiPort delegates to
produceTuple with no port routing)
- `ExecFactory.newExecFromJavaClassName` across all four
constructor-shape fallbacks (no-arg, (String), (Int, Int), (String, Int,
Int)) plus ClassNotFoundException for unknown classes
- `JavaRuntimeCompilation.compileCode` success path on a self-contained
source and diagnostic RuntimeException on syntax errors
The `newExecFromJavaCode` / `compileCode`-with-Texera-deps integration
is intentionally not covered: javac is invoked with null compilation
options, which under sbt test does not include the project classpath and
produces "package ... does not exist" errors for any source that
references workflow-core types. That failure is a deployment-environment
artifact, not a contract violation.
### Any related issues, documentation, discussions?
Closes #4833
### How was this PR tested?
`sbt "WorkflowCore/testOnly
org.apache.texera.amber.core.executor.CoreExecutorReflectionSpec"` -
18/18 tests pass.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../core/executor/CoreExecutorReflectionSpec.scala | 229 +++++++++++++++++++++
1 file changed, 229 insertions(+)
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
new file mode 100644
index 0000000000..6357e336c6
--- /dev/null
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/executor/CoreExecutorReflectionSpec.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.core.executor
+
+import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema,
Tuple, TupleLike}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class CoreExecutorReflectionSpec extends AnyFlatSpec {
+
+ //
---------------------------------------------------------------------------
+ // OperatorExecutor trait defaults
+ //
---------------------------------------------------------------------------
+
+ private val schema: Schema = Schema().add(new Attribute("v",
AttributeType.INTEGER))
+ private def tuple(v: Int): Tuple =
+ Tuple.builder(schema).add(schema.getAttribute("v"),
Integer.valueOf(v)).build()
+
+ /** Minimal concrete subclass — only `processTuple` is abstract. */
+ private class IdentityExec extends OperatorExecutor {
+ override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] =
+ Iterator.single(tuple)
+ }
+
+ "OperatorExecutor.open" should "default to a no-op" in {
+ val exec = new IdentityExec
+ exec.open() // should not throw
+ succeed
+ }
+
+ "OperatorExecutor.close" should "default to a no-op" in {
+ val exec = new IdentityExec
+ exec.close()
+ succeed
+ }
+
+ "OperatorExecutor.produceStateOnStart" should "default to None for any port"
in {
+ val exec = new IdentityExec
+ assert(exec.produceStateOnStart(0).isEmpty)
+ assert(exec.produceStateOnStart(7).isEmpty)
+ }
+
+ "OperatorExecutor.processState" should "default to passing the state through
unchanged" in {
+ val exec = new IdentityExec
+ val state = State(Map[String, Any]("k" -> 1))
+ assert(exec.processState(state, 0).contains(state))
+ }
+
+ "OperatorExecutor.processTupleMultiPort" should "default to delegating to
processTuple with no port routing" in {
+ val exec = new IdentityExec
+ val out = exec.processTupleMultiPort(tuple(1), 0).toList
+ assert(out.size == 1)
+ assert(out.head._1.asInstanceOf[Tuple] == tuple(1))
+ assert(out.head._2.isEmpty)
+ }
+
+ "OperatorExecutor.produceStateOnFinish" should "default to None for any
port" in {
+ val exec = new IdentityExec
+ assert(exec.produceStateOnFinish(0).isEmpty)
+ }
+
+ "OperatorExecutor.onFinish" should "default to an empty iterator" in {
+ val exec = new IdentityExec
+ assert(exec.onFinish(0).isEmpty)
+ }
+
+ "OperatorExecutor.onFinishMultiPort" should "default to delegating to
onFinish with no port routing" in {
+ val exec = new IdentityExec
+ assert(exec.onFinishMultiPort(0).isEmpty)
+ }
+
+ //
---------------------------------------------------------------------------
+ // SourceOperatorExecutor trait defaults
+ //
---------------------------------------------------------------------------
+
+ private class CountingSource extends SourceOperatorExecutor {
+ override def produceTuple(): Iterator[TupleLike] =
+ List(tuple(1), tuple(2), tuple(3)).iterator
+ }
+
+ "SourceOperatorExecutor.processTuple" should "always return an empty
iterator" in {
+ val exec = new CountingSource
+ assert(exec.processTuple(tuple(99), 0).isEmpty)
+ }
+
+ "SourceOperatorExecutor.processTupleMultiPort" should "always return an
empty iterator" in {
+ val exec = new CountingSource
+ assert(exec.processTupleMultiPort(tuple(99), 0).isEmpty)
+ }
+
+ "SourceOperatorExecutor.onFinishMultiPort" should "delegate to produceTuple
with no port routing" in {
+ val exec = new CountingSource
+ val out = exec.onFinishMultiPort(0).toList
+ assert(out.size == 3)
+ assert(out.map(_._1.asInstanceOf[Tuple]) == List(tuple(1), tuple(2),
tuple(3)))
+ assert(out.forall(_._2.isEmpty))
+ }
+
+ //
---------------------------------------------------------------------------
+ // ExecFactory.newExecFromJavaClassName
+ //
---------------------------------------------------------------------------
+
+ "ExecFactory.newExecFromJavaClassName" should "instantiate a no-arg
constructor when no descString is given" in {
+ val exec = ExecFactory.newExecFromJavaClassName(
+ classOf[CoreExecutorReflectionSpec.NoArgExec].getName
+ )
+ assert(exec.isInstanceOf[CoreExecutorReflectionSpec.NoArgExec])
+ }
+
+ it should "instantiate a (String) constructor when descString is provided"
in {
+ val exec = ExecFactory.newExecFromJavaClassName(
+ classOf[CoreExecutorReflectionSpec.StringArgExec].getName,
+ descString = "hello"
+ )
+ val typed = exec.asInstanceOf[CoreExecutorReflectionSpec.StringArgExec]
+ assert(typed.desc == "hello")
+ }
+
+ it should "fall back to (Int, Int) constructor for parallelizable executors
with no descString" in {
+ val exec = ExecFactory.newExecFromJavaClassName(
+ classOf[CoreExecutorReflectionSpec.IdxCountExec].getName,
+ idx = 3,
+ workerCount = 7
+ )
+ val typed = exec.asInstanceOf[CoreExecutorReflectionSpec.IdxCountExec]
+ assert(typed.idx == 3)
+ assert(typed.workerCount == 7)
+ }
+
+ it should "fall back to (String, Int, Int) constructor when descString is
given" in {
+ val exec = ExecFactory.newExecFromJavaClassName(
+ classOf[CoreExecutorReflectionSpec.StringIdxCountExec].getName,
+ descString = "hi",
+ idx = 1,
+ workerCount = 4
+ )
+ val typed =
exec.asInstanceOf[CoreExecutorReflectionSpec.StringIdxCountExec]
+ assert(typed.desc == "hi")
+ assert(typed.idx == 1)
+ assert(typed.workerCount == 4)
+ }
+
+ it should "raise ClassNotFoundException for unknown class names" in {
+ assertThrows[ClassNotFoundException] {
+ ExecFactory.newExecFromJavaClassName("does.not.exist.AtAll")
+ }
+ }
+
+ //
---------------------------------------------------------------------------
+ // JavaRuntimeCompilation.compileCode
+ //
+ // A success-path test that compiles a real OperatorExecutor subclass from a
+ // string is intentionally omitted: `compiler.getTask(...)` is invoked with
+ // null compilation options, which means the system javac picks up its own
+ // (test) classpath rather than the project classpath. Under sbt test that
+ // does not include workflow-core itself, so the compile fails with
+ // "package org.apache.texera... does not exist" — a deployment-environment
+ // artifact rather than a contract violation. We exercise just the diagnostic
+ // path here.
+ //
---------------------------------------------------------------------------
+
+ "JavaRuntimeCompilation.compileCode" should "compile a self-contained Java
class with no external deps" in {
+ val src =
+ """public class JavaUDFOpExec {
+ | public int compute() { return 42; }
+ |}""".stripMargin
+ val cls = JavaRuntimeCompilation.compileCode(src)
+ assert(cls.getName ==
"org.apache.texera.amber.operators.udf.java.JavaUDFOpExec")
+ val instance = cls.getDeclaredConstructor().newInstance()
+ val result =
cls.getMethod("compute").invoke(instance).asInstanceOf[Integer]
+ assert(result == 42)
+ }
+
+ it should "raise RuntimeException with diagnostics when the source has
syntax errors" in {
+ val ex = intercept[RuntimeException] {
+ JavaRuntimeCompilation.compileCode("public class Garbage { not valid
java }")
+ }
+ assert(ex.getMessage.contains("Error at line"))
+ }
+}
+
+private object CoreExecutorReflectionSpec {
+ // Public so reflection inside ExecFactory can reach the no-arg constructor.
+ class NoArgExec extends OperatorExecutor {
+ override def processTuple(
+ tuple: org.apache.texera.amber.core.tuple.Tuple,
+ port: Int
+ ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+ }
+
+ class StringArgExec(val desc: String) extends OperatorExecutor {
+ override def processTuple(
+ tuple: org.apache.texera.amber.core.tuple.Tuple,
+ port: Int
+ ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+ }
+
+ class IdxCountExec(val idx: Int, val workerCount: Int) extends
OperatorExecutor {
+ override def processTuple(
+ tuple: org.apache.texera.amber.core.tuple.Tuple,
+ port: Int
+ ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+ }
+
+ class StringIdxCountExec(val desc: String, val idx: Int, val workerCount:
Int)
+ extends OperatorExecutor {
+ override def processTuple(
+ tuple: org.apache.texera.amber.core.tuple.Tuple,
+ port: Int
+ ): Iterator[org.apache.texera.amber.core.tuple.TupleLike] = Iterator.empty
+ }
+}