This is an automated email from the ASF dual-hosted git repository.

Xiao-zhen-Liu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c077140af fix: return None from getWorkerIndex for non-worker actor 
IDs (#5006)
8c077140af is described below

commit 8c077140af6535f25af0c3917a20aa49def9f29e
Author: Matthew B. <[email protected]>
AuthorDate: Wed May 13 17:01:14 2026 -0700

    fix: return None from getWorkerIndex for non-worker actor IDs (#5006)
    
    ### What changes were proposed in this PR?
    
    VirtualIdentityUtils.getWorkerIndex only matched the worker name pattern
    with no fallback case, so passing a non-worker ActorVirtualIdentity
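    (e.g. CONTROLLER, SELF) threw scala.MatchError at runtime. This PR
    changes the return type to Option[Int] and adds a fallback case that
    returns None for non-worker actor IDs, matching the graceful handling
    already present in the sibling methods getPhysicalOpId and
    toShorterString. The callers updated in this commit require a worker
    ID, so they unwrap the result and throw IllegalStateException with a
    descriptive message when it is None.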
    
    ### Any related issues, documentation, discussions?
    Closes: #4727
    
    ### How was this PR tested?
    
    Updated VirtualIdentityUtilsSpec: the existing test that pinned the
    MatchError behavior was replaced with a test asserting that
    getWorkerIndex returns None for special actor IDs such as CONTROLLER
    and SELF. The existing test for the worker-name happy path was updated
    to expect Some(42) and still passes.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7 in compliance with ASF
---
 .../architecture/common/ExecutorDeployment.scala    |  8 +++++++-
 .../architecture/messaginglayer/OutputManager.scala | 11 ++++++++++-
 .../worker/managers/SerializationManager.scala      |  8 +++++++-
 .../promisehandlers/InitializeExecutorHandler.scala |  8 +++++++-
 .../promisehandlers/UpdateExecutorHandler.scala     |  8 +++++++-
 .../texera/amber/util/VirtualIdentityUtils.scala    |  7 +++++--
 .../amber/util/VirtualIdentityUtilsSpec.scala       | 21 +++++++++++----------
 7 files changed, 54 insertions(+), 17 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
index fbb5b99ce6..3926143287 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
@@ -52,7 +52,13 @@ object ExecutorDeployment {
 
     operatorConfig.workerConfigs.foreach(workerConfig => {
       val workerId = workerConfig.workerId
-      val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId)
+      val workerIndex = VirtualIdentityUtils
+        .getWorkerIndex(workerId)
+        .getOrElse(
+          throw new IllegalStateException(
+            s"Expected worker actor id when deploying executor, got: 
${workerId.name}"
+          )
+        )
       val locationPreference = 
op.locationPreference.getOrElse(RoundRobinPreference)
       val preferredAddress: Address = locationPreference match {
         case PreferController =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index affbd786f9..095315590f 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -289,7 +289,16 @@ class OutputManager(
     val bufferedItemWriter = DocumentFactory
       .openDocument(storageUri)
       ._1
-      .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+      .writer(
+        VirtualIdentityUtils
+          .getWorkerIndex(actorId)
+          .getOrElse(
+            throw new IllegalStateException(
+              s"Expected worker actor id for output storage writer, got: 
${actorId.name}"
+            )
+          )
+          .toString
+      )
       .asInstanceOf[BufferedItemWriter[Tuple]]
     val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
index b4afe51030..3730dd91d3 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
@@ -39,7 +39,13 @@ class SerializationManager(val actorId: 
ActorVirtualIdentity) extends AmberLoggi
   def restoreExecutorState(
       chkpt: CheckpointState
   ): (OperatorExecutor, Iterator[(TupleLike, Option[PortIdentity])]) = {
-    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+    val workerIdx = VirtualIdentityUtils
+      .getWorkerIndex(actorId)
+      .getOrElse(
+        throw new IllegalStateException(
+          s"Expected worker actor id when restoring executor state, got: 
${actorId.name}"
+        )
+      )
     val workerCount = execInitMsg.totalWorkerCount
     val executor = execInitMsg.opExecInitInfo match {
       case OpExecWithClassName(className, descString) =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 212a980e5e..969b466a1b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -36,7 +36,13 @@ trait InitializeExecutorHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     dp.serializationManager.setOpInitialization(req)
-    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+    val workerIdx = VirtualIdentityUtils
+      .getWorkerIndex(actorId)
+      .getOrElse(
+        throw new IllegalStateException(
+          s"Expected worker actor id when initializing executor, got: 
${actorId.name}"
+        )
+      )
     cachedTotalWorkerCount = req.totalWorkerCount
     setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
     EmptyReturn()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 8ed9ebdc59..2078ff892b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -35,7 +35,13 @@ trait UpdateExecutorHandler {
       request: UpdateExecutorRequest,
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
-    val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+    val workerIdx = VirtualIdentityUtils
+      .getWorkerIndex(actorId)
+      .getOrElse(
+        throw new IllegalStateException(
+          s"Expected worker actor id when updating executor, got: 
${actorId.name}"
+        )
+      )
     // Close the existing executor (if any) before replacing it to avoid 
resource leaks.
     val oldExecutor = dp.executor
     if (oldExecutor != null) {
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
index 031c4b8c7f..7c1bfafb8d 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
@@ -68,10 +68,13 @@ object VirtualIdentityUtils {
     }
   }
 
-  def getWorkerIndex(workerId: ActorVirtualIdentity): Int = {
+  def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
     workerId.name match {
       case workerNamePattern(_, _, _, idx) =>
-        idx.toInt
+        Some(idx.toInt)
+      case _ =>
+        // for special actorId such as SELF, CONTROLLER
+        None
     }
   }
 
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
index 4da024dd53..a25a0c46da 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
@@ -96,19 +96,20 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with 
Matchers {
 
   "getWorkerIndex" should "return the trailing numeric workerId from a worker 
actor name" in {
     val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-42")
-    VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42
+    VirtualIdentityUtils.getWorkerIndex(actor) shouldBe Some(42)
   }
 
-  it should "throw MatchError on non-worker actor names (current behavior)" in 
{
-    // getWorkerIndex pattern-matches on workerNamePattern with no fallback,
-    // so passing a special ActorVirtualIdentity like CONTROLLER or SELF
-    // yields scala.MatchError. Pinning this behavior here means a future
-    // change that adds a fallback (or a different exception) breaks this
-    // spec on purpose so the new contract is reviewed.
+  it should "return None for non-worker actor names" in {
+    // Special ActorVirtualIdentity values like CONTROLLER or SELF do not
+    // match workerNamePattern. getWorkerIndex returns None rather than
+    // throwing scala.MatchError, mirroring the graceful handling in
+    // getPhysicalOpId and toShorterString. Returning Option forces each
+    // caller to explicitly acknowledge the non-worker case rather than
+    // silently propagating a sentinel value.
     val controller = ActorVirtualIdentity("CONTROLLER")
-    assertThrows[scala.MatchError] {
-      VirtualIdentityUtils.getWorkerIndex(controller)
-    }
+    VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None
+    val self = ActorVirtualIdentity("SELF")
+    VirtualIdentityUtils.getWorkerIndex(self) shouldBe None
   }
 
   // ----- toShorterString -----

Reply via email to