This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch revert-4531-fix/reconfiguration-rpc-wiring
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 7ad2b600ab6ec974ec17edb2acc712c747e090d6
Author: Yicong Huang <[email protected]>
AuthorDate: Mon Apr 27 23:10:52 2026 -0700

    Revert "fix(amber): wire ExecutionReconfigurationService back to the engine (…"
    
    This reverts commit 23901be24b23191496cd6c8110380a74eaf61de3.
---
 .../core/architecture/rpc/async_rpc_server.py      |  21 +--
 .../promisehandlers/ReconfigurationHandler.scala   |  47 +++----
 .../engine/architecture/worker/DataProcessor.scala |  11 +-
 .../service/ExecutionReconfigurationService.scala  | 136 ++++++++-----------
 .../amber/engine/e2e/ReconfigurationSpec.scala     |  18 +--
 .../ExecutionReconfigurationServiceSpec.scala      | 149 ---------------------
 6 files changed, 83 insertions(+), 299 deletions(-)

diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py 
b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
index 49dc5f0547..d776307030 100644
--- a/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
+++ b/amber/src/main/python/core/architecture/rpc/async_rpc_server.py
@@ -121,23 +121,10 @@ class AsyncRPCServer:
         if self._no_reply_needed(control_invocation.command_id):
             return
 
-        # Reply to the actor that originated this ControlInvocation, identified
-        # by control_invocation.context.sender. For a normal RPC over a
-        # control channel this matches `from_.from_worker_id`; for an
-        # invocation carried in-band by an ECM along a data channel, `from_`
-        # is the data channel between two workers and the original sender
-        # lives only in the invocation's context.
-        # When the context is unset (e.g. unit-test inputs that construct
-        # ControlInvocation directly), fall back to swapping `from_`.
-        ctx = control_invocation.context
-        if ctx.sender.name and ctx.receiver.name:
-            target_channel_id = ChannelIdentity(
-                ctx.receiver, ctx.sender, is_control=True
-            )
-        else:
-            target_channel_id = ChannelIdentity(
-                from_.to_worker_id, from_.from_worker_id, is_control=True
-            )
+        # Reply to the sender.
+        target_channel_id = ChannelIdentity(
+            from_.to_worker_id, from_.from_worker_id, True
+        )
         logger.debug(
             f"PYTHON returns a ReturnInvocation {payload}, replying the 
command"
             f" {command}"
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
index 7653f873c1..210d7c5b98 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -21,21 +21,16 @@ package 
org.apache.texera.amber.engine.architecture.controller.promisehandlers
 
 import com.twitter.util.Future
 import org.apache.texera.amber.core.virtualidentity.{
-  ActorVirtualIdentity,
   ChannelIdentity,
   EmbeddedControlMessageIdentity
 }
-import org.apache.texera.amber.engine.architecture.controller.{
-  ControllerAsyncRPCHandlerInitializer,
-  UpdateExecutorCompleted
-}
+import 
org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
 import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
-  ControlInvocation,
   WorkflowReconfigureRequest
 }
-import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ControlReturn, 
EmptyReturn}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
 import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.apache.texera.amber.util.VirtualIdentityUtils
@@ -69,12 +64,7 @@ trait ReconfigurationHandler {
           .getLatestOperatorExecution(updateExecutorRequest.targetOpId)
           .getWorkerIds
         workerIds.foreach { worker =>
-          futures.append(
-            notifyOnComplete(
-              workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker)),
-              worker
-            )
-          )
+          futures.append(workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker)))
         }
       } else {
         val channelScope = cp.workflowExecution.getRunningRegionExecutions
@@ -98,21 +88,20 @@ trait ReconfigurationHandler {
           }
         }
         val finalScope = channelScope ++ controlChannels
