This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new d5bc8b7cf8 refactor: untrack generated python proto, regenerate in build (#5077)
d5bc8b7cf8 is described below

commit d5bc8b7cf8d2e515280934a9c3e52c305fe3c002
Author: Matthew B. <[email protected]>
AuthorDate: Tue May 26 20:39:27 2026 -0700

    refactor: untrack generated python proto, regenerate in build (#5077)
    
    ### What changes were proposed in this PR?
    Stop version-tracking the betterproto-generated Python files under
    `amber/src/main/python/proto/`. Add a `genPythonProto` sbt task that
    runs `bin/python-proto-gen.sh` as a dependency of
    `WorkflowExecutionService/Compile/compile`, with a graceful skip when
    `protoc` is missing. Wire `protoc` 3.19.4 (matching `PB.protocVersion`)
    and `betterproto[compiler]==2.0.0b7` into the three image build stages
    (worker, master, web-app) and the three CI jobs that touch Python proto
    (`python`, `python-state-materialization-mac`, `amber-integration`). The
    14th tracked file (`proto/org/apache/texera/web/__init__.py`) had no
    corresponding `.proto` source and is gone after regen; grep confirms no
    callers.
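The graceful skip described above can be pictured with a small sketch. It is illustrative only and is not the code in `amber/build.sbt` or `bin/python-proto-gen.sh`; the function name `plan_python_proto_gen` and its `which` parameter are hypothetical and exist only so the check can be exercised without touching the real PATH.

```python
import shutil
from typing import Callable, List, Optional


def plan_python_proto_gen(
    which: Callable[[str], Optional[str]] = shutil.which,
) -> Optional[List[str]]:
    """Return the generation command, or None when protoc is not on PATH."""
    if which("protoc") is None:
        # Graceful skip: report and carry on instead of failing the build.
        print("protoc not found; skipping Python proto generation")
        return None
    # Assumption: the script is invoked from the repository root.
    return ["bash", "bin/python-proto-gen.sh"]
```

A caller would run the returned command when it is not `None` and otherwise continue the build with whatever bindings are already on disk.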
    ### Any related issues, documentation, or discussions?
    Closes: #4102
    ### How was this PR tested?
    Locally: `sbt clean WorkflowExecutionService/compile` regenerates the tree
    and `pytest -m "not integration"` passes; skip path verified with protoc
    absent. All three Docker images build end-to-end. CI covers the workflow
    YAML on push.
    ### Was this PR authored or co-authored using generative AI tooling?
    Co-authored with Claude Opus 4.7 in compliance with ASF
---
 .github/workflows/build.yml                        |   30 +
 .gitignore                                         |    3 +
 AGENTS.md                                          |    2 +
 amber/build.sbt                                    |    2 +-
 amber/dev-requirements.txt                         |    4 +
 amber/src/main/python/proto/__init__.py            |    0
 amber/src/main/python/proto/org/__init__.py        |    0
 amber/src/main/python/proto/org/apache/__init__.py |    0
 .../python/proto/org/apache/texera/__init__.py     |    0
 .../proto/org/apache/texera/amber/__init__.py      |    0
 .../proto/org/apache/texera/amber/core/__init__.py |  146 --
 .../org/apache/texera/amber/engine/__init__.py     |    0
 .../texera/amber/engine/architecture/__init__.py   |    0
 .../amber/engine/architecture/rpc/__init__.py      | 2204 --------------------
 .../engine/architecture/sendsemantics/__init__.py  |   66 -
 .../amber/engine/architecture/worker/__init__.py   |   49 -
 .../apache/texera/amber/engine/common/__init__.py  |  156 --
 .../python/proto/org/apache/texera/web/__init__.py |  158 --
 amber/src/main/python/proto/scalapb/__init__.py    |  421 ----
 bin/computing-unit-master.dockerfile               |   21 +-
 bin/computing-unit-worker.dockerfile               |   21 +-
 bin/protoc-version.txt                             |    1 +
 bin/python-proto-gen.sh                            |   12 +-
 bin/texera-web-application.dockerfile              |   22 +-
 24 files changed, 108 insertions(+), 3210 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 4fd056c7a7..2b93c55576 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -355,11 +355,21 @@ jobs:
         # mirrors a subset of common deps (e.g. pillow); without this
         # flag a dependabot bump to a version not yet mirrored there
         # fails to resolve even though PyPI has it.
+        # dev-requirements.txt provides the betterproto plugin used by
+        # bin/python-proto-gen.sh.
         run: |
           python -m pip install uv
           if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
           if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
           if [ -f amber/dev-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/dev-requirements.txt; fi
+      - name: Install protoc
+        # Version pinned in bin/protoc-version.txt.
+        run: |
+          PROTOC_VERSION=$(cat bin/protoc-version.txt)
+          curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip";
+          sudo unzip -o /tmp/protoc.zip -d /usr/local
+          sudo chmod +x /usr/local/bin/protoc
+          sudo chmod -R a+rX /usr/local/include/google
       - name: Create Databases
         run: |
           psql -h localhost -U postgres -f sql/texera_ddl.sql
@@ -467,6 +477,10 @@ jobs:
             }
           }
           EOF
+      - name: Generate Python proto bindings
+        # Integration specs spawn Python UDF workers that import the
+        # generated betterproto bindings. Independent of sbt and the JDK.
+        run: bash bin/python-proto-gen.sh
       - name: Lint and run amber integration tests
         # AMBER_TEST_FILTER=integration-only tells amber/build.sbt to
         # keep only @org.apache.texera.amber.tags.IntegrationTest
@@ -685,6 +699,16 @@ jobs:
         run: |
           python -m pip install uv
           if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi
+      - name: Install protoc
+        # Version pinned in bin/protoc-version.txt.
+        run: |
+          PROTOC_VERSION=$(cat bin/protoc-version.txt)
+          curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip";
+          sudo unzip -o /tmp/protoc.zip -d /usr/local
+          sudo chmod +x /usr/local/bin/protoc
+          sudo chmod -R a+rX /usr/local/include/google
+      - name: Generate Python proto bindings
+        run: bash bin/python-proto-gen.sh
       - name: Test with pytest
         # --junit-xml emits a JUnit-XML report alongside the coverage XML
         # so the Test Analytics upload below can feed Codecov's failing-
@@ -745,6 +769,12 @@ jobs:
           if [ -f amber/requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/requirements.txt; fi
           if [ -f amber/operator-requirements.txt ]; then uv pip install --system --index-strategy unsafe-best-match -r amber/operator-requirements.txt; fi
           if [ -f amber/dev-requirements.txt ]; then uv pip install --system -r amber/dev-requirements.txt; fi
+      - name: Install protoc
+        # Homebrew protoc; this job doesn't exercise scalapb so the
+        # bin/protoc-version.txt pin doesn't apply here.
+        run: brew install protobuf
+      - name: Generate Python proto bindings
+        run: bash bin/python-proto-gen.sh
       - name: Run state-materialization integration tests
         run: |
           cd amber && pytest -sv \
diff --git a/.gitignore b/.gitignore
index d17fe084cf..6e772fee5a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,9 @@ coverage.xml
 *.model
 *.pkl
 
+# Regenerated by bin/python-proto-gen.sh.
+amber/src/main/python/proto/
+
 # Ingoring user generated resources
 user-resources/
 
diff --git a/AGENTS.md b/AGENTS.md
index ba40082d59..eef0aea460 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -83,6 +83,8 @@ One Python venv shared across worktrees, sibling of the texera checkout:
 ```bash
 python3.12 -m venv ../venv312 && source ../venv312/bin/activate
 pip install -r amber/requirements.txt -r amber/operator-requirements.txt
+# For pytest or running bin/python-proto-gen.sh, also install dev deps:
+pip install -r amber/dev-requirements.txt
 ```
 
 Tests that spawn Python workers need an interpreter path. Edit `python.path`
diff --git a/amber/build.sbt b/amber/build.sbt
index 1f363e73e9..dc9b5d8f30 100644
--- a/amber/build.sbt
+++ b/amber/build.sbt
@@ -179,7 +179,7 @@ libraryDependencies ++= hadoopDependencies
 // protobuf related
 // run the following with sbt to have protobuf codegen
 
-PB.protocVersion := "3.19.4"
+PB.protocVersion := IO.read((ThisBuild / baseDirectory).value / "bin" / "protoc-version.txt").trim
 
 enablePlugins(Fs2Grpc)
 
diff --git a/amber/dev-requirements.txt b/amber/dev-requirements.txt
index 1bbacb78d6..7ae68cf4a2 100644
--- a/amber/dev-requirements.txt
+++ b/amber/dev-requirements.txt
@@ -23,3 +23,7 @@
 # Coverage instrumentation for pytest; emits coverage.xml consumed by
 # Codecov's Phase 1 upload.
 pytest-cov==5.0.0
+
+# protoc plugin for bin/python-proto-gen.sh; runtime needs only the
+# base `betterproto` in requirements.txt.
+betterproto[compiler]==2.0.0b7
diff --git a/amber/src/main/python/proto/__init__.py b/amber/src/main/python/proto/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/__init__.py b/amber/src/main/python/proto/org/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/__init__.py b/amber/src/main/python/proto/org/apache/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/__init__.py b/amber/src/main/python/proto/org/apache/texera/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py
deleted file mode 100644
index 2d21638c26..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/core/__init__.py
+++ /dev/null
@@ -1,146 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/amber/core/executor.proto, org/apache/texera/amber/core/virtualidentity.proto, org/apache/texera/amber/core/workflow.proto, org/apache/texera/amber/core/workflowruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
-    List,
-)
-
-import betterproto
-
-
-class OutputPortOutputMode(betterproto.Enum):
-    SET_SNAPSHOT = 0
-    """outputs complete result set snapshot for each update"""
-
-    SET_DELTA = 1
-    """outputs incremental result set delta for each update"""
-
-    SINGLE_SNAPSHOT = 2
-    """
-    outputs a single snapshot for the entire execution,
-     used explicitly to support visualization operators that may exceed the memory limit
-     TODO: remove this mode after we have a better solution for output size limit
-    """
-
-
-class FatalErrorType(betterproto.Enum):
-    COMPILATION_ERROR = 0
-    EXECUTION_FAILURE = 1
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowIdentity(betterproto.Message):
-    id: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionIdentity(betterproto.Message):
-    id: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ActorVirtualIdentity(betterproto.Message):
-    name: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ChannelIdentity(betterproto.Message):
-    from_worker_id: "ActorVirtualIdentity" = betterproto.message_field(1)
-    to_worker_id: "ActorVirtualIdentity" = betterproto.message_field(2)
-    is_control: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorIdentity(betterproto.Message):
-    id: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PhysicalOpIdentity(betterproto.Message):
-    logical_op_id: "OperatorIdentity" = betterproto.message_field(1)
-    layer_name: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EmbeddedControlMessageIdentity(betterproto.Message):
-    id: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PortIdentity(betterproto.Message):
-    id: int = betterproto.int32_field(1)
-    internal: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class GlobalPortIdentity(betterproto.Message):
-    op_id: "PhysicalOpIdentity" = betterproto.message_field(1)
-    port_id: "PortIdentity" = betterproto.message_field(2)
-    input: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class InputPort(betterproto.Message):
-    id: "PortIdentity" = betterproto.message_field(1)
-    display_name: str = betterproto.string_field(2)
-    disallow_multi_links: bool = betterproto.bool_field(3)
-    dependencies: List["PortIdentity"] = betterproto.message_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class OutputPort(betterproto.Message):
-    id: "PortIdentity" = betterproto.message_field(1)
-    display_name: str = betterproto.string_field(2)
-    blocking: bool = betterproto.bool_field(3)
-    mode: "OutputPortOutputMode" = betterproto.enum_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class PhysicalLink(betterproto.Message):
-    from_op_id: "PhysicalOpIdentity" = betterproto.message_field(1)
-    from_port_id: "PortIdentity" = betterproto.message_field(2)
-    to_op_id: "PhysicalOpIdentity" = betterproto.message_field(3)
-    to_port_id: "PortIdentity" = betterproto.message_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecWithCode(betterproto.Message):
-    code: str = betterproto.string_field(1)
-    language: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecWithClassName(betterproto.Message):
-    class_name: str = betterproto.string_field(1)
-    desc_string: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecSource(betterproto.Message):
-    storage_key: str = betterproto.string_field(1)
-    workflow_identity: "WorkflowIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OpExecInitInfo(betterproto.Message):
-    op_exec_with_class_name: "OpExecWithClassName" = betterproto.message_field(
-        1, group="sealed_value"
-    )
-    op_exec_with_code: "OpExecWithCode" = betterproto.message_field(
-        2, group="sealed_value"
-    )
-    op_exec_source: "OpExecSource" = betterproto.message_field(3, group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowFatalError(betterproto.Message):
-    type: "FatalErrorType" = betterproto.enum_field(1)
-    timestamp: datetime = betterproto.message_field(2)
-    message: str = betterproto.string_field(3)
-    details: str = betterproto.string_field(4)
-    operator_id: str = betterproto.string_field(5)
-    worker_id: str = betterproto.string_field(6)
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
deleted file mode 100644
index 2bad2b0bfb..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py
+++ /dev/null
@@ -1,2204 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto, org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto, org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto, org/apache/texera/amber/engine/architecture/rpc/testerservice.proto, org/apache/texera/amber/engine/architecture/rpc/workerservice.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
-    TYPE_CHECKING,
-    Dict,
-    List,
-    Optional,
-)
-
-import betterproto
-import grpclib
-from betterproto.grpc.grpclib_server import ServiceBase
-
-from .... import core as ___core__
-from .. import (
-    sendsemantics as _sendsemantics__,
-    worker as _worker__,
-)
-
-
-if TYPE_CHECKING:
-    import grpclib.server
-    from betterproto.grpc.grpclib_client import MetadataLike
-    from grpclib.metadata import Deadline
-
-
-class EmbeddedControlMessageType(betterproto.Enum):
-    ALL_ALIGNMENT = 0
-    NO_ALIGNMENT = 1
-    PORT_ALIGNMENT = 2
-
-
-class ConsoleMessageType(betterproto.Enum):
-    PRINT = 0
-    ERROR = 1
-    COMMAND = 2
-    DEBUGGER = 3
-
-
-class StatisticsUpdateTarget(betterproto.Enum):
-    BOTH_UI_AND_PERSISTENCE = 0
-    UI_ONLY = 1
-    PERSISTENCE_ONLY = 2
-
-
-class ErrorLanguage(betterproto.Enum):
-    PYTHON = 0
-    SCALA = 1
-
-
-class WorkflowAggregatedState(betterproto.Enum):
-    UNINITIALIZED = 0
-    READY = 1
-    RUNNING = 2
-    PAUSING = 3
-    PAUSED = 4
-    RESUMING = 5
-    COMPLETED = 6
-    FAILED = 7
-    UNKNOWN = 8
-    KILLED = 9
-    TERMINATED = 10
-
-
-@dataclass(eq=False, repr=False)
-class ControlRequest(betterproto.Message):
-    propagate_embedded_control_message_request: (
-        "PropagateEmbeddedControlMessageRequest"
-    ) = betterproto.message_field(1, group="sealed_value")
-    """request for controller"""
-
-    take_global_checkpoint_request: "TakeGlobalCheckpointRequest" = (
-        betterproto.message_field(2, group="sealed_value")
-    )
-    debug_command_request: "DebugCommandRequest" = betterproto.message_field(
-        3, group="sealed_value"
-    )
-    evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = (
-        betterproto.message_field(4, group="sealed_value")
-    )
-    retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field(
-        5, group="sealed_value"
-    )
-    console_message_triggered_request: "ConsoleMessageTriggeredRequest" = (
-        betterproto.message_field(6, group="sealed_value")
-    )
-    port_completed_request: "PortCompletedRequest" = betterproto.message_field(
-        7, group="sealed_value"
-    )
-    worker_state_updated_request: "WorkerStateUpdatedRequest" = (
-        betterproto.message_field(8, group="sealed_value")
-    )
-    link_workers_request: "LinkWorkersRequest" = betterproto.message_field(
-        9, group="sealed_value"
-    )
-    workflow_reconfigure_request: "WorkflowReconfigureRequest" = (
-        betterproto.message_field(10, group="sealed_value")
-    )
-    jump_to_operator_region_request: "JumpToOperatorRegionRequest" = betterproto.message_field(
-        11, group="sealed_value"
-    )
-    add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field(
-        50, group="sealed_value"
-    )
-    """request for worker"""
-
-    add_partitioning_request: "AddPartitioningRequest" = betterproto.message_field(
-        51, group="sealed_value"
-    )
-    assign_port_request: "AssignPortRequest" = betterproto.message_field(
-        52, group="sealed_value"
-    )
-    finalize_checkpoint_request: "FinalizeCheckpointRequest" = (
-        betterproto.message_field(53, group="sealed_value")
-    )
-    initialize_executor_request: "InitializeExecutorRequest" = (
-        betterproto.message_field(54, group="sealed_value")
-    )
-    update_executor_request: "UpdateExecutorRequest" = betterproto.message_field(
-        55, group="sealed_value"
-    )
-    empty_request: "EmptyRequest" = betterproto.message_field(56, group="sealed_value")
-    prepare_checkpoint_request: "PrepareCheckpointRequest" = betterproto.message_field(
-        57, group="sealed_value"
-    )
-    query_statistics_request: "QueryStatisticsRequest" = betterproto.message_field(
-        58, group="sealed_value"
-    )
-    ping: "Ping" = betterproto.message_field(100, group="sealed_value")
-    """request for testing"""
-
-    pong: "Pong" = betterproto.message_field(101, group="sealed_value")
-    nested: "Nested" = betterproto.message_field(102, group="sealed_value")
-    pass_: "Pass" = betterproto.message_field(103, group="sealed_value")
-    error_command: "ErrorCommand" = betterproto.message_field(104, group="sealed_value")
-    recursion: "Recursion" = betterproto.message_field(105, group="sealed_value")
-    collect: "Collect" = betterproto.message_field(106, group="sealed_value")
-    generate_number: "GenerateNumber" = betterproto.message_field(
-        107, group="sealed_value"
-    )
-    multi_call: "MultiCall" = betterproto.message_field(108, group="sealed_value")
-    chain: "Chain" = betterproto.message_field(109, group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class EmptyRequest(betterproto.Message):
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class AsyncRpcContext(betterproto.Message):
-    sender: "___core__.ActorVirtualIdentity" = betterproto.message_field(1)
-    receiver: "___core__.ActorVirtualIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ControlInvocation(betterproto.Message):
-    method_name: str = betterproto.string_field(1)
-    command: "ControlRequest" = betterproto.message_field(2)
-    context: "AsyncRpcContext" = betterproto.message_field(3)
-    command_id: int = betterproto.int64_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class EmbeddedControlMessage(betterproto.Message):
-    id: "___core__.EmbeddedControlMessageIdentity" = betterproto.message_field(1)
-    ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(2)
-    scope: List["___core__.ChannelIdentity"] = betterproto.message_field(3)
-    command_mapping: Dict[str, "ControlInvocation"] = betterproto.map_field(
-        4, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class PropagateEmbeddedControlMessageRequest(betterproto.Message):
-    source_op_to_start_prop: List["___core__.PhysicalOpIdentity"] = (
-        betterproto.message_field(1)
-    )
-    id: "___core__.EmbeddedControlMessageIdentity" = betterproto.message_field(2)
-    ecm_type: "EmbeddedControlMessageType" = betterproto.enum_field(3)
-    scope: List["___core__.PhysicalOpIdentity"] = betterproto.message_field(4)
-    target_ops: List["___core__.PhysicalOpIdentity"] = betterproto.message_field(5)
-    command: "ControlRequest" = betterproto.message_field(6)
-    method_name: str = betterproto.string_field(7)
-
-
-@dataclass(eq=False, repr=False)
-class TakeGlobalCheckpointRequest(betterproto.Message):
-    estimation_only: bool = betterproto.bool_field(1)
-    checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
-        betterproto.message_field(2)
-    )
-    destination: str = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowReconfigureRequest(betterproto.Message):
-    reconfiguration: List["UpdateExecutorRequest"] = betterproto.message_field(1)
-    reconfiguration_id: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class DebugCommandRequest(betterproto.Message):
-    worker_id: str = betterproto.string_field(1)
-    cmd: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatePythonExpressionRequest(betterproto.Message):
-    expression: str = betterproto.string_field(1)
-    operator_id: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class RetryWorkflowRequest(betterproto.Message):
-    workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ConsoleMessage(betterproto.Message):
-    worker_id: str = betterproto.string_field(1)
-    timestamp: datetime = betterproto.message_field(2)
-    msg_type: "ConsoleMessageType" = betterproto.enum_field(3)
-    source: str = betterproto.string_field(4)
-    title: str = betterproto.string_field(5)
-    message: str = betterproto.string_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class ConsoleMessageTriggeredRequest(betterproto.Message):
-    console_message: "ConsoleMessage" = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PortCompletedRequest(betterproto.Message):
-    port_id: "___core__.PortIdentity" = betterproto.message_field(1)
-    input: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStateUpdatedRequest(betterproto.Message):
-    state: "_worker__.WorkerState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class LinkWorkersRequest(betterproto.Message):
-    link: "___core__.PhysicalLink" = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Ping(betterproto.Message):
-    """Ping message"""
-
-    i: int = betterproto.int32_field(1)
-    end: int = betterproto.int32_field(2)
-    to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class Pong(betterproto.Message):
-    """Pong message"""
-
-    i: int = betterproto.int32_field(1)
-    end: int = betterproto.int32_field(2)
-    to: "___core__.ActorVirtualIdentity" = betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class Pass(betterproto.Message):
-    """Pass message"""
-
-    value: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Nested(betterproto.Message):
-    """Nested message"""
-
-    k: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class MultiCall(betterproto.Message):
-    """MultiCall message"""
-
-    seq: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ErrorCommand(betterproto.Message):
-    """ErrorCommand message"""
-
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class Collect(betterproto.Message):
-    """Collect message"""
-
-    workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class GenerateNumber(betterproto.Message):
-    """GenerateNumber message"""
-
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class Chain(betterproto.Message):
-    """Chain message"""
-
-    nexts: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class Recursion(betterproto.Message):
-    """Recursion message"""
-
-    i: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class AddInputChannelRequest(betterproto.Message):
-    """Messages for the commands"""
-
-    channel_id: "___core__.ChannelIdentity" = betterproto.message_field(1)
-    port_id: "___core__.PortIdentity" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class AddPartitioningRequest(betterproto.Message):
-    tag: "___core__.PhysicalLink" = betterproto.message_field(1)
-    partitioning: "_sendsemantics__.Partitioning" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class AssignPortRequest(betterproto.Message):
-    port_id: "___core__.PortIdentity" = betterproto.message_field(1)
-    input: bool = betterproto.bool_field(2)
-    schema: Dict[str, str] = betterproto.map_field(
-        3, betterproto.TYPE_STRING, betterproto.TYPE_STRING
-    )
-    storage_uris: List[str] = betterproto.string_field(4)
-    partitionings: List["_sendsemantics__.Partitioning"] = betterproto.message_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class FinalizeCheckpointRequest(betterproto.Message):
-    checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
-        betterproto.message_field(1)
-    )
-    write_to: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class InitializeExecutorRequest(betterproto.Message):
-    total_worker_count: int = betterproto.int32_field(1)
-    op_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2)
-    is_source: bool = betterproto.bool_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class UpdateExecutorRequest(betterproto.Message):
-    target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1)
-    new_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class PrepareCheckpointRequest(betterproto.Message):
-    checkpoint_id: "___core__.EmbeddedControlMessageIdentity" = (
-        betterproto.message_field(1)
-    )
-    estimation_only: bool = betterproto.bool_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class QueryStatisticsRequest(betterproto.Message):
-    filter_by_workers: List["___core__.ActorVirtualIdentity"] = (
-        betterproto.message_field(1)
-    )
-    update_target: "StatisticsUpdateTarget" = betterproto.enum_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class JumpToOperatorRegionRequest(betterproto.Message):
-    target_operator_id: "___core__.OperatorIdentity" = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ControlReturn(betterproto.Message):
-    """The generic return message"""
-
-    retrieve_workflow_state_response: "RetrieveWorkflowStateResponse" = (
-        betterproto.message_field(1, group="sealed_value")
-    )
-    """controller responses"""
-
-    propagate_embedded_control_message_response: (
-        "PropagateEmbeddedControlMessageResponse"
-    ) = betterproto.message_field(2, group="sealed_value")
-    take_global_checkpoint_response: "TakeGlobalCheckpointResponse" = (
-        betterproto.message_field(3, group="sealed_value")
-    )
-    evaluate_python_expression_response: "EvaluatePythonExpressionResponse" = (
-        betterproto.message_field(4, group="sealed_value")
-    )
-    start_workflow_response: "StartWorkflowResponse" = betterproto.message_field(
-        5, group="sealed_value"
-    )
-    worker_state_response: "WorkerStateResponse" = betterproto.message_field(
-        50, group="sealed_value"
-    )
-    """worker responses"""
-
-    worker_metrics_response: "WorkerMetricsResponse" = betterproto.message_field(
-        51, group="sealed_value"
-    )
-    finalize_checkpoint_response: "FinalizeCheckpointResponse" = (
-        betterproto.message_field(52, group="sealed_value")
-    )
-    control_error: "ControlError" = betterproto.message_field(101, group="sealed_value")
-    """common responses"""
-
-    empty_return: "EmptyReturn" = betterproto.message_field(102, group="sealed_value")
-    string_response: "StringResponse" = betterproto.message_field(
-        103, group="sealed_value"
-    )
-    int_response: "IntResponse" = betterproto.message_field(104, group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class EmptyReturn(betterproto.Message):
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class ControlError(betterproto.Message):
-    error_message: str = betterproto.string_field(1)
-    error_details: str = betterproto.string_field(2)
-    stack_trace: str = betterproto.string_field(3)
-    language: "ErrorLanguage" = betterproto.enum_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class ReturnInvocation(betterproto.Message):
-    command_id: int = betterproto.int64_field(1)
-    return_value: "ControlReturn" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class StringResponse(betterproto.Message):
-    value: str = betterproto.string_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class IntResponse(betterproto.Message):
-    value: int = betterproto.int32_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class RetrieveWorkflowStateResponse(betterproto.Message):
-    state: Dict[str, str] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
-    )
-
-
-@dataclass(eq=False, repr=False)
-class FinalizeCheckpointResponse(betterproto.Message):
-    size: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class PropagateEmbeddedControlMessageResponse(betterproto.Message):
-    returns: Dict[str, "ControlReturn"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class TakeGlobalCheckpointResponse(betterproto.Message):
-    total_size: int = betterproto.int64_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class TypedValue(betterproto.Message):
-    expression: str = betterproto.string_field(1)
-    value_ref: str = betterproto.string_field(2)
-    value_str: str = betterproto.string_field(3)
-    value_type: str = betterproto.string_field(4)
-    expandable: bool = betterproto.bool_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValue(betterproto.Message):
-    value: "TypedValue" = betterproto.message_field(1)
-    attributes: List["TypedValue"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatePythonExpressionResponse(betterproto.Message):
-    values: List["EvaluatedValue"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class StartWorkflowResponse(betterproto.Message):
-    workflow_state: "WorkflowAggregatedState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStateResponse(betterproto.Message):
-    state: "_worker__.WorkerState" = betterproto.enum_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerMetricsResponse(betterproto.Message):
-    metrics: "_worker__.WorkerMetrics" = betterproto.message_field(1)
-
-
-class RpcTesterStub(betterproto.ServiceStub):
-    async def send_ping(
-        self,
-        ping: "Ping",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing",
-            ping,
-            IntResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_pong(
-        self,
-        pong: "Pong",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong",
-            pong,
-            IntResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_nested(
-        self,
-        nested: "Nested",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested",
-            nested,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_pass(
-        self,
-        pass_: "Pass",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass",
-            pass_,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_error_command(
-        self,
-        error_command: "ErrorCommand",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand",
-            error_command,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_recursion(
-        self,
-        recursion: "Recursion",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion",
-            recursion,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_collect(
-        self,
-        collect: "Collect",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect",
-            collect,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_generate_number(
-        self,
-        generate_number: "GenerateNumber",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "IntResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber",
-            generate_number,
-            IntResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_multi_call(
-        self,
-        multi_call: "MultiCall",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall",
-            multi_call,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def send_chain(
-        self,
-        chain: "Chain",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StringResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain",
-            chain,
-            StringResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-
-class WorkerServiceStub(betterproto.ServiceStub):
-    async def add_input_channel(
-        self,
-        add_input_channel_request: "AddInputChannelRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel",
-            add_input_channel_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def add_partitioning(
-        self,
-        add_partitioning_request: "AddPartitioningRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning",
-            add_partitioning_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def assign_port(
-        self,
-        assign_port_request: "AssignPortRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort",
-            assign_port_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def finalize_checkpoint(
-        self,
-        finalize_checkpoint_request: "FinalizeCheckpointRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "FinalizeCheckpointResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint",
-            finalize_checkpoint_request,
-            FinalizeCheckpointResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def flush_network_buffer(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def initialize_executor(
-        self,
-        initialize_executor_request: "InitializeExecutorRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor",
-            initialize_executor_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def open_executor(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def pause_worker(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker",
-            empty_request,
-            WorkerStateResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def prepare_checkpoint(
-        self,
-        prepare_checkpoint_request: "PrepareCheckpointRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint",
-            prepare_checkpoint_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def query_statistics(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerMetricsResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics",
-            empty_request,
-            WorkerMetricsResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def resume_worker(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker",
-            empty_request,
-            WorkerStateResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def retrieve_state(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def retry_current_tuple(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def start_worker(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "WorkerStateResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker",
-            empty_request,
-            WorkerStateResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def end_worker(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def start_channel(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def end_channel(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def debug_command(
-        self,
-        debug_command_request: "DebugCommandRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand",
-            debug_command_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def evaluate_python_expression(
-        self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatedValue":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatedValue,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def no_operation(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def update_executor(
-        self,
-        update_executor_request: "UpdateExecutorRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor",
-            update_executor_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-
-class ControllerServiceStub(betterproto.ServiceStub):
-    async def retrieve_workflow_state(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "RetrieveWorkflowStateResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState",
-            empty_request,
-            RetrieveWorkflowStateResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def propagate_embedded_control_message(
-        self,
-        propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "PropagateEmbeddedControlMessageResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage",
-            propagate_embedded_control_message_request,
-            PropagateEmbeddedControlMessageResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def take_global_checkpoint(
-        self,
-        take_global_checkpoint_request: "TakeGlobalCheckpointRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "TakeGlobalCheckpointResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint",
-            take_global_checkpoint_request,
-            TakeGlobalCheckpointResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def debug_command(
-        self,
-        debug_command_request: "DebugCommandRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand",
-            debug_command_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def evaluate_python_expression(
-        self,
-        evaluate_python_expression_request: "EvaluatePythonExpressionRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EvaluatePythonExpressionResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression",
-            evaluate_python_expression_request,
-            EvaluatePythonExpressionResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def console_message_triggered(
-        self,
-        console_message_triggered_request: "ConsoleMessageTriggeredRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered",
-            console_message_triggered_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def port_completed(
-        self,
-        port_completed_request: "PortCompletedRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted",
-            port_completed_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def start_workflow(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "StartWorkflowResponse":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow",
-            empty_request,
-            StartWorkflowResponse,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def resume_workflow(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def pause_workflow(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def worker_state_updated(
-        self,
-        worker_state_updated_request: "WorkerStateUpdatedRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated",
-            worker_state_updated_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def worker_execution_completed(
-        self,
-        empty_request: "EmptyRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted",
-            empty_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def jump_to_operator_region(
-        self,
-        jump_to_operator_region_request: "JumpToOperatorRegionRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion",
-            jump_to_operator_region_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def link_workers(
-        self,
-        link_workers_request: "LinkWorkersRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers",
-            link_workers_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def controller_initiate_query_statistics(
-        self,
-        query_statistics_request: "QueryStatisticsRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics",
-            query_statistics_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def retry_workflow(
-        self,
-        retry_workflow_request: "RetryWorkflowRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow",
-            retry_workflow_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-    async def reconfigure_workflow(
-        self,
-        workflow_reconfigure_request: "WorkflowReconfigureRequest",
-        *,
-        timeout: Optional[float] = None,
-        deadline: Optional["Deadline"] = None,
-        metadata: Optional["MetadataLike"] = None
-    ) -> "EmptyReturn":
-        return await self._unary_unary(
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow",
-            workflow_reconfigure_request,
-            EmptyReturn,
-            timeout=timeout,
-            deadline=deadline,
-            metadata=metadata,
-        )
-
-
-class RpcTesterBase(ServiceBase):
-
-    async def send_ping(self, ping: "Ping") -> "IntResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_pong(self, pong: "Pong") -> "IntResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_nested(self, nested: "Nested") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_pass(self, pass_: "Pass") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_error_command(
-        self, error_command: "ErrorCommand"
-    ) -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_recursion(self, recursion: "Recursion") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_collect(self, collect: "Collect") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_generate_number(
-        self, generate_number: "GenerateNumber"
-    ) -> "IntResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_multi_call(self, multi_call: "MultiCall") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def send_chain(self, chain: "Chain") -> "StringResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def __rpc_send_ping(
-        self, stream: "grpclib.server.Stream[Ping, IntResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_ping(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_pong(
-        self, stream: "grpclib.server.Stream[Pong, IntResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_pong(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_nested(
-        self, stream: "grpclib.server.Stream[Nested, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_nested(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_pass(
-        self, stream: "grpclib.server.Stream[Pass, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_pass(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_error_command(
-        self, stream: "grpclib.server.Stream[ErrorCommand, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_error_command(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_recursion(
-        self, stream: "grpclib.server.Stream[Recursion, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_recursion(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_collect(
-        self, stream: "grpclib.server.Stream[Collect, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_collect(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_generate_number(
-        self, stream: "grpclib.server.Stream[GenerateNumber, IntResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_generate_number(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_multi_call(
-        self, stream: "grpclib.server.Stream[MultiCall, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_multi_call(request)
-        await stream.send_message(response)
-
-    async def __rpc_send_chain(
-        self, stream: "grpclib.server.Stream[Chain, StringResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.send_chain(request)
-        await stream.send_message(response)
-
-    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
-        return {
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPing": grpclib.const.Handler(
-                self.__rpc_send_ping,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Ping,
-                IntResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPong": grpclib.const.Handler(
-                self.__rpc_send_pong,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Pong,
-                IntResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendNested": grpclib.const.Handler(
-                self.__rpc_send_nested,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Nested,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendPass": grpclib.const.Handler(
-                self.__rpc_send_pass,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Pass,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendErrorCommand": grpclib.const.Handler(
-                self.__rpc_send_error_command,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                ErrorCommand,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendRecursion": grpclib.const.Handler(
-                self.__rpc_send_recursion,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Recursion,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendCollect": grpclib.const.Handler(
-                self.__rpc_send_collect,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Collect,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendGenerateNumber": grpclib.const.Handler(
-                self.__rpc_send_generate_number,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                GenerateNumber,
-                IntResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendMultiCall": grpclib.const.Handler(
-                self.__rpc_send_multi_call,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                MultiCall,
-                StringResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.RPCTester/SendChain": grpclib.const.Handler(
-                self.__rpc_send_chain,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                Chain,
-                StringResponse,
-            ),
-        }
-
-
-class WorkerServiceBase(ServiceBase):
-
-    async def add_input_channel(
-        self, add_input_channel_request: "AddInputChannelRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def add_partitioning(
-        self, add_partitioning_request: "AddPartitioningRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def assign_port(
-        self, assign_port_request: "AssignPortRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def finalize_checkpoint(
-        self, finalize_checkpoint_request: "FinalizeCheckpointRequest"
-    ) -> "FinalizeCheckpointResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def flush_network_buffer(
-        self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def initialize_executor(
-        self, initialize_executor_request: "InitializeExecutorRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def open_executor(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def pause_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def prepare_checkpoint(
-        self, prepare_checkpoint_request: "PrepareCheckpointRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def query_statistics(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerMetricsResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def resume_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def retrieve_state(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def retry_current_tuple(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def start_worker(
-        self, empty_request: "EmptyRequest"
-    ) -> "WorkerStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def end_worker(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def start_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def end_channel(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest"
-    ) -> "EvaluatedValue":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def update_executor(
-        self, update_executor_request: "UpdateExecutorRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def __rpc_add_input_channel(
-        self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_input_channel(request)
-        await stream.send_message(response)
-
-    async def __rpc_add_partitioning(
-        self, stream: "grpclib.server.Stream[AddPartitioningRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.add_partitioning(request)
-        await stream.send_message(response)
-
-    async def __rpc_assign_port(
-        self, stream: "grpclib.server.Stream[AssignPortRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.assign_port(request)
-        await stream.send_message(response)
-
-    async def __rpc_finalize_checkpoint(
-        self,
-        stream: "grpclib.server.Stream[FinalizeCheckpointRequest, FinalizeCheckpointResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.finalize_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_flush_network_buffer(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.flush_network_buffer(request)
-        await stream.send_message(response)
-
-    async def __rpc_initialize_executor(
-        self, stream: "grpclib.server.Stream[InitializeExecutorRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.initialize_executor(request)
-        await stream.send_message(response)
-
-    async def __rpc_open_executor(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.open_executor(request)
-        await stream.send_message(response)
-
-    async def __rpc_pause_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.pause_worker(request)
-        await stream.send_message(response)
-
-    async def __rpc_prepare_checkpoint(
-        self, stream: "grpclib.server.Stream[PrepareCheckpointRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.prepare_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_query_statistics(
-        self, stream: "grpclib.server.Stream[EmptyRequest, WorkerMetricsResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.query_statistics(request)
-        await stream.send_message(response)
-
-    async def __rpc_resume_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.resume_worker(request)
-        await stream.send_message(response)
-
-    async def __rpc_retrieve_state(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.retrieve_state(request)
-        await stream.send_message(response)
-
-    async def __rpc_retry_current_tuple(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.retry_current_tuple(request)
-        await stream.send_message(response)
-
-    async def __rpc_start_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, WorkerStateResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.start_worker(request)
-        await stream.send_message(response)
-
-    async def __rpc_end_worker(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.end_worker(request)
-        await stream.send_message(response)
-
-    async def __rpc_start_channel(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.start_channel(request)
-        await stream.send_message(response)
-
-    async def __rpc_end_channel(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.end_channel(request)
-        await stream.send_message(response)
-
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.debug_command(request)
-        await stream.send_message(response)
-
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatedValue]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
-        await stream.send_message(response)
-
-    async def __rpc_no_operation(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.no_operation(request)
-        await stream.send_message(response)
-
-    async def __rpc_update_executor(
-        self, stream: "grpclib.server.Stream[UpdateExecutorRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.update_executor(request)
-        await stream.send_message(response)
-
-    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
-        return {
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler(
-                self.__rpc_add_input_channel,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AddInputChannelRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddPartitioning": grpclib.const.Handler(
-                self.__rpc_add_partitioning,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AddPartitioningRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AssignPort": grpclib.const.Handler(
-                self.__rpc_assign_port,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                AssignPortRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FinalizeCheckpoint": grpclib.const.Handler(
-                self.__rpc_finalize_checkpoint,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                FinalizeCheckpointRequest,
-                FinalizeCheckpointResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/FlushNetworkBuffer": grpclib.const.Handler(
-                self.__rpc_flush_network_buffer,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/InitializeExecutor": grpclib.const.Handler(
-                self.__rpc_initialize_executor,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                InitializeExecutorRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/OpenExecutor": grpclib.const.Handler(
-                self.__rpc_open_executor,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PauseWorker": grpclib.const.Handler(
-                self.__rpc_pause_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/PrepareCheckpoint": grpclib.const.Handler(
-                self.__rpc_prepare_checkpoint,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                PrepareCheckpointRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/QueryStatistics": grpclib.const.Handler(
-                self.__rpc_query_statistics,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerMetricsResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/ResumeWorker": grpclib.const.Handler(
-                self.__rpc_resume_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetrieveState": grpclib.const.Handler(
-                self.__rpc_retrieve_state,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/RetryCurrentTuple": grpclib.const.Handler(
-                self.__rpc_retry_current_tuple,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartWorker": grpclib.const.Handler(
-                self.__rpc_start_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                WorkerStateResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndWorker": grpclib.const.Handler(
-                self.__rpc_end_worker,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/StartChannel": grpclib.const.Handler(
-                self.__rpc_start_channel,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EndChannel": grpclib.const.Handler(
-                self.__rpc_end_channel,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/DebugCommand": grpclib.const.Handler(
-                self.__rpc_debug_command,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/EvaluatePythonExpression": grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatedValue,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/NoOperation": grpclib.const.Handler(
-                self.__rpc_no_operation,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor": grpclib.const.Handler(
-                self.__rpc_update_executor,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                UpdateExecutorRequest,
-                EmptyReturn,
-            ),
-        }
-
-
-class ControllerServiceBase(ServiceBase):
-
-    async def retrieve_workflow_state(
-        self, empty_request: "EmptyRequest"
-    ) -> "RetrieveWorkflowStateResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def propagate_embedded_control_message(
-        self,
-        propagate_embedded_control_message_request: "PropagateEmbeddedControlMessageRequest",
-    ) -> "PropagateEmbeddedControlMessageResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def take_global_checkpoint(
-        self, take_global_checkpoint_request: "TakeGlobalCheckpointRequest"
-    ) -> "TakeGlobalCheckpointResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def debug_command(
-        self, debug_command_request: "DebugCommandRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def evaluate_python_expression(
-        self, evaluate_python_expression_request: "EvaluatePythonExpressionRequest"
-    ) -> "EvaluatePythonExpressionResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def console_message_triggered(
-        self, console_message_triggered_request: "ConsoleMessageTriggeredRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def port_completed(
-        self, port_completed_request: "PortCompletedRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def start_workflow(
-        self, empty_request: "EmptyRequest"
-    ) -> "StartWorkflowResponse":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def resume_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def pause_workflow(self, empty_request: "EmptyRequest") -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def worker_state_updated(
-        self, worker_state_updated_request: "WorkerStateUpdatedRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def worker_execution_completed(
-        self, empty_request: "EmptyRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def jump_to_operator_region(
-        self, jump_to_operator_region_request: "JumpToOperatorRegionRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def link_workers(
-        self, link_workers_request: "LinkWorkersRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def controller_initiate_query_statistics(
-        self, query_statistics_request: "QueryStatisticsRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def retry_workflow(
-        self, retry_workflow_request: "RetryWorkflowRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def reconfigure_workflow(
-        self, workflow_reconfigure_request: "WorkflowReconfigureRequest"
-    ) -> "EmptyReturn":
-        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
-
-    async def __rpc_retrieve_workflow_state(
-        self,
-        stream: "grpclib.server.Stream[EmptyRequest, RetrieveWorkflowStateResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.retrieve_workflow_state(request)
-        await stream.send_message(response)
-
-    async def __rpc_propagate_embedded_control_message(
-        self,
-        stream: "grpclib.server.Stream[PropagateEmbeddedControlMessageRequest, PropagateEmbeddedControlMessageResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.propagate_embedded_control_message(request)
-        await stream.send_message(response)
-
-    async def __rpc_take_global_checkpoint(
-        self,
-        stream: "grpclib.server.Stream[TakeGlobalCheckpointRequest, TakeGlobalCheckpointResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.take_global_checkpoint(request)
-        await stream.send_message(response)
-
-    async def __rpc_debug_command(
-        self, stream: "grpclib.server.Stream[DebugCommandRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.debug_command(request)
-        await stream.send_message(response)
-
-    async def __rpc_evaluate_python_expression(
-        self,
-        stream: "grpclib.server.Stream[EvaluatePythonExpressionRequest, EvaluatePythonExpressionResponse]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.evaluate_python_expression(request)
-        await stream.send_message(response)
-
-    async def __rpc_console_message_triggered(
-        self,
-        stream: "grpclib.server.Stream[ConsoleMessageTriggeredRequest, EmptyReturn]",
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.console_message_triggered(request)
-        await stream.send_message(response)
-
-    async def __rpc_port_completed(
-        self, stream: "grpclib.server.Stream[PortCompletedRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.port_completed(request)
-        await stream.send_message(response)
-
-    async def __rpc_start_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, StartWorkflowResponse]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.start_workflow(request)
-        await stream.send_message(response)
-
-    async def __rpc_resume_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.resume_workflow(request)
-        await stream.send_message(response)
-
-    async def __rpc_pause_workflow(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.pause_workflow(request)
-        await stream.send_message(response)
-
-    async def __rpc_worker_state_updated(
-        self, stream: "grpclib.server.Stream[WorkerStateUpdatedRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.worker_state_updated(request)
-        await stream.send_message(response)
-
-    async def __rpc_worker_execution_completed(
-        self, stream: "grpclib.server.Stream[EmptyRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.worker_execution_completed(request)
-        await stream.send_message(response)
-
-    async def __rpc_jump_to_operator_region(
-        self, stream: "grpclib.server.Stream[JumpToOperatorRegionRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.jump_to_operator_region(request)
-        await stream.send_message(response)
-
-    async def __rpc_link_workers(
-        self, stream: "grpclib.server.Stream[LinkWorkersRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.link_workers(request)
-        await stream.send_message(response)
-
-    async def __rpc_controller_initiate_query_statistics(
-        self, stream: "grpclib.server.Stream[QueryStatisticsRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.controller_initiate_query_statistics(request)
-        await stream.send_message(response)
-
-    async def __rpc_retry_workflow(
-        self, stream: "grpclib.server.Stream[RetryWorkflowRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.retry_workflow(request)
-        await stream.send_message(response)
-
-    async def __rpc_reconfigure_workflow(
-        self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, EmptyReturn]"
-    ) -> None:
-        request = await stream.recv_message()
-        response = await self.reconfigure_workflow(request)
-        await stream.send_message(response)
-
-    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
-        return {
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler(
-                self.__rpc_retrieve_workflow_state,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                RetrieveWorkflowStateResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PropagateEmbeddedControlMessage": grpclib.const.Handler(
-                self.__rpc_propagate_embedded_control_message,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                PropagateEmbeddedControlMessageRequest,
-                PropagateEmbeddedControlMessageResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/TakeGlobalCheckpoint": grpclib.const.Handler(
-                self.__rpc_take_global_checkpoint,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                TakeGlobalCheckpointRequest,
-                TakeGlobalCheckpointResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/DebugCommand": grpclib.const.Handler(
-                self.__rpc_debug_command,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                DebugCommandRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/EvaluatePythonExpression": grpclib.const.Handler(
-                self.__rpc_evaluate_python_expression,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EvaluatePythonExpressionRequest,
-                EvaluatePythonExpressionResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ConsoleMessageTriggered": grpclib.const.Handler(
-                self.__rpc_console_message_triggered,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                ConsoleMessageTriggeredRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PortCompleted": grpclib.const.Handler(
-                self.__rpc_port_completed,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                PortCompletedRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/StartWorkflow": grpclib.const.Handler(
-                self.__rpc_start_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                StartWorkflowResponse,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ResumeWorkflow": grpclib.const.Handler(
-                self.__rpc_resume_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/PauseWorkflow": grpclib.const.Handler(
-                self.__rpc_pause_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerStateUpdated": grpclib.const.Handler(
-                self.__rpc_worker_state_updated,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                WorkerStateUpdatedRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/WorkerExecutionCompleted": grpclib.const.Handler(
-                self.__rpc_worker_execution_completed,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                EmptyRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/JumpToOperatorRegion": grpclib.const.Handler(
-                self.__rpc_jump_to_operator_region,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                JumpToOperatorRegionRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/LinkWorkers": grpclib.const.Handler(
-                self.__rpc_link_workers,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                LinkWorkersRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ControllerInitiateQueryStatistics": grpclib.const.Handler(
-                self.__rpc_controller_initiate_query_statistics,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                QueryStatisticsRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetryWorkflow": grpclib.const.Handler(
-                self.__rpc_retry_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                RetryWorkflowRequest,
-                EmptyReturn,
-            ),
-            "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow": grpclib.const.Handler(
-                self.__rpc_reconfigure_workflow,
-                grpclib.const.Cardinality.UNARY_UNARY,
-                WorkflowReconfigureRequest,
-                EmptyReturn,
-            ),
-        }
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py
deleted file mode 100644
index bc241806b5..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/sendsemantics/__init__.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
-    List,
-)
-
-import betterproto
-
-from .... import core as ___core__
-
-
-@dataclass(eq=False, repr=False)
-class Partitioning(betterproto.Message):
-    one_to_one_partitioning: "OneToOnePartitioning" = betterproto.message_field(
-        1, group="sealed_value"
-    )
-    round_robin_partitioning: "RoundRobinPartitioning" = betterproto.message_field(
-        2, group="sealed_value"
-    )
-    hash_based_shuffle_partitioning: "HashBasedShufflePartitioning" = (
-        betterproto.message_field(3, group="sealed_value")
-    )
-    range_based_shuffle_partitioning: "RangeBasedShufflePartitioning" = (
-        betterproto.message_field(4, group="sealed_value")
-    )
-    broadcast_partitioning: "BroadcastPartitioning" = betterproto.message_field(
-        5, group="sealed_value"
-    )
-
-
-@dataclass(eq=False, repr=False)
-class OneToOnePartitioning(betterproto.Message):
-    batch_size: int = betterproto.int32_field(1)
-    channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class RoundRobinPartitioning(betterproto.Message):
-    batch_size: int = betterproto.int32_field(1)
-    channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class HashBasedShufflePartitioning(betterproto.Message):
-    batch_size: int = betterproto.int32_field(1)
-    channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-    hash_attribute_names: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class RangeBasedShufflePartitioning(betterproto.Message):
-    batch_size: int = betterproto.int32_field(1)
-    channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
-    range_attribute_names: List[str] = betterproto.string_field(3)
-    range_min: int = betterproto.int64_field(4)
-    range_max: int = betterproto.int64_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class BroadcastPartitioning(betterproto.Message):
-    batch_size: int = betterproto.int32_field(1)
-    channels: List["___core__.ChannelIdentity"] = betterproto.message_field(2)
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py
deleted file mode 100644
index 6a7b210e18..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/worker/__init__.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/architecture/worker/statistics.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
-    List,
-)
-
-import betterproto
-
-from .... import core as ___core__
-
-
-class WorkerState(betterproto.Enum):
-    UNINITIALIZED = 0
-    READY = 1
-    RUNNING = 2
-    PAUSED = 3
-    COMPLETED = 4
-    TERMINATED = 5
-
-
-@dataclass(eq=False, repr=False)
-class PortTupleMetricsMapping(betterproto.Message):
-    port_id: "___core__.PortIdentity" = betterproto.message_field(1)
-    tuple_metrics: "TupleMetrics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class TupleMetrics(betterproto.Message):
-    count: int = betterproto.int64_field(1)
-    size: int = betterproto.int64_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerStatistics(betterproto.Message):
-    input_tuple_metrics: List["PortTupleMetricsMapping"] = betterproto.message_field(1)
-    output_tuple_metrics: List["PortTupleMetricsMapping"] = betterproto.message_field(2)
-    data_processing_time: int = betterproto.int64_field(3)
-    control_processing_time: int = betterproto.int64_field(4)
-    idle_time: int = betterproto.int64_field(5)
-
-
-@dataclass(eq=False, repr=False)
-class WorkerMetrics(betterproto.Message):
-    worker_state: "WorkerState" = betterproto.enum_field(1)
-    worker_statistics: "WorkerStatistics" = betterproto.message_field(2)
diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
deleted file mode 100644
index 55c789aa39..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/amber/engine/common/__init__.py
+++ /dev/null
@@ -1,156 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/amber/engine/common/actormessage.proto, org/apache/texera/amber/engine/common/ambermessage.proto, org/apache/texera/amber/engine/common/executionruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
-    Dict,
-    List,
-)
-
-import betterproto
-
-from ... import core as __core__
-from ..architecture import (
-    rpc as _architecture_rpc__,
-    worker as _architecture_worker__,
-)
-
-
-@dataclass(eq=False, repr=False)
-class DirectControlMessagePayloadV2(betterproto.Message):
-    control_invocation: "_architecture_rpc__.ControlInvocation" = (
-        betterproto.message_field(1, group="value")
-    )
-    return_invocation: "_architecture_rpc__.ReturnInvocation" = (
-        betterproto.message_field(2, group="value")
-    )
-
-
-@dataclass(eq=False, repr=False)
-class PythonDataHeader(betterproto.Message):
-    tag: "__core__.ChannelIdentity" = betterproto.message_field(1)
-    payload_type: str = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class PythonControlMessage(betterproto.Message):
-    tag: "__core__.ChannelIdentity" = betterproto.message_field(1)
-    payload: "DirectControlMessagePayloadV2" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFault(betterproto.Message):
-    worker_name: str = betterproto.string_field(1)
-    faulted_tuple: "BreakpointFaultBreakpointTuple" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFaultBreakpointTuple(betterproto.Message):
-    id: int = betterproto.int64_field(1)
-    is_input: bool = betterproto.bool_field(2)
-    tuple: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorBreakpoints(betterproto.Message):
-    unresolved_breakpoints: List["BreakpointFault"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionBreakpointStore(betterproto.Message):
-    operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValueList(betterproto.Message):
-    values: List["_architecture_rpc__.EvaluatedValue"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorConsole(betterproto.Message):
-    console_messages: List["_architecture_rpc__.ConsoleMessage"] = (
-        betterproto.message_field(1)
-    )
-    evaluate_expr_results: Dict[str, "EvaluatedValueList"] = betterproto.map_field(
-        2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionConsoleStore(betterproto.Message):
-    operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorWorkerMapping(betterproto.Message):
-    operator_id: str = betterproto.string_field(1)
-    worker_ids: List[str] = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorStatistics(betterproto.Message):
-    input_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = (
-        betterproto.message_field(1)
-    )
-    output_metrics: List["_architecture_worker__.PortTupleMetricsMapping"] = (
-        betterproto.message_field(2)
-    )
-    num_workers: int = betterproto.int32_field(3)
-    data_processing_time: int = betterproto.int64_field(4)
-    control_processing_time: int = betterproto.int64_field(5)
-    idle_time: int = betterproto.int64_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorMetrics(betterproto.Message):
-    operator_state: "_architecture_rpc__.WorkflowAggregatedState" = (
-        betterproto.enum_field(1)
-    )
-    operator_statistics: "OperatorStatistics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionStatsStore(betterproto.Message):
-    start_time_stamp: int = betterproto.int64_field(1)
-    end_time_stamp: int = betterproto.int64_field(2)
-    operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field(
-        3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-    operator_worker_mapping: List["OperatorWorkerMapping"] = betterproto.message_field(
-        4
-    )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionMetadataStore(betterproto.Message):
-    state: "_architecture_rpc__.WorkflowAggregatedState" = betterproto.enum_field(1)
-    fatal_errors: List["__core__.WorkflowFatalError"] = betterproto.message_field(2)
-    execution_id: "__core__.ExecutionIdentity" = betterproto.message_field(3)
-    is_recovering: bool = betterproto.bool_field(4)
-
-
-@dataclass(eq=False, repr=False)
-class Backpressure(betterproto.Message):
-    enable_backpressure: bool = betterproto.bool_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class CreditUpdate(betterproto.Message):
-    pass
-
-
-@dataclass(eq=False, repr=False)
-class ActorCommand(betterproto.Message):
-    backpressure: "Backpressure" = betterproto.message_field(1, group="sealed_value")
-    credit_update: "CreditUpdate" = betterproto.message_field(2, group="sealed_value")
-
-
-@dataclass(eq=False, repr=False)
-class PythonActorMessage(betterproto.Message):
-    payload: "ActorCommand" = betterproto.message_field(1)
diff --git a/amber/src/main/python/proto/org/apache/texera/web/__init__.py b/amber/src/main/python/proto/org/apache/texera/web/__init__.py
deleted file mode 100644
index adb5848bb0..0000000000
--- a/amber/src/main/python/proto/org/apache/texera/web/__init__.py
+++ /dev/null
@@ -1,158 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: org/apache/texera/workflowruntimestate.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from datetime import datetime
-from typing import (
-    Dict,
-    List,
-)
-
-import betterproto
-
-from ...amber.engine import common as __amber_engine_common__
-from ...amber.engine.architecture import worker as __amber_engine_architecture_worker__
-
-
-class FatalErrorType(betterproto.Enum):
-    COMPILATION_ERROR = 0
-    EXECUTION_FAILURE = 1
-
-
-class WorkflowAggregatedState(betterproto.Enum):
-    UNINITIALIZED = 0
-    READY = 1
-    RUNNING = 2
-    PAUSING = 3
-    PAUSED = 4
-    RESUMING = 5
-    COMPLETED = 6
-    FAILED = 7
-    UNKNOWN = 8
-    KILLED = 9
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFault(betterproto.Message):
-    worker_name: str = betterproto.string_field(1)
-    faulted_tuple: "BreakpointFaultBreakpointTuple" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class BreakpointFaultBreakpointTuple(betterproto.Message):
-    id: int = betterproto.int64_field(1)
-    is_input: bool = betterproto.bool_field(2)
-    tuple: List[str] = betterproto.string_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorBreakpoints(betterproto.Message):
-    unresolved_breakpoints: List["BreakpointFault"] = betterproto.message_field(1)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionBreakpointStore(betterproto.Message):
-    operator_info: Dict[str, "OperatorBreakpoints"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class EvaluatedValueList(betterproto.Message):
-    values: List["__amber_engine_architecture_worker__.EvaluatedValue"] = (
-        betterproto.message_field(1)
-    )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorConsole(betterproto.Message):
-    console_messages: List["__amber_engine_architecture_worker__.ConsoleMessage"] = (
-        betterproto.message_field(1)
-    )
-    evaluate_expr_results: Dict[str, "EvaluatedValueList"] = betterproto.map_field(
-        2, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionConsoleStore(betterproto.Message):
-    operator_console: Dict[str, "OperatorConsole"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-
-
-@dataclass(eq=False, repr=False)
-class OperatorWorkerMapping(betterproto.Message):
-    operator_id: str = betterproto.string_field(1)
-    worker_ids: List[str] = betterproto.string_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorStatistics(betterproto.Message):
-    input_count: List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = (
-        betterproto.message_field(1)
-    )
-    output_count: List["__amber_engine_architecture_worker__.PortTupleCountMapping"] = (
-        betterproto.message_field(2)
-    )
-    num_workers: int = betterproto.int32_field(3)
-    data_processing_time: int = betterproto.int64_field(4)
-    control_processing_time: int = betterproto.int64_field(5)
-    idle_time: int = betterproto.int64_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class OperatorMetrics(betterproto.Message):
-    operator_state: "WorkflowAggregatedState" = betterproto.enum_field(1)
-    operator_statistics: "OperatorStatistics" = betterproto.message_field(2)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionStatsStore(betterproto.Message):
-    start_time_stamp: int = betterproto.int64_field(1)
-    end_time_stamp: int = betterproto.int64_field(2)
-    operator_info: Dict[str, "OperatorMetrics"] = betterproto.map_field(
-        3, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
-    operator_worker_mapping: List["OperatorWorkerMapping"] = betterproto.message_field(
-        4
-    )
-
-
-@dataclass(eq=False, repr=False)
-class WorkflowFatalError(betterproto.Message):
-    type: "FatalErrorType" = betterproto.enum_field(1)
-    timestamp: datetime = betterproto.message_field(2)
-    message: str = betterproto.string_field(3)
-    details: str = betterproto.string_field(4)
-    operator_id: str = betterproto.string_field(5)
-    worker_id: str = betterproto.string_field(6)
-
-
-@dataclass(eq=False, repr=False)
-class ExecutionMetadataStore(betterproto.Message):
-    state: "WorkflowAggregatedState" = betterproto.enum_field(1)
-    fatal_errors: List["WorkflowFatalError"] = betterproto.message_field(2)
-    execution_id: "__amber_engine_common__.ExecutionIdentity" = (
-        betterproto.message_field(3)
-    )
-    is_recovering: bool = betterproto.bool_field(4)
diff --git a/amber/src/main/python/proto/scalapb/__init__.py b/amber/src/main/python/proto/scalapb/__init__.py
deleted file mode 100644
index 49c713815a..0000000000
--- a/amber/src/main/python/proto/scalapb/__init__.py
+++ /dev/null
@@ -1,421 +0,0 @@
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# sources: scalapb/scalapb.proto
-# plugin: python-betterproto
-# This file has been @generated
-
-from dataclasses import dataclass
-from typing import (
-    Dict,
-    List,
-)
-
-import betterproto
-import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf
-
-
-class MatchType(betterproto.Enum):
-    CONTAINS = 0
-    EXACT = 1
-    PRESENCE = 2
-
-
-class ScalaPbOptionsOptionsScope(betterproto.Enum):
-    """
-    Whether to apply the options only to this file, or for the entire package (and its subpackages)
-    """
-
-    FILE = 0
-    """Apply the options for this file only (default)"""
-
-    PACKAGE = 1
-    """Apply the options for the entire package and its subpackages."""
-
-
-class ScalaPbOptionsEnumValueNaming(betterproto.Enum):
-    """Naming convention for generated enum values"""
-
-    AS_IN_PROTO = 0
-    CAMEL_CASE = 1
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptions(betterproto.Message):
-    package_name: str = betterproto.string_field(1)
-    """If set then it overrides the java_package and package."""
-
-    flat_package: bool = betterproto.bool_field(2)
-    """
-    If true, the compiler does not append the proto base file name
-     into the generated package name. If false (the default), the
-     generated scala package name is the package_name.basename where
-     basename is the proto file name without the .proto extension.
-    """
-
-    import_: List[str] = betterproto.string_field(3)
-    """
-    Adds the following imports at the top of the file (this is meant
-     to provide implicit TypeMappers)
-    """
-
-    preamble: List[str] = betterproto.string_field(4)
-    """
-    Text to add to the generated scala file.  This can be used only
-     when single_file is true.
-    """
-
-    single_file: bool = betterproto.bool_field(5)
-    """
-    If true, all messages and enums (but not services) will be written
-     to a single Scala file.
-    """
-
-    no_primitive_wrappers: bool = betterproto.bool_field(7)
-    """
-    By default, wrappers defined at
-     https://github.com/google/protobuf/blob/master/src/google/protobuf/wrappers.proto,
-     are mapped to an Option[T] where T is a primitive type. When this field
-     is set to true, we do not perform this transformation.
-    """
-
-    primitive_wrappers: bool = betterproto.bool_field(6)
-    """
-    DEPRECATED. In ScalaPB <= 0.5.47, it was necessary to explicitly enable
-     primitive_wrappers. This field remains here for backwards compatibility,
-     but it has no effect on generated code. It is an error to set both
-     `primitive_wrappers` and `no_primitive_wrappers`.
-    """
-
-    collection_type: str = betterproto.string_field(8)
-    """
-    Scala type to be used for repeated fields. If unspecified,
-     `scala.collection.Seq` will be used.
-    """
-
-    preserve_unknown_fields: bool = betterproto.bool_field(9)
-    """
-    If set to true, all generated messages in this file will preserve unknown
-     fields.
-    """
-
-    object_name: str = betterproto.string_field(10)
-    """
-    If defined, sets the name of the file-level object that would be generated. This
-     object extends `GeneratedFileObject` and contains descriptors, and list of message
-     and enum companions.
-    """
-
-    scope: "ScalaPbOptionsOptionsScope" = betterproto.enum_field(11)
-    """Experimental: scope to apply the given options."""
-
-    lenses: bool = betterproto.bool_field(12)
-    """If true, lenses will be generated."""
-
-    retain_source_code_info: bool = betterproto.bool_field(13)
-    """
-    If true, then source-code info information will be included in the
-     generated code - normally the source code info is cleared out to reduce
-     code size.  The source code info is useful for extracting source code
-     location from the descriptors as well as comments.
-    """
-
-    map_type: str = betterproto.string_field(14)
-    """
-    Scala type to be used for maps. If unspecified,
-     `scala.collection.immutable.Map` will be used.
-    """
-
-    no_default_values_in_constructor: bool = betterproto.bool_field(15)
-    """
-    If true, no default values will be generated in message constructors.
-    """
-
-    enum_value_naming: "ScalaPbOptionsEnumValueNaming" = betterproto.enum_field(16)
-    enum_strip_prefix: bool = betterproto.bool_field(17)
-    """
-    Indicate if prefix (enum name + optional underscore) should be removed in scala code
-     Strip is applied before enum value naming changes.
-    """
-
-    bytes_type: str = betterproto.string_field(21)
-    """Scala type to use for bytes fields."""
-
-    java_conversions: bool = betterproto.bool_field(23)
-    """Enable java conversions for this file."""
-
-    aux_message_options: List["ScalaPbOptionsAuxMessageOptions"] = (
-        betterproto.message_field(18)
-    )
-    """List of message options to apply to some messages."""
-
-    aux_field_options: List["ScalaPbOptionsAuxFieldOptions"] = (
-        betterproto.message_field(19)
-    )
-    """List of message options to apply to some fields."""
-
-    aux_enum_options: List["ScalaPbOptionsAuxEnumOptions"] = betterproto.message_field(
-        20
-    )
-    """List of message options to apply to some enums."""
-
-    aux_enum_value_options: List["ScalaPbOptionsAuxEnumValueOptions"] = (
-        betterproto.message_field(22)
-    )
-    """List of enum value options to apply to some enum values."""
-
-    preprocessors: List[str] = betterproto.string_field(24)
-    """List of preprocessors to apply."""
-
-    field_transformations: List["FieldTransformation"] = betterproto.message_field(25)
-    ignore_all_transformations: bool = betterproto.bool_field(26)
-    """
-    Ignores all transformations for this file. This is meant to allow specific files to
-     opt out from transformations inherited through package-scoped options.
-    """
-
-    getters: bool = betterproto.bool_field(27)
-    """If true, getters will be generated."""
-
-    test_only_no_java_conversions: bool = betterproto.bool_field(999)
-    """
-    For use in tests only. Inhibit Java conversions even when when generator parameters
-     request for it.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxMessageOptions(betterproto.Message):
-    """
-    AuxMessageOptions enables you to set message-level options through package-scoped options.
-     This is useful when you can't add a dependency on scalapb.proto from the proto file that
-     defines the message.
-    """
-
-    target: str = betterproto.string_field(1)
-    """The fully-qualified name of the message in the proto name space."""
-
-    options: "MessageOptions" = betterproto.message_field(2)
-    """
-    Options to apply to the message. If there are any options defined on the target message
-     they take precedence over the options.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxFieldOptions(betterproto.Message):
-    """
-    AuxFieldOptions enables you to set field-level options through package-scoped options.
-     This is useful when you can't add a dependency on scalapb.proto from the proto file that
-     defines the field.
-    """
-
-    target: str = betterproto.string_field(1)
-    """The fully-qualified name of the field in the proto name space."""
-
-    options: "FieldOptions" = betterproto.message_field(2)
-    """
-    Options to apply to the field. If there are any options defined on the target message
-     they take precedence over the options.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxEnumOptions(betterproto.Message):
-    """
-    AuxEnumOptions enables you to set enum-level options through package-scoped options.
-     This is useful when you can't add a dependency on scalapb.proto from the proto file that
-     defines the enum.
-    """
-
-    target: str = betterproto.string_field(1)
-    """The fully-qualified name of the enum in the proto name space."""
-
-    options: "EnumOptions" = betterproto.message_field(2)
-    """
-    Options to apply to the enum. If there are any options defined on the target enum
-     they take precedence over the options.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class ScalaPbOptionsAuxEnumValueOptions(betterproto.Message):
-    """
-    AuxEnumValueOptions enables you to set enum value level options through package-scoped
-     options.  This is useful when you can't add a dependency on scalapb.proto from the proto
-     file that defines the enum.
-    """
-
-    target: str = betterproto.string_field(1)
-    """The fully-qualified name of the enum value in the proto name space."""
-
-    options: "EnumValueOptions" = betterproto.message_field(2)
-    """
-    Options to apply to the enum value. If there are any options defined on
-     the target enum value they take precedence over the options.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class MessageOptions(betterproto.Message):
-    extends: List[str] = betterproto.string_field(1)
-    """Additional classes and traits to mix in to the case class."""
-
-    companion_extends: List[str] = betterproto.string_field(2)
-    """Additional classes and traits to mix in to the companion object."""
-
-    annotations: List[str] = betterproto.string_field(3)
-    """Custom annotations to add to the generated case class."""
-
-    type: str = betterproto.string_field(4)
-    """
-    All instances of this message will be converted to this type. An implicit TypeMapper
-     must be present.
-    """
-
-    companion_annotations: List[str] = betterproto.string_field(5)
-    """
-    Custom annotations to add to the companion object of the generated class.
-    """
-
-    sealed_oneof_extends: List[str] = betterproto.string_field(6)
-    """
-    Additional classes and traits to mix in to generated sealed_oneof base trait.
-    """
-
-    no_box: bool = betterproto.bool_field(7)
-    """
-    If true, when this message is used as an optional field, do not wrap it in an `Option`.
-     This is equivalent of setting `(field).no_box` to true on each field with the message type.
-    """
-
-    unknown_fields_annotations: List[str] = betterproto.string_field(8)
-    """
-    Custom annotations to add to the generated `unknownFields` case class field.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class Collection(betterproto.Message):
-    """
-    Represents a custom Collection type in Scala. This allows ScalaPB to integrate with
-     collection types that are different enough from the ones in the standard library.
-    """
-
-    type: str = betterproto.string_field(1)
-    """Type of the collection"""
-
-    non_empty: bool = betterproto.bool_field(2)
-    """
-    Set to true if this collection type is not allowed to be empty, for example
-     cats.data.NonEmptyList.  When true, ScalaPB will not generate `clearX` for the repeated
-     field and not provide a default argument in the constructor.
-    """
-
-    adapter: str = betterproto.string_field(3)
-    """
-    An Adapter is a Scala object available at runtime that provides certain static methods
-     that can operate on this collection type.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class FieldOptions(betterproto.Message):
-    type: str = betterproto.string_field(1)
-    scala_name: str = betterproto.string_field(2)
-    collection_type: str = betterproto.string_field(3)
-    """
-    Can be specified only if this field is repeated. If unspecified,
-     it falls back to the file option named `collection_type`, which defaults
-     to `scala.collection.Seq`.
-    """
-
-    collection: "Collection" = betterproto.message_field(8)
-    key_type: str = betterproto.string_field(4)
-    """
-    If the field is a map, you can specify custom Scala types for the key
-     or value.
-    """
-
-    value_type: str = betterproto.string_field(5)
-    annotations: List[str] = betterproto.string_field(6)
-    """Custom annotations to add to the field."""
-
-    map_type: str = betterproto.string_field(7)
-    """
-    Can be specified only if this field is a map. If unspecified,
-     it falls back to the file option named `map_type` which defaults to
-     `scala.collection.immutable.Map`
-    """
-
-    no_box: bool = betterproto.bool_field(30)
-    """
-    Do not box this value in Option[T]. If set, this overrides MessageOptions.no_box
-    """
-
-    required: bool = betterproto.bool_field(31)
-    """
-    Like no_box it does not box a value in Option[T], but also fails parsing when a value
-     is not provided. This enables to emulate required fields in proto3.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class EnumOptions(betterproto.Message):
-    extends: List[str] = betterproto.string_field(1)
-    """Additional classes and traits to mix in to the base trait"""
-
-    companion_extends: List[str] = betterproto.string_field(2)
-    """Additional classes and traits to mix in to the companion object."""
-
-    type: str = betterproto.string_field(3)
-    """
-    All instances of this enum will be converted to this type. An implicit TypeMapper
-     must be present.
-    """
-
-    base_annotations: List[str] = betterproto.string_field(4)
-    """Custom annotations to add to the generated enum's base class."""
-
-    recognized_annotations: List[str] = betterproto.string_field(5)
-    """Custom annotations to add to the generated trait."""
-
-    unrecognized_annotations: List[str] = betterproto.string_field(6)
-    """Custom annotations to add to the generated Unrecognized case class."""
-
-
-@dataclass(eq=False, repr=False)
-class EnumValueOptions(betterproto.Message):
-    extends: List[str] = betterproto.string_field(1)
-    """Additional classes and traits to mix in to an individual enum value."""
-
-    scala_name: str = betterproto.string_field(2)
-    """Name in Scala to use for this enum value."""
-
-    annotations: List[str] = betterproto.string_field(3)
-    """
-    Custom annotations to add to the generated case object for this enum value.
-    """
-
-
-@dataclass(eq=False, repr=False)
-class OneofOptions(betterproto.Message):
-    extends: List[str] = betterproto.string_field(1)
-    """Additional traits to mix in to a oneof."""
-
-    scala_name: str = betterproto.string_field(2)
-    """Name in Scala to use for this oneof field."""
-
-
-@dataclass(eq=False, repr=False)
-class FieldTransformation(betterproto.Message):
-    when: "betterproto_lib_google_protobuf.FieldDescriptorProto" = (
-        betterproto.message_field(1)
-    )
-    match_type: "MatchType" = betterproto.enum_field(2)
-    set: "betterproto_lib_google_protobuf.FieldOptions" = betterproto.message_field(3)
-
-
-@dataclass(eq=False, repr=False)
-class PreprocessorOutput(betterproto.Message):
-    options_by_file: Dict[str, "ScalaPbOptions"] = betterproto.map_field(
-        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
-    )
diff --git a/bin/computing-unit-master.dockerfile b/bin/computing-unit-master.dockerfile
index 191d23f2dd..0d9d60b79f 100644
--- a/bin/computing-unit-master.dockerfile
+++ b/bin/computing-unit-master.dockerfile
@@ -36,15 +36,32 @@ COPY project/ project/
 COPY build.sbt build.sbt
 COPY .jvmopts .jvmopts
 
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
 RUN apt-get update && apt-get install -y \
     netcat \
     unzip \
+    curl \
     libpq-dev \
     python3-minimal \
+    python3-pip \
     && apt-get clean
 
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before `sbt dist`.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+    && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \
+    && unzip -o /tmp/protoc.zip -d /usr/local \
+    && chmod +x /usr/local/bin/protoc \
+    && rm /tmp/protoc.zip \
+    && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \
+    && bash bin/python-proto-gen.sh
+
 # Add .git for runtime calls to jgit from OPversion
 COPY .git .git
 COPY LICENSE NOTICE DISCLAIMER ./
diff --git a/bin/computing-unit-worker.dockerfile b/bin/computing-unit-worker.dockerfile
index 28d3b4cf0c..fc80998888 100644
--- a/bin/computing-unit-worker.dockerfile
+++ b/bin/computing-unit-worker.dockerfile
@@ -36,15 +36,32 @@ COPY project/ project/
 COPY build.sbt build.sbt
 COPY .jvmopts .jvmopts
 
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
 RUN apt-get update && apt-get install -y \
     netcat \
     unzip \
+    curl \
     libpq-dev \
     python3-minimal \
+    python3-pip \
     && apt-get clean
 
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before `sbt dist`.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+    && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \
+    && unzip -o /tmp/protoc.zip -d /usr/local \
+    && chmod +x /usr/local/bin/protoc \
+    && rm /tmp/protoc.zip \
+    && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \
+    && bash bin/python-proto-gen.sh
+
 # Add .git for runtime calls to jgit from OPversion
 COPY .git .git
 COPY LICENSE NOTICE DISCLAIMER ./
diff --git a/bin/protoc-version.txt b/bin/protoc-version.txt
new file mode 100644
index 0000000000..de24deecf3
--- /dev/null
+++ b/bin/protoc-version.txt
@@ -0,0 +1 @@
+3.19.4
diff --git a/bin/python-proto-gen.sh b/bin/python-proto-gen.sh
index 0faf33eb9b..db51bb7626 100755
--- a/bin/python-proto-gen.sh
+++ b/bin/python-proto-gen.sh
@@ -15,10 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# assuming inside the pytexera executing Python ENV
+set -euo pipefail
 
-# dirs
-TEXERA_HOME="$(git rev-parse --show-toplevel)"
+# Resolve repo root from this script's location (avoids git/CWD assumptions
+# so the script works inside Docker build stages before .git is copied).
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+TEXERA_HOME="$(cd "$SCRIPT_DIR/.." && pwd)"
 AMBER_DIR="$TEXERA_HOME/amber"
 PYAMBER_DIR="$AMBER_DIR/src/main/python"
 PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf"
@@ -26,8 +28,12 @@ PROTOBUF_AMBER_DIR="$AMBER_DIR/src/main/protobuf"
 CORE_DIR="$TEXERA_HOME/common/workflow-core"
 PROTOBUF_CORE_DIR="$CORE_DIR/src/main/protobuf"
 
+PROTOC_INCLUDE_DIR="$(dirname "$(dirname "$(command -v protoc)")")/include"
+
 # proto-gen
+mkdir -p "$PYAMBER_DIR/proto"
 protoc --python_betterproto_out="$PYAMBER_DIR/proto" \
+ -I="$PROTOC_INCLUDE_DIR" \
  -I="$PROTOBUF_AMBER_DIR" \
  -I="$PROTOBUF_CORE_DIR" \
  $(find "$PROTOBUF_AMBER_DIR" -iname "*.proto") \
diff --git a/bin/texera-web-application.dockerfile b/bin/texera-web-application.dockerfile
index a829fb16aa..efaee5699a 100644
--- a/bin/texera-web-application.dockerfile
+++ b/bin/texera-web-application.dockerfile
@@ -50,15 +50,33 @@ COPY project/ project/
 COPY build.sbt build.sbt
 COPY .jvmopts .jvmopts
 
-# Update system and install dependencies. python3-minimal is needed by
-# bin/licensing/concat_license_binary.py below.
+# python3-minimal is needed by bin/licensing/concat_license_binary.py;
+# python3-pip installs the betterproto plugin; unzip + curl fetch protoc.
 RUN apt-get update && apt-get install -y \
     netcat \
     unzip \
+    curl \
     libpq-dev \
     python3-minimal \
+    python3-pip \
     && apt-get clean
 
+# Install protoc (version pinned in bin/protoc-version.txt) and the
+# betterproto plugin (version pinned via amber/requirements.txt as a
+# pip constraint, so the runtime base `betterproto` and the build-time
+# `betterproto[compiler]` stay in lockstep), then regenerate
+# amber/src/main/python/proto/ before the WorkflowExecutionService dist
+# is packaged.
+COPY bin/protoc-version.txt bin/protoc-version.txt
+COPY bin/python-proto-gen.sh bin/python-proto-gen.sh
+RUN PROTOC_VERSION=$(cat bin/protoc-version.txt) \
+    && curl -fsSL -o /tmp/protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip" \
+    && unzip -o /tmp/protoc.zip -d /usr/local \
+    && chmod +x /usr/local/bin/protoc \
+    && rm /tmp/protoc.zip \
+    && pip3 install --no-cache-dir -c amber/requirements.txt 'betterproto[compiler]' \
+    && bash bin/python-proto-gen.sh
+
 # Add .git for runtime calls to jgit from OPversion
 COPY .git .git
 COPY LICENSE NOTICE DISCLAIMER ./