xinyuiscool commented on code in PR #1616:
URL: https://github.com/apache/samza/pull/1616#discussion_r931322948


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -134,17 +138,26 @@ public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
     this.latch = new Object();
     this.workerTimer = Executors.newSingleThreadScheduledExecutor();
     this.clock = clock;
-    Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
+    // assign runId before creating workers. As the inner AsyncTaskWorker 
class is not static, it relies on
+    // the outer class fields to be init first
+    this.runId = runId;
+    Map<TaskName, AsyncTaskWorker>  workers = new HashMap<>();
     for (RunLoopTask task : runLoopTasks.values()) {
       workers.put(task.taskName(), new AsyncTaskWorker(task));
     }
     // Partions and tasks assigned to the container will not change during the 
run loop life time
     this.sspToTaskWorkerMapping = 
Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(runLoopTasks, workers));
+
     this.taskWorkers = Collections.unmodifiableList(new 
ArrayList<>(workers.values()));
     this.isAsyncCommitEnabled = isAsyncCommitEnabled;
     this.elasticityFactor = elasticityFactor;
   }
 
+  public void drain() {

Review Comment:
   As we discussed offline, this method can be removed as we can set these 
flags when runloop receives the drain message. It will also siimply the logic 
of invoking runloop and factory interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to