-        val workerCommands: Seq[(ActorVirtualIdentity, ControlInvocation, 
Future[ControlReturn])] =
+        val cmdMapping =
           friesComponent.reconfigurations.flatMap { updateReq =>
             val workers =
               
cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds
-            workers.map { worker =>
-              val (invocation, future) =
-                createInvocation(METHOD_UPDATE_EXECUTOR.getBareMethodName, 
updateReq, worker)
-              (worker, invocation, future)
-            }
-          }.toSeq
-        val cmdMapping: Map[String, ControlInvocation] = workerCommands.map {
-          case (worker, invocation, _) => worker.name -> invocation
-        }.toMap
-        futures ++= workerCommands.map {
-          case (worker, _, future) => notifyOnComplete(future, worker)
+            workers.map(worker =>
+              worker.name -> createInvocation(
+                METHOD_UPDATE_EXECUTOR.getBareMethodName,
+                updateReq,
+                worker
+              )
+            )
+          }.toMap
+        futures += cmdMapping.map {
+          case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture
         }
         friesComponent.sources.foreach { source =>
           
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { 
worker =>
@@ -120,7 +109,7 @@ trait ReconfigurationHandler {
               EmbeddedControlMessageIdentity(msg.reconfigurationId),
               ALL_ALIGNMENT,
               finalScope.toSet,
-              cmdMapping,
+              cmdMapping.map(x => (x._1, x._2._1)),
               ChannelIdentity(actorId, worker, isControl = true)
             )
           }
@@ -132,10 +121,4 @@ trait ReconfigurationHandler {
     }
   }
 
