scwhittle commented on code in PR #33591:
URL: https://github.com/apache/beam/pull/33591#discussion_r1924239234


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -215,7 +215,13 @@ public long add(WindowedValue<T> data) throws IOException {
               .setMetadata(metadata);
       keyedOutput.addMessages(builder.build());
       keyedOutput.addMessagesIds(id);
-      return (long) key.size() + value.size() + metadata.size() + id.size();
+
+      ByteString offset = ByteString.copyFrom(context.getCurrentRecordOffset());

Review Comment:
   It would be nice to avoid the allocation when there is no current record, since that is common.
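
   A minimal sketch of the idea, not the PR's actual change. It assumes the "no current record" case shows up as a null or zero-length array, which the diff does not confirm. `OffsetBytes` and `copyOrEmpty` are hypothetical names, and plain `byte[]` stands in for `ByteString` to keep the example self-contained. With protobuf's `ByteString`, the shared constant would be `ByteString.EMPTY`.

```java
import java.util.Arrays;

// Hypothetical stand-in for the sink's offset handling; not Beam's actual code.
final class OffsetBytes {
  // Shared empty array, reused whenever there is no offset to copy.
  static final byte[] EMPTY = new byte[0];

  // Assumption: "no current record" is signalled by null or a zero-length array.
  // Returns the shared EMPTY instance in that case (no allocation),
  // and a defensive copy otherwise.
  static byte[] copyOrEmpty(byte[] currentRecordOffset) {
    if (currentRecordOffset == null || currentRecordOffset.length == 0) {
      return EMPTY;
    }
    return Arrays.copyOf(currentRecordOffset, currentRecordOffset.length);
  }
}
```

   The common path then costs only a null/length check, and the copy happens only when an offset is actually present.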



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -139,6 +146,11 @@ public void finalizeCheckpoint() throws IOException {
         // nothing to do
       }
     }
+
+    /* Get offset limit for unbounded source split checkpoint. */
+    default byte[] getOffsetLimit() throws IOException {

Review Comment:
   Maybe we should just throw a runtime exception? It seems like this shouldn't have to do IO, and if the method is annotated as throwing a non-runtime (checked) exception, every caller has to handle IOException.
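
   A rough sketch of what that could look like, under assumptions: `OffsetCheckpointMark`, `NoOffsetMark`, and `FixedOffsetMark` are hypothetical names for illustration, not Beam's interfaces, and `UnsupportedOperationException` is just one possible choice of runtime exception.

```java
// Hypothetical, simplified stand-in for a checkpoint mark interface; not Beam's actual API.
interface OffsetCheckpointMark {
  // No checked exception in the signature, so callers need no try/catch.
  default byte[] getOffsetLimit() {
    throw new UnsupportedOperationException(
        "getOffsetLimit is not implemented by " + getClass().getName());
  }
}

// A mark that does not support offsets inherits the throwing default.
final class NoOffsetMark implements OffsetCheckpointMark {}

// A mark that supports offsets overrides the default and returns a copy of its limit.
final class FixedOffsetMark implements OffsetCheckpointMark {
  private final byte[] limit;

  FixedOffsetMark(byte[] limit) {
    this.limit = limit.clone();
  }

  @Override
  public byte[] getOffsetLimit() {
    return limit.clone();
  }
}
```

   Callers that know the source is offset-based can call `getOffsetLimit()` directly, and a missing implementation fails loudly at the call site instead of forcing IOException handling everywhere.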



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -93,6 +93,13 @@ public boolean requiresDeduping() {
     return false;
   }
 
+  /**
+   * Returns whether this source is configured for offset-based deduplication by the runner.
+   */
+  public boolean offsetDeduplication() {

Review Comment:
   Can this be named as a property of the source instead? The difference is that the property is available to runners to use for optimization if desired, but the runner isn't required to use it.
   
   So maybe this should be named isOffsetBased, with a comment that if it returns true, then the UnboundedReaders it creates must provide offsets that are unique for each element and can be ordered lexicographically, and the checkpoint marks it vends must have an offset greater than or equal to all elements read and less than the next element (or whatever the edge-case semantics are).
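
   To make the proposed contract concrete, here is a small sketch under assumptions: offsets are byte arrays compared lexicographically as unsigned bytes, and the edge cases are exactly "greater than or equal to the last element read, strictly less than the next element". `OffsetOrder` and its methods are hypothetical helpers, not part of Beam, and `Arrays.compareUnsigned` requires Java 9 or later.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the proposed contract; not part of Beam.
final class OffsetOrder {
  // Lexicographic comparison treating each byte as unsigned (Java 9+).
  static int compare(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  // True if the checkpoint offset is >= the last element read and < the next element.
  static boolean isValidCheckpointOffset(byte[] lastRead, byte[] checkpoint, byte[] next) {
    return compare(lastRead, checkpoint) <= 0 && compare(checkpoint, next) < 0;
  }
}
```

   Unsigned comparison matters here: with signed bytes, `0x80` would sort before `0x01`, which would break the ordering for offsets encoded as big-endian integers.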
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
