scwhittle commented on code in PR #33591:
URL: https://github.com/apache/beam/pull/33591#discussion_r1925031478


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -194,6 +195,13 @@ public boolean workIsFailed() {
     return Optional.ofNullable(work).map(Work::isFailed).orElse(false);
   }
 
+  public byte[] getCurrentRecordOffset() {
+    if (activeReader == null) {
+      return NO_RECORD_OFFSET;
+    }
+    return activeReader.getCurrentRecordOffset();

Review Comment:
   Maybe we should do:
   if (activeReader == null || !activeReader.getSource().providesRecordOffsets()) {
     return NO_RECORD_OFFSET;
   }
   return activeReader.getCurrentRecordOffset();
   
   Then we could change the default reader implementation to just throw an exception saying that it must be overridden when providesRecordOffsets is set to true.
   
   I'm just trying to make it impossible to set providesRecordOffsets but forget to implement the necessary methods.
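   
   A minimal sketch of the suggested pattern (guard at the caller, throwing default in the reader). `Source`, `Reader` and `currentRecordOffset` are simplified hypothetical stand-ins, not Beam's real `UnboundedSource`/`UnboundedReader` classes, and `providesRecordOffsets` is just one of the candidate names under discussion:

```java
import java.nio.charset.StandardCharsets;

public class RecordOffsetGuardSketch {
  // Sentinel returned when no offset is available (mirrors NO_RECORD_OFFSET in the diff).
  static final byte[] NO_RECORD_OFFSET = new byte[0];

  // Hypothetical, simplified stand-in for UnboundedSource.
  abstract static class Source {
    // Opt-in flag; name taken from the review suggestion, not an existing Beam API.
    boolean providesRecordOffsets() {
      return false;
    }
  }

  // Hypothetical, simplified stand-in for UnboundedSource.UnboundedReader.
  abstract static class Reader {
    abstract Source getSource();

    // Default throws, so a source that opts in but forgets to override
    // this method fails loudly instead of silently returning an empty offset.
    byte[] getCurrentRecordOffset() {
      throw new UnsupportedOperationException(
          "getCurrentRecordOffset() must be overridden when providesRecordOffsets() is true");
    }
  }

  // Caller-side guard, analogous to StreamingModeExecutionContext#getCurrentRecordOffset.
  static byte[] currentRecordOffset(Reader activeReader) {
    if (activeReader == null || !activeReader.getSource().providesRecordOffsets()) {
      return NO_RECORD_OFFSET;
    }
    return activeReader.getCurrentRecordOffset();
  }

  public static void main(String[] args) {
    Source plain = new Source() {};
    Source offsetSource = new Source() {
      @Override boolean providesRecordOffsets() { return true; }
    };
    Reader plainReader = new Reader() {
      @Override Source getSource() { return plain; }
    };
    Reader brokenReader = new Reader() {
      @Override Source getSource() { return offsetSource; } // forgot to override the offset method
    };
    Reader goodReader = new Reader() {
      @Override Source getSource() { return offsetSource; }
      @Override byte[] getCurrentRecordOffset() { return "42".getBytes(StandardCharsets.UTF_8); }
    };

    System.out.println(currentRecordOffset(null).length);        // 0
    System.out.println(currentRecordOffset(plainReader).length); // 0 (guard never calls the reader)
    System.out.println(currentRecordOffset(goodReader).length);  // 2
    try {
      currentRecordOffset(brokenReader);
    } catch (UnsupportedOperationException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

   With the guard in place, sources that never opt in are unaffected by the throwing default, while a source that opts in without overriding the method fails on the first record rather than deduplicating on empty offsets.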



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -93,6 +93,21 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  /**
+   * If isOffsetDeduplication returns true, then the UnboundedSource needs to
+   * provide the following:
+   * 
+   * <ul>
+   *   <li>UnboundedReader which provides offsets that are unique for each
+   *       element and lexicographically ordered.</li>
+   *   <li>CheckpointMark which provides an offset greater than all elements
+   *       read and less than or equal to the next offset that will be read.</li>
+   * </ul>
+   */
+  public boolean isOffsetDeduplication() {

Review Comment:
   Maybe providesRecordOffsets? Or supportsOffsetBasedDeduping?
   
   I'm trying to think of something that reads easily at the call site.
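   
   The Javadoc above requires offsets that are unique and lexicographically ordered, plus a checkpoint offset that is greater than every offset read and no greater than the next one. A minimal sketch, assuming "lexicographically" means unsigned byte-wise comparison and assuming a source whose native positions are non-negative longs; `encodeOffset` and `checkpointOffset` are hypothetical helpers, not Beam APIs. Big-endian encoding makes unsigned byte order match numeric order, and a checkpoint at `lastRead + 1` satisfies both bounds:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class OffsetEncodingSketch {
  // Hypothetical helper: encodes a non-negative long offset as 8 big-endian bytes
  // (ByteBuffer's default order). For non-negative values, unsigned lexicographic
  // byte order then matches numeric order.
  static byte[] encodeOffset(long offset) {
    if (offset < 0) {
      throw new IllegalArgumentException("offset must be non-negative: " + offset);
    }
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  // Hypothetical helper: the checkpoint offset is one past the last record read,
  // so it is greater than every read offset and <= the next offset to be read.
  // lastReadOffset + 1 overflows at Long.MAX_VALUE and is then rejected above.
  static byte[] checkpointOffset(long lastReadOffset) {
    return encodeOffset(lastReadOffset + 1);
  }

  public static void main(String[] args) {
    byte[] a = encodeOffset(127); // last byte 0x7F
    byte[] b = encodeOffset(128); // last byte 0x80
    System.out.println(Arrays.compareUnsigned(a, b) < 0); // true
    System.out.println(Arrays.compare(a, b) > 0);         // true: signed compare misorders them

    byte[] cp = checkpointOffset(127);
    System.out.println(Arrays.compareUnsigned(a, cp) < 0);  // true: greater than all read
    System.out.println(Arrays.compareUnsigned(cp, b) <= 0); // true: <= next offset to read
  }
}
```

   A signed comparison such as `Arrays.compare` would misorder 127 and 128 here, because `0x80` is negative as a Java `byte`, so whichever comparison the runner uses has to be pinned down in the Javadoc.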



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -203,6 +224,11 @@ public byte[] getCurrentRecordId() throws NoSuchElementException {
       return EMPTY;
     }
 
+    /* Returns the offset for the current record of this unbounded reader. */
+    public byte[] getCurrentRecordOffset() {
+      return EMPTY;

Review Comment:
   See the other comment: if we guard at the caller, it seems we could throw here, like the default checkpoint method does.



-- 
This is an automated message from the Apache Git Service.