bkonold commented on a change in pull request #1385:
URL: https://github.com/apache/samza/pull/1385#discussion_r443895612
##########
File path: samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
##########
@@ -259,6 +277,44 @@ public void stop() {
return oldestOffsets;
}
+ /**
+ * Checks if whether the given offset for the SSP has reached the latest offset (determined at init),
+ * removing it from the list of SSPs to catch up. Once the set of SSPs to catch up becomes empty, the latch for the
+ * task will count down, notifying {@link ContainerStorageManager} that it is caught up.
+ *
+ * @param ssp The SSP to be checked
+ * @param currentOffset The offset to be checked
+ * @param isStartingOffset Indicates whether the offset being checked is the starting offset of the SSP (and thus has
+ * not yet been processed). This will be set to true when each SSP's starting offset is checked
+ * on init, and false when checking if an ssp is caught up after processing an envelope.
+ */
+ private void checkCaughtUp(SystemStreamPartition ssp, String currentOffset, boolean isStartingOffset) {
+ String offsetToBlockUntil = this.sspOffsetsToBlockUntil.get(ssp);
+
+ LOG.trace("Checking offset {} against {} for {}. isStartingOffset: {}", currentOffset, offsetToBlockUntil, ssp, isStartingOffset);
+
+ Integer comparatorResult;
+ if (currentOffset == null || offsetToBlockUntil == null) {
+ comparatorResult = -1;
+ } else {
+ SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+ comparatorResult = systemAdmin.offsetComparator(currentOffset, offsetToBlockUntil);
+ }
+
+ // If the starting offset, it must be greater (since the envelope at the starting offset will not yet have been processed)
Review comment:
Indeed, the docs are misleading as they indicate that a
lastProcessedOffset would be used as-is as a starting offset. As you point out,
this is not the case. Will update the doc.
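
For reference, a minimal sketch of the comparison rule as described in the Javadoc and the inline comment above. The quoted hunk is cut off before the actual condition, so this is an assumption about its shape and not the code in the PR: a starting offset has to be strictly greater than the offset to block until, while an already-processed offset only has to be greater than or equal to it. The class name CaughtUpSketch, the String keys standing in for SystemStreamPartition, and the numeric offsets compared with Long.compare (in place of SystemAdmin.offsetComparator) are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for the catch-up check; not the Samza implementation.
public class CaughtUpSketch {
  // Offsets each SSP must reach before it counts as caught up (assumed to be fixed at init).
  private final Map<String, Long> offsetsToBlockUntil = new HashMap<>();
  // Counted down once every SSP has been removed from the map above.
  private final CountDownLatch caughtUpLatch = new CountDownLatch(1);

  public CaughtUpSketch(Map<String, Long> offsetsToBlockUntil) {
    this.offsetsToBlockUntil.putAll(offsetsToBlockUntil);
  }

  public void checkCaughtUp(String ssp, Long currentOffset, boolean isStartingOffset) {
    Long offsetToBlockUntil = offsetsToBlockUntil.get(ssp);

    // A missing offset on either side is treated as "not caught up", as in the quoted hunk.
    int comparatorResult = (currentOffset == null || offsetToBlockUntil == null)
        ? -1
        : Long.compare(currentOffset, offsetToBlockUntil);

    // Assumed rule: the envelope at a starting offset has not been processed yet,
    // so a starting offset must be strictly past the target.
    boolean caughtUp = isStartingOffset ? comparatorResult > 0 : comparatorResult >= 0;

    if (caughtUp) {
      offsetsToBlockUntil.remove(ssp);
      if (offsetsToBlockUntil.isEmpty()) {
        caughtUpLatch.countDown();
      }
    }
  }

  public boolean isCaughtUp() {
    return caughtUpLatch.getCount() == 0;
  }
}
```

With a target offset of 5, checking a starting offset of 5 leaves the task not caught up under this rule, whereas checking 5 after the envelope at 5 has been processed releases the latch.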