-  // After a worker's updateExecutor completes, notify the client so the
-  // ExecutionReconfigurationService can advance completedReconfigurations
-  // and emit ModifyLogicCompletedEvent on the websocket.
-  private def notifyOnComplete[T](future: Future[T], worker: 
ActorVirtualIdentity): Future[T] =
-    future.onSuccess(_ => sendToClient(UpdateExecutorCompleted(worker)))
-
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 84f1e8ec65..3aa5fa90a4 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -242,16 +242,7 @@ class DataProcessor(
       // invoke the control command carried with the ECM
       logger.info(s"process ECM from $channelId, id = ${ecm.id}, cmd = 
$command")
       if (command.isDefined) {
-        // The reply must go back to the actor that originated the invocation
-        // (recorded in command.context.sender), not to channelId.fromWorkerId.
-        // For ECM-embedded commands those differ: channelId is the data
-        // channel between two workers, while the originator is typically the
-        // controller. Fall back to the channel sender when the context is
-        // unset (e.g. unit-test inputs).
-        val ctx = command.get.context
-        val replyTo =
-          if (ctx.sender.name.nonEmpty) ctx.sender else channelId.fromWorkerId
-        asyncRPCServer.receive(command.get, replyTo)
+        asyncRPCServer.receive(command.get, channelId.fromWorkerId)
       }
       // if this worker is not the final destination of the ECM, pass it 
downstream
       val downstreamChannelsInScope = ecm.scope.filter(_.fromWorkerId == 
actorId).toSet
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
index e7617fdfe1..e5867277fc 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/service/ExecutionReconfigurationService.scala
@@ -19,12 +19,7 @@
 
 package org.apache.texera.web.service
 
-import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
-import 
org.apache.texera.amber.engine.architecture.controller.{UpdateExecutorCompleted,
 Workflow}
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  UpdateExecutorRequest,
-  WorkflowReconfigureRequest
-}
+import org.apache.texera.amber.engine.architecture.controller.Workflow
 import org.apache.texera.amber.engine.common.client.AmberClient
 import org.apache.texera.web.SubscriptionManager
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
@@ -33,9 +28,8 @@ import org.apache.texera.web.model.websocket.response.{
   ModifyLogicCompletedEvent,
   ModifyLogicResponse
 }
-import org.apache.texera.web.storage.{ExecutionReconfigurationStore, 
ExecutionStateStore}
+import org.apache.texera.web.storage.ExecutionStateStore
 
-import java.util.UUID
 import scala.util.{Failure, Success}
 
 class ExecutionReconfigurationService(
@@ -45,11 +39,34 @@ class ExecutionReconfigurationService(
 ) extends SubscriptionManager {
 
   // monitors notification from the engine that a reconfiguration on a worker 
is completed
-  registerWorkerCompletionCallback()
+  //  client.registerCallback[UpdateExecutorCompleted]((evt: 
UpdateExecutorCompleted) => {
+  //    stateStore.reconfigurationStore.updateState(old => {
+  //      old.copy(completedReconfigurations = old.completedReconfigurations + 
evt.id)
+  //    })
+  //  })
 
   // monitors the reconfiguration state (completed workers) change,
   // notifies the frontend when all workers of an operator complete 
reconfiguration
-  registerCompletionDiffHandler()
+  addSubscription(
+    stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) 
=> {
+      if (
+        oldState.completedReconfigurations != 
newState.completedReconfigurations
+        && oldState.currentReconfigId == newState.currentReconfigId
+      ) {
+        val diff = newState.completedReconfigurations -- 
oldState.completedReconfigurations
+        val newlyCompletedOps = diff
+          .map(workerId => 
workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
+          .map(opId => opId.logicalOpId.id)
+        if (newlyCompletedOps.nonEmpty) {
+          List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
+        } else {
+          List()
+        }
+      } else {
+        List()
+      }
+    })
+  )
 
   // handles reconfigure workflow logic from frontend
   // validate the modify logic request and notifies the frontend
@@ -79,77 +96,42 @@ class ExecutionReconfigurationService(
 
   // actually performs all reconfiguration requests the user made during pause
   // sends ModifyLogic messages to operators and workers,
-  // see the Fries reconfiguration paper for the algorithm.
-  // Note: StateTransferFunc is currently not threaded through to the engine —
-  // the new UpdateExecutorRequest only carries (targetOpId, 
newOpExecInitInfo).
+  // there are two modes: transactional or non-transactional
+  // in the transactional mode, reconfigurations on multiple operators will be 
synchronized
+  // in the non-transaction mode, they are not synchronized, this is faster, 
but can lead to consistency issues
+  // for details, see the Fries reconfiguration paper
   def performReconfigurationOnResume(): Unit = {
     val reconfigurations = 
stateStore.reconfigurationStore.getState.unscheduledReconfigurations
     if (reconfigurations.isEmpty) {
       return
     }
-
-    val reconfigurationId = UUID.randomUUID().toString
-    val updateExecutorRequests = reconfigurations.map {
-      case (op, _) => UpdateExecutorRequest(op.id, op.opExecInitInfo)
-    }
-    dispatch(
-      WorkflowReconfigureRequest(
-        reconfiguration = updateExecutorRequests,
-        reconfigurationId = reconfigurationId
-      )
-    )
-
-    // clear all un-scheduled reconfigurations, start a new reconfiguration ID
-    stateStore.reconfigurationStore.updateState(_ =>
-      ExecutionReconfigurationStore(currentReconfigId = 
Some(reconfigurationId))
-    )
-  }
-
-  // Seam for unit testing the dispatch path without spinning up an 
AmberClient.
-  protected def dispatch(request: WorkflowReconfigureRequest): Unit = {
-    client.controllerInterface.reconfigureWorkflow(request, ())
-  }
-
-  // Seam for unit testing — production wires the engine's 
UpdateExecutorCompleted
-  // events into the reconfiguration store so the diff handler above can fire
-  // ModifyLogicCompletedEvent for the frontend.
-  protected def registerWorkerCompletionCallback(): Unit = {
-    client.registerCallback[UpdateExecutorCompleted]((evt: 
UpdateExecutorCompleted) => {
-      onWorkerReconfigured(evt.id)
-    })
-  }
-
-  // Exposed (instead of inlined in the callback) so tests can drive the
-  // completion path directly.
-  private[service] def onWorkerReconfigured(worker: ActorVirtualIdentity): 
Unit = {
-    stateStore.reconfigurationStore.updateState(old =>
-      old.copy(completedReconfigurations = old.completedReconfigurations + 
worker)
-    )
-  }
-
-  // Seam for unit testing — the diff handler dereferences 
workflow.physicalPlan
-  // to map worker → logical op, which makes constructing a service in tests
-  // require a full Workflow. Tests override to no-op.
-  protected def registerCompletionDiffHandler(): Unit = {
-    addSubscription(
-      stateStore.reconfigurationStore.registerDiffHandler((oldState, newState) 
=> {
-        if (
-          oldState.completedReconfigurations != 
newState.completedReconfigurations
-          && oldState.currentReconfigId == newState.currentReconfigId
-        ) {
-          val diff = newState.completedReconfigurations -- 
oldState.completedReconfigurations
-          val newlyCompletedOps = diff
-            .map(workerId => 
workflow.physicalPlan.getPhysicalOpByWorkerId(workerId).id)
-            .map(opId => opId.logicalOpId.id)
-          if (newlyCompletedOps.nonEmpty) {
-            List(ModifyLogicCompletedEvent(newlyCompletedOps.toList))
-          } else {
-            List()
-          }
-        } else {
-          List()
-        }
-      })
-    )
+    throw new RuntimeException("reconfiguration is tentatively disabled.")
+    //    // schedule all pending reconfigurations to the engine
+    //    val reconfigurationId = UUID.randomUUID().toString
+    //    val modifyLogicReq = AmberModifyLogicRequest(reconfigurations.map {
+    //      case (op, stateTransferFunc) =>
+    //        val bytes = AmberRuntime.serde.serialize(op.opExecInitInfo).get
+    //        val protoAny = Any.of(
+    //          
"org.apache.texera.amber.engine.architecture.deploysemantics.layer.OpExecInitInfo",
+    //          ByteString.copyFrom(bytes)
+    //        )
+    //        val stateTransferFuncOpt = stateTransferFunc.map { func =>
+    //          val bytes = AmberRuntime.serde.serialize(func).get
+    //          Any.of(
+    //            
"org.apache.texera.workflow.common.operators.StateTransferFunc",
+    //            ByteString.copyFrom(bytes)
+    //          )
+    //        }
+    //        UpdateExecutorRequest(op.id, protoAny, stateTransferFuncOpt)
+    //    })
+    //    client.controllerInterface.reconfigureWorkflow(
+    //      WorkflowReconfigureRequest(modifyLogicReq, reconfigurationId),
+    //      ()
+    //    )
+    //
+    //    // clear all un-scheduled reconfigurations, start a new 
reconfiguration ID
+    //    stateStore.reconfigurationStore.updateState(_ =>
+    //      ExecutionReconfigurationStore(Some(reconfigurationId))
+    //    )
   }
 }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
