prateekm commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1143790143


##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   That's right; it's needed because there's no other coordination or synchronization between the two run loops. It is used during commit whenever either run loop reads, writes, or copies the side input offsets file.

   To avoid deadlocks, the lock is only held within a single method invocation, and those methods do not call each other. On second thought, I'll use a semaphore instead of a reentrant lock to document and enforce this.



##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;

Review Comment:
   Will do, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
