This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new e635dd027d feat(amber): Re-enable operator reconfiguration in Amber 
(#4220)
e635dd027d is described below

commit e635dd027d2f5e3bb29639d3b61941beaf6820c4
Author: Shengquan Ni <[email protected]>
AuthorDate: Sat Apr 25 16:30:19 2026 -0700

    feat(amber): Re-enable operator reconfiguration in Amber (#4220)
    
    ### What changes were proposed in this PR?
    Per the discussion in https://github.com/apache/texera/discussions/4016,
    we decided to bring the operator reconfiguration feature back to the
    Amber engine. This PR includes only the backend changes for this
    feature, but it is enabled on both the Java and Python sides.
    
    Since the code for the Fries Algorithm is still in the codebase, this
    feature is relatively straightforward to implement and maintain going
    forward.
    
    This PR allows source operators to be included in the reconfiguration
    scope (MCS), but it does not allow source operators themselves to be
    modified, for two reasons. First, under the current iterator-based
    interface, the state of a source operator is fully encapsulated within
    its iterator, and reading or manipulating that iterator state is
    already very difficult in both Scala and Python. Second, even if we
    could access the state, it would still be hard for users to clearly
    define the expected state transition semantics—e.g., whether to
    preserve the old state, reset it, or partially transfer it to the new
    operator.
    
    For the reasons above, we disable reconfiguration of source operators
    for now. If clear use cases emerge in the future, we can revisit this
    design decision.
    
    
    ### Any related issues, documentation, discussions?
    See https://github.com/apache/texera/discussions/4016.
    
    ### How was this PR tested?
    Introduced unit tests for this feature.
    
    This PR also updates the Scala CI to install Python dependencies, as we
    are using Python UDFs in our e2e tests.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No
    
    ---------
    
    Signed-off-by: Yicong Huang <[email protected]>
    Signed-off-by: Shengquan Ni <[email protected]>
---
 .github/workflows/github-action-build.yml          |   5 +
 .../engine/architecture/rpc/controlcommands.proto  |  22 +-
 .../architecture/rpc/controllerservice.proto       |   1 +
 .../engine/architecture/rpc/workerservice.proto    |   1 +
 .../handlers/control/update_executor_handler.py    |  27 +-
 .../rpc/async_rpc_handler_initializer.py           |   4 +
 amber/src/main/python/core/runnables/main_loop.py  |   8 +-
 .../main/python/core/runnables/network_receiver.py |   5 +-
 .../amber/engine/architecture/rpc/__init__.py      |  99 +++++--
 .../ControllerAsyncRPCHandlerInitializer.scala     |   3 +-
 .../promisehandlers/ReconfigurationHandler.scala   | 124 ++++++++
 .../DataProcessorRPCHandlerInitializer.scala       |  28 +-
 .../InitializeExecutorHandler.scala                |  18 +-
 ...orHandler.scala => UpdateExecutorHandler.scala} |  29 +-
 .../common}/FriesReconfigurationAlgorithm.scala    |  55 ++--
 .../amber/engine/e2e/ReconfigurationSpec.scala     | 312 +++++++++++++++++++++
 .../texera/amber/operator/TestOperators.scala      |  20 ++
 17 files changed, 646 insertions(+), 115 deletions(-)

diff --git a/.github/workflows/github-action-build.yml 
b/.github/workflows/github-action-build.yml
index 1032363bd7..35bd5a5dbf 100644
--- a/.github/workflows/github-action-build.yml
+++ b/.github/workflows/github-action-build.yml
@@ -117,6 +117,11 @@ jobs:
           python-version: '3.11'
       - name: Show Python
         run: python --version || python3 --version
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          if [ -f amber/requirements.txt ]; then pip install -r 
amber/requirements.txt; fi
+          if [ -f amber/operator-requirements.txt ]; then pip install -r 
amber/operator-requirements.txt; fi
       - name: Setup sbt launcher
         uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
       - uses: coursier/cache-action@4e2615869d13561d626ed48655e1a39e5b192b3c # 
v6.4.9
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index d714f64a15..b22c1bdf7c 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -25,7 +25,6 @@ import 
"org/apache/texera/amber/engine/architecture/worker/statistics.proto";
 import 
"org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto";
 import "scalapb/scalapb.proto";
 import "google/protobuf/timestamp.proto";
