bkonold commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443891873



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -154,17 +149,13 @@
   private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
   private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
-  private final Map<SystemStreamPartition, SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
-      = new ConcurrentHashMap<>(); // Recorded sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caughtup and container init can proceed
-  private volatile CountDownLatch sideInputsCaughtUp; // Used by the sideInput-read thread to signal to the main thread
+  private volatile Map<TaskName, CountDownLatch> sideInputTaskLatches; // Used by the sideInput-read thread to signal to the main thread
   private volatile boolean shouldShutdown = false;
+  private RunLoop sideInputRunLoop;
 
   private final ExecutorService sideInputsReadExecutor = Executors.newSingleThreadExecutor(
       new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_READ_THREAD_NAME).build());
 
-  private final ScheduledExecutorService sideInputsFlushExecutor = Executors.newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
-  private ScheduledFuture sideInputsFlushFuture;

Review comment:
   @mynameborat @rmatharu I am not clear on the historical context here. Can one of you comment on why it was written this way?

   From what I can tell, the threading change introduced by my PR is that `commit` of one task can no longer run concurrently with `process` of another task.
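
To make the difference concrete, here is a minimal sketch. It is not Samza code: `SideInputThreadingSketch` and `commitOverlapsProcess` are hypothetical names, and it assumes the new run loop executes `process` and `commit` on a single thread, while the old code ran flushes on a separate executor as the removed `sideInputsFlushExecutor` suggests.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names exist in Samza.
public class SideInputThreadingSketch {

  /**
   * Submits a stand-in "process" (task B) and then a stand-in "commit" (task A).
   * Returns true if the commit ran while the process was still in progress.
   */
  static boolean commitOverlapsProcess(ExecutorService processExecutor,
                                       ExecutorService commitExecutor) throws Exception {
    CountDownLatch processStarted = new CountDownLatch(1);
    CountDownLatch commitRan = new CountDownLatch(1);
    AtomicBoolean processing = new AtomicBoolean(false);
    AtomicBoolean overlapped = new AtomicBoolean(false);

    Future<?> process = processExecutor.submit(() -> {
      processing.set(true);
      processStarted.countDown();
      try {
        // Keep "process" open briefly so a concurrent commit has a chance to run.
        commitRan.await(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      processing.set(false);
    });

    // Only submit the commit once the process is known to be running.
    processStarted.await();
    Future<?> commit = commitExecutor.submit(() -> {
      if (processing.get()) {
        overlapped.set(true);
      }
      commitRan.countDown();
    });

    process.get();
    commit.get();
    return overlapped.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService read = Executors.newSingleThreadExecutor();
    ExecutorService flush = Executors.newSingleThreadExecutor();
    try {
      // Old shape (assumed): commit/flush on its own thread.
      System.out.println("separate executors overlap: " + commitOverlapsProcess(read, flush));
      // New shape (assumed): both on one thread, so the commit queues behind the process.
      System.out.println("shared executor overlap: " + commitOverlapsProcess(read, read));
    } finally {
      read.shutdown();
      flush.shutdown();
    }
  }
}
```

With two executors the commit can observe the other task mid-`process`; with one shared single-thread executor it has to wait until `process` returns. If that matches the old and new behavior, the open question is whether anything relied on the overlap.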




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