index 92dfba19de..6f344caae3 100644
--- 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -150,15 +150,9 @@ class ReconfigurationSpec
           completion.setDone()
         }
       })
-    Await.result(
-      client.controllerInterface.startWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
     val pausedReached = stateReached(client, PAUSED)
-    Await.result(
-      client.controllerInterface.pauseWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
-    )
+    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
     Await.result(pausedReached, Duration.fromSeconds(10))
     val physicalOps = targetOps.flatMap(op =>
       workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
@@ -170,13 +164,9 @@ class ReconfigurationSpec
           reconfigurationId = "test-reconfigure-1"
         ),
         ()
-      ),
-      Duration.fromSeconds(5)
-    )
-    Await.result(
-      client.controllerInterface.resumeWorkflow(EmptyRequest(), ()),
-      Duration.fromSeconds(5)
+      )
     )
+    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
     Await.result(completion, Duration.fromMinutes(1))
     result
   }
diff --git 
a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
 
b/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
deleted file mode 100644
index 974db13286..0000000000
--- 
a/amber/src/test/scala/org/apache/texera/web/service/ExecutionReconfigurationServiceSpec.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.web.service
-
-import org.apache.texera.amber.core.executor.OpExecWithClassName
-import org.apache.texera.amber.core.virtualidentity.{
-  ActorVirtualIdentity,
-  ExecutionIdentity,
-  OperatorIdentity,
-  PhysicalOpIdentity,
-  WorkflowIdentity
-}
-import org.apache.texera.amber.core.workflow.PhysicalOp
-import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.WorkflowReconfigureRequest
-import org.apache.texera.web.storage.{ExecutionReconfigurationStore, 
ExecutionStateStore}
-import org.scalatest.flatspec.AnyFlatSpec
-import org.scalatest.matchers.should.Matchers
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
-  * Web-service-layer tests for ExecutionReconfigurationService.
-  *
-  * The end-to-end engine path (reconfigureWorkflow → Fries algorithm →
-  * UpdateExecutor on workers) is covered by ReconfigurationSpec.
-  * This spec focuses on the wiring inside performReconfigurationOnResume:
-  * empty short-circuit, request construction, and store reset semantics.
-  */
-class ExecutionReconfigurationServiceSpec extends AnyFlatSpec with Matchers {
-
-  private def mkPhysicalOp(name: String): PhysicalOp =
-    PhysicalOp(
-      id = PhysicalOpIdentity(OperatorIdentity(name), "main"),
-      workflowId = WorkflowIdentity(0L),
-      executionId = ExecutionIdentity(0L),
-      opExecInitInfo = OpExecWithClassName(s"$name.Class", "")
-    )
-
-  /** Service variant that records dispatched requests and skips the 
AmberClient
-    * registration / workflow-dependent diff handler so it can be constructed
-    * without a live engine.
-    */
-  private class RecordingService(stateStore: ExecutionStateStore)
-      extends ExecutionReconfigurationService(client = null, stateStore, 
workflow = null) {
-    val captured: ArrayBuffer[WorkflowReconfigureRequest] = ArrayBuffer.empty
-    override protected def dispatch(request: WorkflowReconfigureRequest): Unit 
=
-      captured += request
-    override protected def registerWorkerCompletionCallback(): Unit = ()
-    override protected def registerCompletionDiffHandler(): Unit = ()
-  }
-
-  "performReconfigurationOnResume" should
-    "return without dispatching when no reconfigurations are pending" in {
-    val stateStore = new ExecutionStateStore()
-    val service = new RecordingService(stateStore)
-
-    noException should be thrownBy service.performReconfigurationOnResume()
-
-    service.captured shouldBe empty
-    val state = stateStore.reconfigurationStore.getState
-    state.unscheduledReconfigurations shouldBe empty
-    state.currentReconfigId shouldBe None
-    state.completedReconfigurations shouldBe empty
-  }
-
-  it should "dispatch one request carrying every pending reconfiguration and 
reset the store" in {
-    val stateStore = new ExecutionStateStore()
-    val service = new RecordingService(stateStore)
-
-    val op1 = mkPhysicalOp("op-1")
-    val op2 = mkPhysicalOp("op-2")
-    stateStore.reconfigurationStore.updateState(_ =>
-      ExecutionReconfigurationStore(unscheduledReconfigurations = List((op1, 
None), (op2, None)))
-    )
-
-    service.performReconfigurationOnResume()
-
-    service.captured should have size 1
-    val request = service.captured.head
-    request.reconfigurationId should not be empty
-    request.reconfiguration.map(_.targetOpId) should contain 
theSameElementsInOrderAs Seq(
-      op1.id,
-      op2.id
-    )
-    request.reconfiguration.map(_.newExecInitInfo) should contain 
theSameElementsInOrderAs Seq(
-      op1.opExecInitInfo,
-      op2.opExecInitInfo
-    )
-
-    val state = stateStore.reconfigurationStore.getState
-    state.unscheduledReconfigurations shouldBe empty
-    state.currentReconfigId shouldBe Some(request.reconfigurationId)
-    state.completedReconfigurations shouldBe empty
-  }
-
-  it should "use a fresh reconfigurationId on each dispatch" in {
-    val stateStore = new ExecutionStateStore()
-    val service = new RecordingService(stateStore)
-
-    def queueAndDispatch(opName: String): String = {
-      stateStore.reconfigurationStore.updateState(old =>
-        old.copy(unscheduledReconfigurations = List((mkPhysicalOp(opName), 
None)))
-      )
-      service.performReconfigurationOnResume()
-      service.captured.last.reconfigurationId
-    }
-
-    val firstId = queueAndDispatch("op-a")
-    val secondId = queueAndDispatch("op-b")
-
-    firstId should not be secondId
-    stateStore.reconfigurationStore.getState.currentReconfigId shouldBe 
Some(secondId)
-  }
-
-  "onWorkerReconfigured" should
-    "add the worker id to completedReconfigurations so the diff handler can 
fire" in {
-    val stateStore = new ExecutionStateStore()
-    val service = new RecordingService(stateStore)
-
-    val w1 = ActorVirtualIdentity("Worker:WF1-E1-op-main-0")
-    val w2 = ActorVirtualIdentity("Worker:WF1-E1-op-main-1")
-    service.onWorkerReconfigured(w1)
-    service.onWorkerReconfigured(w2)
-    // duplicate completion is idempotent (Set semantics).
-    service.onWorkerReconfigured(w1)
-
-    stateStore.reconfigurationStore.getState.completedReconfigurations should 
contain theSameElementsAs Set(
-      w1,
-      w2
-    )
-  }
-}

Reply via email to