-import "google/protobuf/any.proto";
 
 option (scalapb.options) = {
   scope: FILE,
@@ -40,12 +39,12 @@ message ControlRequest {
     TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
     DebugCommandRequest debugCommandRequest = 3;
     EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4;
-    ModifyLogicRequest modifyLogicRequest = 5;
-    RetryWorkflowRequest retryWorkflowRequest = 6;
-    ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8;
-    PortCompletedRequest portCompletedRequest = 9;
-    WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
-    LinkWorkersRequest linkWorkersRequest = 11;
+    RetryWorkflowRequest retryWorkflowRequest = 5;
+    ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 6;
+    PortCompletedRequest portCompletedRequest = 7;
+    WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
+    LinkWorkersRequest linkWorkersRequest = 9;
+    WorkflowReconfigureRequest workflowReconfigureRequest = 10;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -119,7 +118,7 @@ message TakeGlobalCheckpointRequest {
 }
 
 message WorkflowReconfigureRequest{
-  ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true];
+  repeated UpdateExecutorRequest reconfiguration = 1;
   string reconfigurationId = 2;
 }
 
@@ -134,10 +133,6 @@ message EvaluatePythonExpressionRequest {
   string operatorId = 2;
 }
 
-message ModifyLogicRequest {
-  repeated UpdateExecutorRequest updateRequest = 1;
-}
-
 message RetryWorkflowRequest {
   repeated core.ActorVirtualIdentity workers = 1;
 }
@@ -260,8 +255,7 @@ message InitializeExecutorRequest {
 
 message UpdateExecutorRequest {
   core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
-  google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true];
-  google.protobuf.Any stateTransferFunc = 3;
+  core.OpExecInitInfo newExecInitInfo = 2;
 }
 
 message PrepareCheckpointRequest{
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 70d189a341..27b4727ee9 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -45,4 +45,5 @@ service ControllerService {
   rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
   rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns 
(EmptyReturn);
   rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
+  rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
 }
\ No newline at end of file
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
index dbcd6d8a5e..21944ffefc 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
@@ -50,4 +50,5 @@ service WorkerService {
   rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
   rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns 
(EvaluatedValue);
   rpc NoOperation(EmptyRequest) returns (EmptyReturn);
+  rpc UpdateExecutor(UpdateExecutorRequest) returns (EmptyReturn);
 }
\ No newline at end of file
diff --git 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
index fe33b2dec0..f2b6d16d46 100644
--- 
a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
+++ 
b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py
@@ -15,14 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# from proto.org.apache.texera.amber.engine.architecture.worker import 
UpdateExecutorV2
-# from core.architecture.handlers.control.control_handler_base import 
ControlHandler
-# from core.architecture.managers.context import Context
-#
-#
-# class UpdateExecutorHandler(ControlHandler):
-#     cmd = UpdateExecutorV2
-#
-#     def __call__(self, context: Context, command: cmd, *args, **kwargs):
-#         context.executor_manager.update_executor(command.code, 
command.is_source)
-#         return None
+from core.architecture.handlers.control.control_handler_base import 
ControlHandler
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmptyReturn,
+    UpdateExecutorRequest,
+)
+from core.util import get_one_of
+from proto.org.apache.texera.amber.core import OpExecWithCode
+
+
+class UpdateExecutorHandler(ControlHandler):
+    async def update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
+        op_exec_with_code: OpExecWithCode = get_one_of(req.new_exec_init_info)
+        self.context.executor_manager.update_executor(
+            op_exec_with_code.code, 
self.context.executor_manager.executor.is_source
+        )
+        return EmptyReturn()
diff --git 
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py 
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
index c2574028a1..146cf91b0d 100644
--- 
a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
+++ 
b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py
@@ -45,6 +45,9 @@ from 
core.architecture.handlers.control.replay_current_tuple_handler import (
 from core.architecture.handlers.control.resume_worker_handler import 
ResumeWorkerHandler
 from core.architecture.handlers.control.start_channel_handler import 
StartChannelHandler
 from core.architecture.handlers.control.start_worker_handler import 
StartWorkerHandler
+from core.architecture.handlers.control.update_executor_handler import (
+    UpdateExecutorHandler,
+)
 
 
 class AsyncRPCHandlerInitializer(
@@ -64,5 +67,6 @@ class AsyncRPCHandlerInitializer(
     StartChannelHandler,
     EndChannelHandler,
     NoOperationHandler,
+    UpdateExecutorHandler,
 ):
     pass
diff --git a/amber/src/main/python/core/runnables/main_loop.py 
b/amber/src/main/python/core/runnables/main_loop.py
index d73c655734..794224c97f 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -118,7 +118,13 @@ class MainLoop(StoppableQueueBlockingRunnable):
             or not self._input_queue.is_data_enabled()
         ):
             next_entry = self.interruptible_get()
-            self._process_dcm(next_entry)
+            match(
+                next_entry,
+                DCMElement,
+                self._process_dcm,
+                ECMElement,
+                self._process_ecm,
+            )
 
     @overrides
     def pre_start(self) -> None:
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index fd42a8f589..5ab857c1c3 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -90,7 +90,7 @@ class NetworkReceiver(Runnable, Stoppable):
             # Explicitly set is_control to trigger lazy computation.
             # If not set, it may be computed at different times,
             # causing hash inconsistencies.
-            data_header.tag.is_control = False
+            data_header.tag.is_control = bool(data_header.tag.is_control)
             payload = match(
                 data_header.payload_type,
                 "Data",
@@ -102,8 +102,7 @@ class NetworkReceiver(Runnable, Stoppable):
             )
             if isinstance(payload, EmbeddedControlMessage):
                 for channel_id in payload.scope:
-                    if not channel_id.is_control:
-                        channel_id.is_control = False
+                    channel_id.is_control = bool(channel_id.is_control)
                 shared_queue.put(ECMElement(tag=data_header.tag, 
payload=payload))
             else:
                 shared_queue.put(DataElement(tag=data_header.tag, 
payload=payload))
diff --git 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
index b7522a696a..77d51933af 100644
--- 
a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ 
b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
@@ -13,7 +13,6 @@ from typing import (
 )
 
 import betterproto
-import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf
 import grpclib
 from betterproto.grpc.grpclib_server import ServiceBase
 
@@ -84,23 +83,23 @@ class ControlRequest(betterproto.Message):
     evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = (
         betterproto.message_field(4, group="sealed_value")
     )
-    modify_logic_request: "ModifyLogicRequest" = betterproto.message_field(
-        5, group="sealed_value"
-    )
     retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field(
-        6, group="sealed_value"
+        5, group="sealed_value"
     )
     console_message_triggered_request: "ConsoleMessageTriggeredRequest" = (
-        betterproto.message_field(8, group="sealed_value")
+        betterproto.message_field(6, group="sealed_value")
     )
     port_completed_request: "PortCompletedRequest" = betterproto.message_field(
-        9, group="sealed_value"
+        7, group="sealed_value"
     )
     worker_state_updated_request: "WorkerStateUpdatedRequest" = (
-        betterproto.message_field(10, group="sealed_value")
+        betterproto.message_field(8, group="sealed_value")
     )
     link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
-        11, group="sealed_value"
+        9, group="sealed_value"
+    )
+    workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
+        betterproto.message_field(10, group="sealed_value")
     )
     add_input_channel_request: "AddInputChannelRequest" = 
