This is an automated email from the ASF dual-hosted git repository.
Xiao-zhen-Liu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 8c077140af fix: return None from getWorkerIndex for non-worker actor
IDs (#5006)
8c077140af is described below
commit 8c077140af6535f25af0c3917a20aa49def9f29e
Author: Matthew B. <[email protected]>
AuthorDate: Wed May 13 17:01:14 2026 -0700
fix: return None from getWorkerIndex for non-worker actor IDs (#5006)
### What changes were proposed in this PR?
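VirtualIdentityUtils.getWorkerIndex only matched the worker name pattern
with no fallback case, so passing a non-worker ActorVirtualIdentity
(e.g. CONTROLLER, SELF) threw scala.MatchError at runtime. This PR changes
the return type from Int to Option[Int] and adds a fallback case that
returns None for non-worker actor IDs, matching the graceful handling
already present in the sibling methods getPhysicalOpId and toShorterString.
Call sites that require a worker ID (ExecutorDeployment, OutputManager,
SerializationManager, InitializeExecutorHandler, UpdateExecutorHandler) now
unwrap the result and throw IllegalStateException with a descriptive
message when the actor ID is not a worker.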
### Any related issues, documentation, discussions?
Closes: #4727
### How was this PR tested?
Updated VirtualIdentityUtilsSpec: the existing test that pinned the
MatchError behavior was replaced with a test asserting that
getWorkerIndex returns None for special actor IDs like CONTROLLER and
SELF. The existing test for the worker-name happy path was updated to
expect Some(42) instead of 42.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7 in compliance with ASF
---
.../architecture/common/ExecutorDeployment.scala | 8 +++++++-
.../architecture/messaginglayer/OutputManager.scala | 11 ++++++++++-
.../worker/managers/SerializationManager.scala | 8 +++++++-
.../promisehandlers/InitializeExecutorHandler.scala | 8 +++++++-
.../promisehandlers/UpdateExecutorHandler.scala | 8 +++++++-
.../texera/amber/util/VirtualIdentityUtils.scala | 7 +++++--
.../amber/util/VirtualIdentityUtilsSpec.scala | 21 +++++++++++----------
7 files changed, 54 insertions(+), 17 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
index fbb5b99ce6..3926143287 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/ExecutorDeployment.scala
@@ -52,7 +52,13 @@ object ExecutorDeployment {
operatorConfig.workerConfigs.foreach(workerConfig => {
val workerId = workerConfig.workerId
- val workerIndex = VirtualIdentityUtils.getWorkerIndex(workerId)
+ val workerIndex = VirtualIdentityUtils
+ .getWorkerIndex(workerId)
+ .getOrElse(
+ throw new IllegalStateException(
+ s"Expected worker actor id when deploying executor, got:
${workerId.name}"
+ )
+ )
val locationPreference =
op.locationPreference.getOrElse(RoundRobinPreference)
val preferredAddress: Address = locationPreference match {
case PreferController =>
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index affbd786f9..095315590f 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -289,7 +289,16 @@ class OutputManager(
val bufferedItemWriter = DocumentFactory
.openDocument(storageUri)
._1
- .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+ .writer(
+ VirtualIdentityUtils
+ .getWorkerIndex(actorId)
+ .getOrElse(
+ throw new IllegalStateException(
+ s"Expected worker actor id for output storage writer, got:
${actorId.name}"
+ )
+ )
+ .toString
+ )
.asInstanceOf[BufferedItemWriter[Tuple]]
val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
this.outputPortResultWriterThreads(portId) = writerThread
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
index b4afe51030..3730dd91d3 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/SerializationManager.scala
@@ -39,7 +39,13 @@ class SerializationManager(val actorId:
ActorVirtualIdentity) extends AmberLoggi
def restoreExecutorState(
chkpt: CheckpointState
): (OperatorExecutor, Iterator[(TupleLike, Option[PortIdentity])]) = {
- val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+ val workerIdx = VirtualIdentityUtils
+ .getWorkerIndex(actorId)
+ .getOrElse(
+ throw new IllegalStateException(
+ s"Expected worker actor id when restoring executor state, got:
${actorId.name}"
+ )
+ )
val workerCount = execInitMsg.totalWorkerCount
val executor = execInitMsg.opExecInitInfo match {
case OpExecWithClassName(className, descString) =>
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 212a980e5e..969b466a1b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -36,7 +36,13 @@ trait InitializeExecutorHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
dp.serializationManager.setOpInitialization(req)
- val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+ val workerIdx = VirtualIdentityUtils
+ .getWorkerIndex(actorId)
+ .getOrElse(
+ throw new IllegalStateException(
+ s"Expected worker actor id when initializing executor, got:
${actorId.name}"
+ )
+ )
cachedTotalWorkerCount = req.totalWorkerCount
setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
EmptyReturn()
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index 8ed9ebdc59..2078ff892b 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -35,7 +35,13 @@ trait UpdateExecutorHandler {
request: UpdateExecutorRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
- val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
+ val workerIdx = VirtualIdentityUtils
+ .getWorkerIndex(actorId)
+ .getOrElse(
+ throw new IllegalStateException(
+ s"Expected worker actor id when updating executor, got:
${actorId.name}"
+ )
+ )
// Close the existing executor (if any) before replacing it to avoid
resource leaks.
val oldExecutor = dp.executor
if (oldExecutor != null) {
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
index 031c4b8c7f..7c1bfafb8d 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/VirtualIdentityUtils.scala
@@ -68,10 +68,13 @@ object VirtualIdentityUtils {
}
}
- def getWorkerIndex(workerId: ActorVirtualIdentity): Int = {
+ def getWorkerIndex(workerId: ActorVirtualIdentity): Option[Int] = {
workerId.name match {
case workerNamePattern(_, _, _, idx) =>
- idx.toInt
+ Some(idx.toInt)
+ case _ =>
+ // for special actorId such as SELF, CONTROLLER
+ None
}
}
diff --git
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
index 4da024dd53..a25a0c46da 100644
---
a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
+++
b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/VirtualIdentityUtilsSpec.scala
@@ -96,19 +96,20 @@ class VirtualIdentityUtilsSpec extends AnyFlatSpec with
Matchers {
"getWorkerIndex" should "return the trailing numeric workerId from a worker
actor name" in {
val actor = ActorVirtualIdentity("Worker:WF7-myOp-main-42")
- VirtualIdentityUtils.getWorkerIndex(actor) shouldBe 42
+ VirtualIdentityUtils.getWorkerIndex(actor) shouldBe Some(42)
}
- it should "throw MatchError on non-worker actor names (current behavior)" in
{
- // getWorkerIndex pattern-matches on workerNamePattern with no fallback,
- // so passing a special ActorVirtualIdentity like CONTROLLER or SELF
- // yields scala.MatchError. Pinning this behavior here means a future
- // change that adds a fallback (or a different exception) breaks this
- // spec on purpose so the new contract is reviewed.
+ it should "return None for non-worker actor names" in {
+ // Special ActorVirtualIdentity values like CONTROLLER or SELF do not
+ // match workerNamePattern. getWorkerIndex returns None rather than
+ // throwing scala.MatchError, mirroring the graceful handling in
+ // getPhysicalOpId and toShorterString. Returning Option forces each
+ // caller to explicitly acknowledge the non-worker case rather than
+ // silently propagating a sentinel value.
val controller = ActorVirtualIdentity("CONTROLLER")
- assertThrows[scala.MatchError] {
- VirtualIdentityUtils.getWorkerIndex(controller)
- }
+ VirtualIdentityUtils.getWorkerIndex(controller) shouldBe None
+ val self = ActorVirtualIdentity("SELF")
+ VirtualIdentityUtils.getWorkerIndex(self) shouldBe None
}
// ----- toShorterString -----