bkonold commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443895237



##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -119,6 +125,17 @@ public void init() {
 
     this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
     LOG.info("Starting offsets for the task {}: {}", taskName, startingOffsets);
+
+    this.sspOffsetsToBlockUntil = new HashMap<>();
+    for (SystemStreamPartition ssp : this.sspToStores.keySet()) {
+      SystemStreamMetadata metadata = this.streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+      if (metadata != null) {
+        String offset = metadata.getSystemStreamPartitionMetadata().get(ssp.getPartition()).getNewestOffset();
+        this.sspOffsetsToBlockUntil.put(ssp, offset);
+      }
+    }
+

Review comment:
       Sure. I don't mind refactoring this into its own method.
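
For illustration only, here is a rough sketch of what pulling that loop out into its own method might look like. It is not the change that went into the PR. To keep it self-contained it uses simplified stand-ins: `Ssp` is a hypothetical replacement for `SystemStreamPartition`, and the `newestOffsetsByStream` lookup is an assumed substitute for the `streamMetadataCache` call. The method name `getOffsetsToBlockUntil` is also hypothetical. Unlike the loop in the diff, the sketch skips a partition whose newest offset is missing instead of dereferencing it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

final class SideInputOffsetsSketch {

  // Hypothetical stand-in for Samza's SystemStreamPartition: a stream name plus a partition id.
  static final class Ssp {
    final String stream;
    final int partition;

    Ssp(String stream, int partition) {
      this.stream = stream;
      this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Ssp)) {
        return false;
      }
      Ssp other = (Ssp) o;
      return partition == other.partition && stream.equals(other.stream);
    }

    @Override
    public int hashCode() {
      return Objects.hash(stream, partition);
    }
  }

  // The extracted method: for each SSP, look up the newest offset known for its partition.
  // newestOffsetsByStream is an assumed stand-in for the metadata cache; it returns
  // partition -> newest offset, or null when no metadata is available for the stream.
  static Map<Ssp, String> getOffsetsToBlockUntil(
      Set<Ssp> ssps, Function<String, Map<Integer, String>> newestOffsetsByStream) {
    Map<Ssp, String> offsets = new HashMap<>();
    for (Ssp ssp : ssps) {
      Map<Integer, String> newest = newestOffsetsByStream.apply(ssp.stream);
      if (newest == null) {
        continue; // mirrors the "metadata != null" check in the diff
      }
      String offset = newest.get(ssp.partition);
      if (offset != null) { // extra guard, not present in the original loop
        offsets.put(ssp, offset);
      }
    }
    return offsets;
  }

  public static void main(String[] args) {
    Map<String, Map<Integer, String>> metadata = new HashMap<>();
    metadata.put("side-input", Map.of(0, "41", 1, "17"));

    Set<Ssp> ssps = Set.of(new Ssp("side-input", 0), new Ssp("unknown-stream", 0));
    Map<Ssp, String> offsets = getOffsetsToBlockUntil(ssps, metadata::get);
    System.out.println("SSPs with a blocking offset: " + offsets.size());
  }
}
```

With a method shaped like this, `init()` would shrink to a single assignment to `sspOffsetsToBlockUntil`, and the lookup could be unit-tested without constructing the whole handler.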