betterproto.message_field(
         50, group="sealed_value"
@@ -198,7 +197,7 @@ class TakeGlobalCheckpointRequest(betterproto.Message):
 
 @dataclass(eq=False, repr=False)
 class WorkflowReconfigureRequest(betterproto.Message):
-    reconfiguration: "ModifyLogicRequest" = betterproto.message_field(1)
+    reconfiguration: List["UpdateExecutorRequest"] = 
betterproto.message_field(1)
     reconfiguration_id: str = betterproto.string_field(2)
 
 
@@ -214,11 +213,6 @@ class EvaluatePythonExpressionRequest(betterproto.Message):
     operator_id: str = betterproto.string_field(2)
 
 
-@dataclass(eq=False, repr=False)
-class ModifyLogicRequest(betterproto.Message):
-    update_request: List["UpdateExecutorRequest"] = 
betterproto.message_field(1)
-
-
 @dataclass(eq=False, repr=False)
 class RetryWorkflowRequest(betterproto.Message):
     workers: List["___core__.ActorVirtualIdentity"] = 
betterproto.message_field(1)
@@ -372,10 +366,7 @@ class InitializeExecutorRequest(betterproto.Message):
 @dataclass(eq=False, repr=False)
 class UpdateExecutorRequest(betterproto.Message):
     target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1)
-    new_executor: "betterproto_lib_google_protobuf.Any" = 
betterproto.message_field(2)
-    state_transfer_func: "betterproto_lib_google_protobuf.Any" = (
-        betterproto.message_field(3)
-    )
+    new_exec_init_info: "___core__.OpExecInitInfo" = 
betterproto.message_field(2)
 
 
 @dataclass(eq=False, repr=False)
