Xiao-zhen-Liu commented on code in PR #4441:
URL: https://github.com/apache/texera/pull/4441#discussion_r3140548480


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -167,6 +167,8 @@ class RegionExecutionCoordinator(
                 val actorRef = actorRefService.getActorRef(workerId)
                 // Remove the actorRef so that no other actors can find the worker and send messages.
                 actorRefService.removeActorRef(workerId)
+                asyncRPCClient.inputGateway.removeControlChannel(workerId)

Review Comment:
   Can you remind me why we need to remove the control channel here, and also 
explain the reason in a code comment? Was it related to an ID?



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala:
##########
@@ -94,4 +94,8 @@ class NetworkOutputGateway(
     idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
   }
 
+  def removeControlChannel(to: ActorVirtualIdentity): Unit = {

Review Comment:
   Why is this not `synchronized` like in `NetworkInputGateway`?
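
To make the concern concrete, here is a minimal sketch of a removal that takes the same lock as the sequence-number lookup. It assumes the gateway can be reached from more than one thread and that `idToSequenceNums` is a plain mutable map; `ActorId`, `ChannelId`, `OutputGatewaySketch`, `nextSequenceNumber`, and `trackedChannels` are hypothetical stand-ins, not the actual Texera types or methods.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-ins for the real identity types; not the Texera definitions.
final case class ActorId(name: String)
final case class ChannelId(from: ActorId, to: ActorId, isControl: Boolean)

class OutputGatewaySketch(self: ActorId) {
  // A plain HashMap is not thread-safe, so every access goes through `synchronized`.
  private val idToSequenceNums = mutable.HashMap[ChannelId, AtomicLong]()

  // Looks up (or creates) the counter for a channel and returns the next sequence number.
  def nextSequenceNumber(channelId: ChannelId): Long = synchronized {
    idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
  }

  // Drops the control channel to `to` under the same lock, so a removal
  // cannot interleave with a concurrent getOrElseUpdate on the map.
  def removeControlChannel(to: ActorId): Unit = synchronized {
    idToSequenceNums.remove(ChannelId(self, to, isControl = true))
  }

  // Number of channels that currently have a sequence counter.
  def trackedChannels: Int = synchronized { idToSequenceNums.size }
}
```

If the output gateway is only ever touched from a single actor thread, the lock is unnecessary, but then a short comment saying so would answer the question.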



##########
amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala:
##########
@@ -247,6 +247,8 @@ object WorkflowExecutionsResource {
         OPERATOR_PORT_EXECUTIONS.RESULT_URI
       )
       .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
+      .onConflict()

Review Comment:
   This seems a bit hacky. It would be cleaner to skip this insertion in 
`RegionExecutionCoordinator` for restarted regions.
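
A rough sketch of the alternative, assuming the coordinator can tell whether a region is being restarted and that the result URI stays the same across restarts. `PortResultStoreSketch`, `registerResultUri`, and the `isRestart` flag are hypothetical names for illustration only; the in-memory map stands in for the `OPERATOR_PORT_EXECUTIONS` table.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the table, keyed by (eid, globalPortId).
class PortResultStoreSketch {
  private val rows = mutable.Map[(Int, String), String]()

  // Mimics a plain INSERT: a duplicate key is an error rather than a silent no-op.
  def insert(eid: Int, globalPortId: String, uri: String): Unit = {
    require(!rows.contains((eid, globalPortId)), s"duplicate key ($eid, $globalPortId)")
    rows((eid, globalPortId)) = uri
  }

  def size: Int = rows.size
}

object ResultUriRegistrationSketch {
  // The caller decides: a restarted region already has its row, so it skips the insert.
  def registerResultUri(
      store: PortResultStoreSketch,
      eid: Int,
      globalPortId: String,
      uri: String,
      isRestart: Boolean
  ): Unit =
    if (!isRestart) store.insert(eid, globalPortId, uri)
}
```

With the decision made at the call site, the insert keeps failing loudly on a genuine duplicate, which `.onConflict()` would otherwise hide.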



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala:
##########
@@ -36,19 +36,13 @@ case class WorkflowExecution() {
 
   /**
     * Initializes or retrieves a `RegionExecution` for a given `Region`. If not already
-    * initialized, it creates and returns a new `RegionExecution`; otherwise, an assertion
-    * error is thrown if re-initialization is attempted.
+    * initialized, it creates and returns a new `RegionExecution`。
     *
     * @param region The `Region` for which to initialize or retrieve the `RegionExecution`.
     * @return The `RegionExecution` associated with the given `Region`.
-    * @throws AssertionError if the `RegionExecution` has already been initialized.
     */
     */
   def initRegionExecution(region: Region): RegionExecution = {
-    // ensure the region execution hasn't been initialized already.
-    assert(
-      !regionExecutions.contains(region.id),
-      s"RegionExecution of ${region.id} already initialized."
-    )
+    regionExecutions.remove(region.id)

Review Comment:
   `initRegionExecution` now removes any existing execution before creating the 
new one, so the method no longer behaves like an init-or-get helper. Please 
rename or document it as an explicit reset/restart operation, and clean up the 
Scaladoc wording/non-ASCII punctuation so future callers do not assume this is 
a harmless initializer.
   
   Better yet, you could separate `init` from `reset`, which would be cleaner 
anyway. I would rather not treat a restart the same way as an init: they are 
semantically different, even if they currently do the same thing.
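
A minimal sketch of the split, keeping the original assertion on the init path and adding an explicit restart path. `RegionId`, `Region`, and `RegionExecution` are simplified stand-ins here, and `restartRegionExecution`, `getRegionExecution`, and `WorkflowExecutionSketch` are hypothetical names, not existing API.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; the real classes carry much more state.
final case class RegionId(id: Long)
final case class Region(id: RegionId)
final class RegionExecution(val region: Region)

class WorkflowExecutionSketch {
  private val regionExecutions = mutable.LinkedHashMap[RegionId, RegionExecution]()

  /** First-time initialization: fails loudly if the region was already initialized. */
  def initRegionExecution(region: Region): RegionExecution = {
    assert(
      !regionExecutions.contains(region.id),
      s"RegionExecution of ${region.id} already initialized."
    )
    val execution = new RegionExecution(region)
    regionExecutions.update(region.id, execution)
    execution
  }

  /** Explicit restart: discards the existing execution and creates a fresh one. */
  def restartRegionExecution(region: Region): RegionExecution = {
    assert(
      regionExecutions.contains(region.id),
      s"RegionExecution of ${region.id} was never initialized."
    )
    regionExecutions.remove(region.id)
    initRegionExecution(region)
  }

  def getRegionExecution(id: RegionId): RegionExecution = regionExecutions(id)
}
```

Callers on the restart path then state their intent by name, and an accidental double init still trips the assertion.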



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
