bkonold commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r462815102



##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -259,6 +288,44 @@ public void stop() {
     return oldestOffsets;
   }
 
+  /**
+   * An SSP is considered caught up once the offset indicated for it in {@link 
#sspOffsetsToBlockUntil} has been
+   * processed. Once the set of SSPs to catch up becomes empty, the latch for 
the task will count down, notifying
+   * {@link ContainerStorageManager} that it is caught up.
+   *
+   * @param ssp The SSP to be checked
+   * @param currentOffset The offset to be checked
+   * @param isStartingOffset Indicates whether the offset being checked is the 
starting offset of the SSP (and thus has
+   *                         not yet been processed). This will be set to true 
when each SSP's starting offset is checked
+   *                         on init, and false when checking if an ssp is 
caught up after processing an envelope.
+   */
+  private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, 
boolean isStartingOffset) {
+    String offsetToBlockUntil = this.sspOffsetsToBlockUntil.get(ssp);
+
+    LOG.trace("Checking offset {} against {} for {}. isStartingOffset: {}", 
currentOffset, offsetToBlockUntil, ssp, isStartingOffset);
+
+    Integer comparatorResult;
+    if (currentOffset == null || offsetToBlockUntil == null) {
+      comparatorResult = -1;
+    } else {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(currentOffset, 
offsetToBlockUntil);
+    }
+
+    // If the starting offset, it must be greater (since the envelope at the 
starting offset will not yet have been processed)
+    // If not the starting offset, it must be greater than OR equal
+    if (comparatorResult != null && ((isStartingOffset && comparatorResult > 
0) || (!isStartingOffset && comparatorResult >= 0))) {
+      LOG.info("Side input ssp {} has caught up to offset {}.", ssp, 
offsetToBlockUntil);

Review comment:
       yea i'll change it back. at startup we need to check starting offsets 
against upcoming, but after each process we compare the envelope's offset to 
newest.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to