@@ -1037,6 +1028,23 @@ class WorkerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def update_executor(
+        self,
+        update_executor_request: "UpdateExecutorRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
+            update_executor_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
 
 class ControllerServiceStub(betterproto.ServiceStub):
     async def retrieve_workflow_state(
@@ -1294,6 +1302,23 @@ class ControllerServiceStub(betterproto.ServiceStub):
             metadata=metadata,
         )
 
+    async def reconfigure_workflow(
+        self,
+        workflow_reconfigure_request: "WorkflowReconfigureRequest",
+        *,
+        timeout: Optional[float] = None,
+        deadline: Optional["Deadline"] = None,
+        metadata: Optional["MetadataLike"] = None
+    ) -> "EmptyReturn":
+        return await self._unary_unary(
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
+            workflow_reconfigure_request,
+            EmptyReturn,
+            timeout=timeout,
+            deadline=deadline,
+            metadata=metadata,
+        )
+
 
 class RpcTesterBase(ServiceBase):
 
@@ -1554,6 +1579,11 @@ class WorkerServiceBase(ServiceBase):
     async def no_operation(self, empty_request: "EmptyRequest") -> 
"EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
+    async def update_executor(
+        self, update_executor_request: "UpdateExecutorRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
     async def __rpc_add_input_channel(
         self, stream: "grpclib.server.Stream[AddInputChannelRequest, 
EmptyReturn]"
     ) -> None:
@@ -1696,6 +1726,13 @@ class WorkerServiceBase(ServiceBase):
         response = await self.no_operation(request)
         await stream.send_message(response)
 
+    async def __rpc_update_executor(
+        self, stream: "grpclib.server.Stream[UpdateExecutorRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.update_executor(request)
+        await stream.send_message(response)
+
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
             
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel":
 grpclib.const.Handler(
@@ -1818,6 +1855,12 @@ class WorkerServiceBase(ServiceBase):
                 EmptyRequest,
                 EmptyReturn,
             ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor":
 grpclib.const.Handler(
+                self.__rpc_update_executor,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                UpdateExecutorRequest,
+                EmptyReturn,
+            ),
         }
 
 
@@ -1895,6 +1938,11 @@ class ControllerServiceBase(ServiceBase):
     ) -> "EmptyReturn":
         raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
 
+    async def reconfigure_workflow(
+        self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
+    ) -> "EmptyReturn":
+        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
+
     async def __rpc_retrieve_workflow_state(
         self,
         stream: "grpclib.server.Stream[EmptyRequest, 
RetrieveWorkflowStateResponse]",
@@ -2005,6 +2053,13 @@ class ControllerServiceBase(ServiceBase):
         response = await self.retry_workflow(request)
         await stream.send_message(response)
 
+    async def __rpc_reconfigure_workflow(
+        self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, 
EmptyReturn]"
+    ) -> None:
+        request = await stream.recv_message()
+        response = await self.reconfigure_workflow(request)
+        await stream.send_message(response)
+
     def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
         return {
             
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState":
 grpclib.const.Handler(
@@ -2097,4 +2152,10 @@ class ControllerServiceBase(ServiceBase):
                 RetryWorkflowRequest,
                 EmptyReturn,
             ),
+            
"/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow":
 grpclib.const.Handler(
+                self.__rpc_reconfigure_workflow,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                WorkflowReconfigureRequest,
+                EmptyReturn,
+            ),
         }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 4d9a36bab4..2902173364 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -46,6 +46,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with DebugCommandHandler
     with TakeGlobalCheckpointHandler
     with EmbeddedControlMessageHandler
-    with RetrieveWorkflowStateHandler {
+    with RetrieveWorkflowStateHandler
+    with ReconfigurationHandler {
   val actorId: ActorVirtualIdentity = cp.actorId
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
new file mode 100644
index 0000000000..210d7c5b98
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import org.apache.texera.amber.core.virtualidentity.{
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity
+}
+import 
org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer
+import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  WorkflowReconfigureRequest
+}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+import 
org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_UPDATE_EXECUTOR
+
+import scala.collection.mutable
+
+trait ReconfigurationHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def reconfigureWorkflow(
+      msg: WorkflowReconfigureRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    if (
+      msg.reconfiguration.exists(req =>
+        
cp.workflowScheduler.physicalPlan.getOperator(req.targetOpId).isSourceOperator
+      )
+    ) {
+      throw new IllegalStateException(
+        "Reconfiguration cannot be applied to source operators"
+      )
+    }
+    val futures = mutable.ArrayBuffer[Future[_]]()
+    val friesComponents =
+      
FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator,
 msg)
+    friesComponents.foreach { friesComponent =>
+      if (friesComponent.scope.size == 1) {
+        val updateExecutorRequest = friesComponent.reconfigurations.head
+        val workerIds = cp.workflowExecution
+          .getLatestOperatorExecution(updateExecutorRequest.targetOpId)
+          .getWorkerIds
+        workerIds.foreach { worker =>
+          futures.append(workerInterface.updateExecutor(updateExecutorRequest, 
mkContext(worker)))
+        }
+      } else {
+        val channelScope = cp.workflowExecution.getRunningRegionExecutions
+          .flatMap(regionExecution =>
+            regionExecution.getAllLinkExecutions
+              .map(_._2)
+              .flatMap(linkExecution => 
linkExecution.getAllChannelExecutions.map(_._1))
+          )
+          .filter(channelId => {
+            friesComponent.scope
+              
.contains(VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId)) &&
+              friesComponent.scope
+                
.contains(VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId))
+          })
+        val controlChannels = friesComponent.sources.flatMap { source =>
+          
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.flatMap { 
worker =>
+            Seq(
+              ChannelIdentity(CONTROLLER, worker, isControl = true),
+              ChannelIdentity(worker, CONTROLLER, isControl = true)
+            )
+          }
+        }
+        val finalScope = channelScope ++ controlChannels
+        val cmdMapping =
+          friesComponent.reconfigurations.flatMap { updateReq =>
+            val workers =
+              
cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds
+            workers.map(worker =>
+              worker.name -> createInvocation(
+                METHOD_UPDATE_EXECUTOR.getBareMethodName,
+                updateReq,
+                worker
+              )
+            )
+          }.toMap
+        futures += cmdMapping.map {
+          case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture
+        }
+        friesComponent.sources.foreach { source =>
+          
cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { 
worker =>
+            sendECM(
+              EmbeddedControlMessageIdentity(msg.reconfigurationId),
+              ALL_ALIGNMENT,
+              finalScope.toSet,
+              cmdMapping.map(x => (x._1, x._2._1)),
+              ChannelIdentity(actorId, worker, isControl = true)
+            )
+          }
+        }
+      }
+    }
+    Future.collect(futures.toList).map { _ =>
+      EmptyReturn()
+    }
+  }
+
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
index 2abcdf6697..6b0c62ac3f 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala
@@ -20,6 +20,13 @@
 package org.apache.texera.amber.engine.architecture.worker
 
 import com.twitter.util.Future
