xinyuiscool commented on code in PR #1616: URL: https://github.com/apache/samza/pull/1616#discussion_r931322948
########## samza-core/src/main/java/org/apache/samza/container/RunLoop.java: ########## @@ -134,17 +138,26 @@ public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks, this.latch = new Object(); this.workerTimer = Executors.newSingleThreadScheduledExecutor(); this.clock = clock; - Map<TaskName, AsyncTaskWorker> workers = new HashMap<>(); + // assign runId before creating workers. As the inner AsyncTaskWorker class is not static, it relies on + // the outer class fields to be init first + this.runId = runId; + Map<TaskName, AsyncTaskWorker> workers = new HashMap<>(); for (RunLoopTask task : runLoopTasks.values()) { workers.put(task.taskName(), new AsyncTaskWorker(task)); } // Partions and tasks assigned to the container will not change during the run loop life time this.sspToTaskWorkerMapping = Collections.unmodifiableMap(getSspToAsyncTaskWorkerMap(runLoopTasks, workers)); + this.taskWorkers = Collections.unmodifiableList(new ArrayList<>(workers.values())); this.isAsyncCommitEnabled = isAsyncCommitEnabled; this.elasticityFactor = elasticityFactor; } + public void drain() { Review Comment: As we discussed offline, this method can be removed as we can set these flags when runloop receives the drain message. It will also siimply the logic of invoking runloop and factory interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@samza.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org