mynameborat commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1142837174


##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;

Review Comment:
   Reminder to add warn-level logging for the case where `commitMs <= 0`.
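
   For illustration, a minimal sketch of what such a check could look like. The class and method names (`CommitIntervalCheck`, `warnIfNonPositive`) are hypothetical, and it uses `java.util.logging` only to stay self-contained; the real change would presumably use the logger already present in `SideInputTask`, and the message wording is just a placeholder.

```java
import java.util.logging.Logger;

// Hypothetical helper, not part of Samza: warns when the commit interval is non-positive.
final class CommitIntervalCheck {
  private static final Logger LOG = Logger.getLogger(CommitIntervalCheck.class.getName());

  private CommitIntervalCheck() { }

  // Logs a warning if commitMs <= 0 and returns true in that case, so callers
  // (and tests) can tell whether the warning path was taken.
  static boolean warnIfNonPositive(String taskName, long commitMs) {
    if (commitMs <= 0) {
      LOG.warning(String.format(
          "Side input task %s was created with a non-positive commitMs: %d", taskName, commitMs));
      return true;
    }
    return false;
  }
}
```

   The constructor could call something like this right after assigning `this.commitMs`.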



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Why do we need process-wide locks? We don't have container-level stores, and this util is scoped to the task level, right?



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Never mind. I realized this lock is shared across the two run loops on the blob store upload code path. Is that right?
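
   To check my reading of the code comment, here is a minimal sketch of the pattern as I understand it: one static lock guarding both the write (side input commit) and the copy (regular task commit). The names `SideInputOffsetFiles`, `write`, and `copy` are hypothetical and are not the methods in this PR; the file layout is also an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not Samza code: serializes writes and copies of an offset file.
final class SideInputOffsetFiles {
  // Static, so every caller in the JVM shares the same lock regardless of which
  // run loop (thread) it is on.
  private static final ReentrantLock LOCK = new ReentrantLock();

  private SideInputOffsetFiles() { }

  // Writer side: would be called from the side input commit path.
  static void write(Path offsetFile, String contents) throws IOException {
    LOCK.lock();
    try {
      Files.write(offsetFile, contents.getBytes(StandardCharsets.UTF_8));
    } finally {
      LOCK.unlock();
    }
  }

  // Reader side: would be called from the regular commit path to copy the file
  // into a checkpoint directory. Holding the same lock means the copy never
  // observes a write from this class that is still in progress.
  static void copy(Path offsetFile, Path checkpointDir) throws IOException {
    LOCK.lock();
    try {
      Files.copy(offsetFile, checkpointDir.resolve(offsetFile.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    } finally {
      LOCK.unlock();
    }
  }
}
```

   If this matches the intent, the lock only needs to be process-wide because the two callers live on different run loops, not because the stores themselves are shared across tasks.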



##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {

Review Comment:
   Is this part of the bug fix, or is it more about enforcing that this flow runs only for persisted stores?
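
   For reference, a small standalone sketch of what the new filter does: any store name that already has an entry in the non-persisted map is skipped before the creation step. `StoreFilterSketch` and `storesToCreate` are hypothetical names used only for this illustration, and the store names in the usage note are made up.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not Samza code: mirrors the filter added in the diff.
final class StoreFilterSketch {
  private StoreFilterSketch() { }

  // Returns the store names that would still reach the "create persisted stores"
  // step, i.e. those without an entry in nonPersistedStores.
  static List<String> storesToCreate(Set<String> storeNames, Map<String, ?> nonPersistedStores) {
    return storeNames.stream()
        .filter(s -> !nonPersistedStores.containsKey(s))
        .collect(Collectors.toList());
  }
}
```

   With store names `a`, `b`, `c` and a non-persisted map containing only `b`, this yields `a` and `c`. Without the filter, `b` would also go through the persisted-store creation path.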