+import org.apache.texera.amber.core.executor.{
+  ExecFactory,
+  OpExecInitInfo,
+  OpExecSource,
+  OpExecWithClassName,
+  OpExecWithCode
+}
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
@@ -32,6 +39,9 @@ import 
org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServi
 import org.apache.texera.amber.engine.architecture.worker.promisehandlers._
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCHandlerInitializer
+import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
+
+import java.net.URI
 
 class DataProcessorRPCHandlerInitializer(val dp: DataProcessor)
     extends AsyncRPCHandlerInitializer(dp.asyncRPCClient, dp.asyncRPCServer)
@@ -52,9 +62,12 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
     with FlushNetworkBufferHandler
     with RetrieveStateHandler
     with PrepareCheckpointHandler
-    with FinalizeCheckpointHandler {
+    with FinalizeCheckpointHandler
+    with UpdateExecutorHandler {
   val actorId: ActorVirtualIdentity = dp.actorId
 
+  var cachedTotalWorkerCount = 0
+
   override def debugCommand(
       request: DebugCommandRequest,
       ctx: AsyncRPCContext
@@ -69,4 +82,17 @@ class DataProcessorRPCHandlerInitializer(val dp: 
DataProcessor)
     ???
 
   override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): 
Future[EmptyReturn] = ???
+
+  def setupExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, workerCount: 
Int): Unit = {
+    dp.executor = execInitInfo match {
+      case OpExecWithClassName(className, descString) =>
+        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
+      case OpExecWithCode(code, _) =>
+        ExecFactory.newExecFromJavaCode(code)
+      case OpExecSource(storageUri, _) =>
+        new CacheSourceOpExec(URI.create(storageUri))
+      case OpExecInitInfo.Empty =>
+        throw new IllegalArgumentException("Empty executor initialization 
info")
+    }
+  }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index bf45d8eff9..212a980e5e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -20,18 +20,14 @@
 package org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.core.executor._
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
   InitializeExecutorRequest
 }
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
-import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
 import org.apache.texera.amber.util.VirtualIdentityUtils
 
-import java.net.URI
-
 trait InitializeExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
@@ -41,18 +37,8 @@ trait InitializeExecutorHandler {
   ): Future[EmptyReturn] = {
     dp.serializationManager.setOpInitialization(req)
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    val workerCount = req.totalWorkerCount
-    dp.executor = req.opExecInitInfo match {
-      case OpExecWithClassName(className, descString) =>
-        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
-      case OpExecWithCode(code, _) =>
-        ExecFactory.newExecFromJavaCode(code)
-      case OpExecSource(storageUri, _) =>
-        new CacheSourceOpExec(URI.create(storageUri))
-      case OpExecInitInfo.Empty =>
-        throw new IllegalArgumentException("Empty executor initialization 
info")
-    }
+    cachedTotalWorkerCount = req.totalWorkerCount
+    setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount)
     EmptyReturn()
   }
-
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
similarity index 62%
copy from 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
copy to 
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
index bf45d8eff9..8ed9ebdc59 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
@@ -20,38 +20,29 @@
 package org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.core.executor._
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
   AsyncRPCContext,
-  InitializeExecutorRequest
+  UpdateExecutorRequest
 }
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
-import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec
 import org.apache.texera.amber.util.VirtualIdentityUtils
 
-import java.net.URI
-
-trait InitializeExecutorHandler {
+trait UpdateExecutorHandler {
   this: DataProcessorRPCHandlerInitializer =>
 
-  override def initializeExecutor(
-      req: InitializeExecutorRequest,
+  override def updateExecutor(
+      request: UpdateExecutorRequest,
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
-    dp.serializationManager.setOpInitialization(req)
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
-    val workerCount = req.totalWorkerCount
-    dp.executor = req.opExecInitInfo match {
-      case OpExecWithClassName(className, descString) =>
-        ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, 
workerCount)
-      case OpExecWithCode(code, _) =>
-        ExecFactory.newExecFromJavaCode(code)
-      case OpExecSource(storageUri, _) =>
-        new CacheSourceOpExec(URI.create(storageUri))
-      case OpExecInitInfo.Empty =>
-        throw new IllegalArgumentException("Empty executor initialization 
info")
+    // Close the existing executor (if any) before replacing it to avoid resource leaks.
+    val oldExecutor = dp.executor
+    if (oldExecutor != null) {
+      oldExecutor.close()
     }
+    setupExecutor(request.newExecInitInfo, workerIdx, cachedTotalWorkerCount)
+    dp.executor.open()
     EmptyReturn()
   }
 
diff --git 
a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
similarity index 72%
rename from 
amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
rename to 
amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
index c2a15106b1..c13e780119 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.texera.web.service
+package org.apache.texera.amber.engine.common
 
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.PhysicalPlan
 import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  ModifyLogicRequest,
-  PropagateEmbeddedControlMessageRequest
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
 }
 import org.apache.texera.amber.engine.architecture.scheduling.{Region, 
WorkflowExecutionCoordinator}
 import org.jgrapht.alg.connectivity.ConnectivityInspector
@@ -34,28 +34,33 @@ import scala.jdk.CollectionConverters.SetHasAsScala
 
 object FriesReconfigurationAlgorithm {
 
+  case class FriesComponent(
+      sources: Set[PhysicalOpIdentity],
+      scope: Set[PhysicalOpIdentity],
+      reconfigurations: Set[UpdateExecutorRequest]
+  )
+
   private def getOneToManyOperators(region: Region): Set[PhysicalOpIdentity] = 
{
     region.getOperators.filter(op => op.isOneToManyOp).map(op => op.id)
   }
 
-  def scheduleReconfigurations(
+  def getReconfigurations(
       workflowExecutionCoordinator: WorkflowExecutionCoordinator,
-      reconfiguration: ModifyLogicRequest,
-      epochMarkerId: String
-  ): Set[PropagateEmbeddedControlMessageRequest] = {
+      reconfiguration: WorkflowReconfigureRequest
+  ): Set[FriesComponent] = {
     // independently schedule reconfigurations for each region:
     workflowExecutionCoordinator.getExecutingRegions
-      .flatMap(region => computeMCS(region, reconfiguration, epochMarkerId))
+      .flatMap(region => computeMCS(region, reconfiguration, 
reconfiguration.reconfigurationId))
   }
 
   private def computeMCS(
       region: Region,
-      reconfiguration: ModifyLogicRequest,
+      reconfiguration: WorkflowReconfigureRequest,
       epochMarkerId: String
-  ): List[PropagateEmbeddedControlMessageRequest] = {
+  ): List[FriesComponent] = {
 
     // add all reconfiguration operators to M
-    val reconfigOps = reconfiguration.updateRequest.map(req => 
req.targetOpId).toSet
+    val reconfigOps = reconfiguration.reconfiguration.map(req => 
req.targetOpId).toSet
     val M = mutable.Set.empty ++ reconfigOps
 
     // for each one-to-many operator, add it to M if its downstream has a reconfiguration operator
@@ -101,30 +106,20 @@ object FriesReconfigurationAlgorithm {
 
     // find the MCS components,
     // for each component, send an epoch marker to each of its source operators
-    val epochMarkers = new 
ArrayBuffer[PropagateEmbeddedControlMessageRequest]()
+    val epochMarkers = new ArrayBuffer[FriesComponent]()
 
     val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets()
     connectedSets.forEach(component => {
       val componentSet = component.asScala.toSet
       val componentPlan = mcsPlan.getSubPlan(componentSet)
-
-      // generate the reconfiguration command for this component
-      //      val reconfigCommands =
-      //        reconfiguration.updateRequest
-      //          .filter(req => component.contains(req.targetOpId))
-      //      val reconfigTargets = reconfigCommands.map(_.targetOpId)
-      //
-      //      // find the source operators of the component
-      //      val sources = 
componentSet.intersect(mcsPlan.getSourceOperatorIds)
-      //      epochMarkers += PropagateEmbeddedControlMessageRequest(
-      //        sources.toSeq,
-      //        EmbeddedControlMessageIdentity(epochMarkerId),
-      //        ALL_ALIGNMENT,
-      //        componentPlan.operators.map(_.id).toSeq,
-      //        reconfigTargets,
-      //        ModifyLogicRequest(reconfigCommands),
-      //        METHOD_MODIFY_LOGIC.getBareMethodName
-      //      )
+      val reconfigCommands =
+        reconfiguration.reconfiguration
+          .filter(req => component.contains(req.targetOpId))
+          .toSet
+
+      // find the source operators of the component
+      val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds)
+      epochMarkers += FriesComponent(sources, 
componentPlan.operators.map(_.id), reconfigCommands)
     })
     epochMarkers.toList
   }
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
new file mode 100644
index 0000000000..5f9f1bba18
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ReconfigurationSpec.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.e2e
+
+import com.twitter.util.{Await, Duration, Promise}
+import com.typesafe.scalalogging.Logger
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit}
+import org.apache.pekko.util.Timeout
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode}
+import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.model.VirtualDocument
+import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerConfig,
+  ExecutionStateUpdate
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  EmptyRequest,
+  UpdateExecutorRequest,
+  WorkflowReconfigureRequest
+}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.amber.engine.e2e.TestUtils.{
+  cleanupWorkflowExecutionData,
+  initiateTexeraDBForTestCases,
+  setUpWorkflowExecutionData
+}
+import org.apache.texera.amber.operator.{LogicalOp, TestOperators}
+import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
+import org.scalatest.flatspec.AnyFlatSpecLike
+
+import scala.concurrent.duration._
+
+class ReconfigurationSpec
+    extends TestKit(ActorSystem("ReconfigurationSpec", 
AmberRuntime.akkaConfig))
+    with ImplicitSender
+    with AnyFlatSpecLike
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Retries {
+
+  /**
+    * This block retries each test once if it fails.
+    * In the CI environment, there is a chance that executeWorkflow does not receive the "COMPLETED" status.
+    * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs.
+    */
+  override def withFixture(test: NoArgTest): Outcome =
+    withRetry { super.withFixture(test) }
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+
+  val logger = Logger("ReconfigurationSpecLogger")
+  val ctx = new WorkflowContext()
+
+  override protected def beforeEach(): Unit = {
+    setUpWorkflowExecutionData()
+  }
+
+  override protected def afterEach(): Unit = {
+    cleanupWorkflowExecutionData()
+  }
+
+  override def beforeAll(): Unit = {
+    system.actorOf(Props[SingleNodeListener](), "cluster-info")
+    // These test cases access Postgres in CI, but occasionally the JDBC driver cannot be found during a CI run.
+    // Explicitly load the JDBC driver to avoid flaky CI failures.
+    Class.forName("org.postgresql.Driver")
+    initiateTexeraDBForTestCases()
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  def shouldReconfigure(
+      operators: List[LogicalOp],
+      links: List[LogicalLink],
+      targetOps: Seq[LogicalOp],
+      newOpExecInitInfo: OpExecInitInfo
+  ): Map[OperatorIdentity, List[Tuple]] = {
+    val workflow =
+      TestUtils.buildWorkflow(operators, links, ctx)
+    val client =
+      new AmberClient(
+        system,
+        workflow.context,
+        workflow.physicalPlan,
+        ControllerConfig.default,
+        error => {}
+      )
+    val completion = Promise[Unit]()
+    var result: Map[OperatorIdentity, List[Tuple]] = null
+    client
+      .registerCallback[ExecutionStateUpdate](evt => {
+        if (evt.state == COMPLETED) {
+          result = workflow.logicalPlan.getTerminalOperatorIds
+            .filter(terminalOpId => {
+              val uri = getResultUriByLogicalPortId(
+                workflow.context.executionId,
+                terminalOpId,
+                PortIdentity()
+              )
+              uri.nonEmpty
+            })
+            .map(terminalOpId => {
+              // TODO: remove the delay after fixing the issue of reporting "completed" status too early.
+              Thread.sleep(1000)
+              val uri = getResultUriByLogicalPortId(
+                workflow.context.executionId,
+                terminalOpId,
+                PortIdentity()
+              ).get
+              terminalOpId -> DocumentFactory
+                .openDocument(uri)
+                ._1
+                .asInstanceOf[VirtualDocument[Tuple]]
+                .get()
+                .toList
+            })
+            .toMap
+          completion.setDone()
+        }
+      })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ()))
+    Thread.sleep(4000)
+    val physicalOps = targetOps.flatMap(op =>
+      workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier)
+    )
+    Await.result(
+      client.controllerInterface.reconfigureWorkflow(
+        WorkflowReconfigureRequest(
+          reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, 
newOpExecInitInfo)),
+          reconfigurationId = "test-reconfigure-1"
+        ),
+        ()
+      )
+    )
+    Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ()))
+    Await.result(completion, Duration.fromMinutes(1))
+    result
+  }
+
+  "Engine" should "be able to modify a python UDF worker in workflow" in {
+    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
+    val udfOpDesc = TestOperators.pythonOpDesc()
+    val code = """
+                 |from pytexera import *
+                 |
+                 |class ProcessTupleOperator(UDFOperatorV2):
+                 |    @overrides
+                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
+                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
+                 |        yield tuple_
+                 |""".stripMargin
+
+    val result = shouldReconfigure(
+      List(sourceOpDesc, udfOpDesc),
+      List(
+        LogicalLink(
+          sourceOpDesc.operatorIdentifier,
+          PortIdentity(),
+          udfOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      Seq(udfOpDesc),
+      OpExecWithCode(code, "python")
+    )
+    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
+      t.getField("Region").asInstanceOf[String].contains("_reconfigured")
+    })
+  }
+
+  "Engine" should "be able to modify a java operator in workflow" in {
+    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
+    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
+    val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"Asia")
+    val result = shouldReconfigure(
+      List(sourceOpDesc, keywordMatchNoneOpDesc),
+      List(
+        LogicalLink(
+          sourceOpDesc.operatorIdentifier,
+          PortIdentity(),
+          keywordMatchNoneOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      Seq(keywordMatchNoneOpDesc),
+      keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
+    )
+    assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty)
+  }
+
+  "Engine" should "not be able to modify a source operator in workflow" in {
+    val sourceOpDesc = TestOperators.mediumCsvScanOpDesc()
+    val sourceOpDesc2 = TestOperators.mediumCsvScanOpDesc()
+    val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", 
"ShouldMatchNone")
+    val ex = intercept[Throwable] {
+      shouldReconfigure(
+        List(sourceOpDesc, keywordMatchNoneOpDesc),
+        List(
+          LogicalLink(
+            sourceOpDesc.operatorIdentifier,
+            PortIdentity(),
+            keywordMatchNoneOpDesc.operatorIdentifier,
+            PortIdentity()
+          )
+        ),
+        Seq(sourceOpDesc),
+        sourceOpDesc2.getPhysicalOp(ctx.workflowId, 
ctx.executionId).opExecInitInfo
+      )
+    }
+    assert(
+      ex.getMessage == "java.lang.IllegalStateException: Reconfiguration 
cannot be applied to source operators"
+    )
+  }
+
+  "Engine" should "propagate reconfiguration through a source operator in 
workflow" in {
+    val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000)
+    val udfOpDesc = TestOperators.pythonOpDesc()
+    val code = """
+                 |from pytexera import *
+                 |
+                 |class ProcessTupleOperator(UDFOperatorV2):
+                 |    @overrides
+                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
+                 |        tuple_['field_1'] = tuple_['field_1'] + 
'_reconfigured'
+                 |        yield tuple_
+                 |""".stripMargin
+    val result = shouldReconfigure(
+      List(sourceOpDesc, udfOpDesc),
+      List(
+        LogicalLink(
+          sourceOpDesc.operatorIdentifier,
+          PortIdentity(),
+          udfOpDesc.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      Seq(udfOpDesc),
+      OpExecWithCode(code, "python")
+    )
+    assert(result(udfOpDesc.operatorIdentifier).exists { t =>
+      t.getField("field_1").asInstanceOf[String].contains("_reconfigured")
+    })
+  }
+
+  "Engine" should "be able to modify two python UDFs in workflow" in {
+    val sourceOpDesc = TestOperators.smallCsvScanOpDesc()
+    val udfOpDesc1 = TestOperators.pythonOpDesc()
+    val udfOpDesc2 = TestOperators.pythonOpDesc()
+    val code = """
+                 |from pytexera import *
+                 |
+                 |class ProcessTupleOperator(UDFOperatorV2):
+                 |    @overrides
+                 |    def process_tuple(self, tuple_: Tuple, port: int) -> 
Iterator[Optional[TupleLike]]:
+                 |        tuple_['Region'] = tuple_['Region'] + '_reconfigured'
+                 |        yield tuple_
+                 |""".stripMargin
+
+    val result = shouldReconfigure(
+      List(sourceOpDesc, udfOpDesc1, udfOpDesc2),
+      List(
+        LogicalLink(
+          sourceOpDesc.operatorIdentifier,
+          PortIdentity(),
+          udfOpDesc1.operatorIdentifier,
+          PortIdentity()
+        ),
+        LogicalLink(
+          udfOpDesc1.operatorIdentifier,
+          PortIdentity(),
+          udfOpDesc2.operatorIdentifier,
+          PortIdentity()
+        )
+      ),
+      Seq(udfOpDesc1, udfOpDesc2),
+      OpExecWithCode(code, "python")
+    )
+    assert(result(udfOpDesc2.operatorIdentifier).exists { t =>
+      
t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured")
+    })
+  }
+
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
index ab7e8dc2de..268b06ff8b 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala
@@ -20,6 +20,7 @@
 package org.apache.texera.amber.operator
 
 import org.apache.texera.amber.core.storage.FileResolver
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
 import org.apache.texera.amber.operator.aggregate.{
   AggregateOpDesc,
   AggregationFunction,
@@ -32,6 +33,7 @@ import 
org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpDesc
 import 
org.apache.texera.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc
 import org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpDesc
 import org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2
+import 
org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2
 
 import java.nio.file.Path
 
@@ -171,6 +173,7 @@ object TestOperators {
   def pythonOpDesc(): PythonUDFOpDescV2 = {
     val udf = new PythonUDFOpDescV2()
     udf.workers = 1
+    udf.retainInputColumns = true
     udf.code = """
         |from pytexera import *
         |
@@ -181,4 +184,21 @@ object TestOperators {
         |""".stripMargin
     udf
   }
+
+  def pythonSourceOpDesc(numTuple: Int): PythonUDFSourceOpDescV2 = {
+    val udf = new PythonUDFSourceOpDescV2()
+    udf.workers = 1
+    udf.columns = List(new Attribute("field_1", AttributeType.STRING))
+    udf.code = s"""
+                 |from pytexera import *
+                 |
+                 |class UDFSourceOperator(UDFSourceOperator):
+                 |    @overrides
+                 |    def produce(self) -> Iterator[Union[TupleLike, 
TableLike, None]]:
+                 |        for i in range($numTuple):
+                 |          yield {'field_1': str(i) }
+                 |""".stripMargin
+    udf
+  }
+
 }

Reply via email to