scwhittle commented on code in PR #33591:
URL: https://github.com/apache/beam/pull/33591#discussion_r1926729156
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java:
##########
@@ -214,8 +214,27 @@ public long add(WindowedValue<T> data) throws IOException {
.setData(value)
.setMetadata(metadata);
keyedOutput.addMessages(builder.build());
+
+ long offsetSize = 0;
+ if (context.offsetBasedDeduplicationEnabled()) {
+ byte[] rawId = context.getCurrentRecordId();
+ if (rawId.length == 0) {
+ throw new RuntimeException("Unexpected empty record ID while offset-based deduplication enabled.");
+ }
+ id = ByteString.copyFrom(rawId);
Review Comment:
Should we prefer this id or the id from ValueWithRecordId if both are set?
Alternatively, we could make it invalid to set both, or require the two ids
to be equal when both are set.
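To make the last option concrete, here is a minimal sketch of "expect them to be the same if both are set". `RecordIdResolver` and `resolve` are hypothetical names used only for illustration; they are not part of Beam or this PR, and plain `byte[]` stands in for `ByteString` so the snippet is self-contained.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Beam or this PR.
final class RecordIdResolver {
  private RecordIdResolver() {}

  /**
   * Picks the record id to attach to an output message.
   *
   * @param offsetId id derived from the offset (empty or null if unset)
   * @param valueRecordId id carried by the value itself (empty or null if unset)
   */
  static byte[] resolve(byte[] offsetId, byte[] valueRecordId) {
    boolean hasOffsetId = offsetId != null && offsetId.length > 0;
    boolean hasValueId = valueRecordId != null && valueRecordId.length > 0;

    if (hasOffsetId && hasValueId) {
      // Assumed policy: both may be set only if they agree.
      if (!Arrays.equals(offsetId, valueRecordId)) {
        throw new IllegalStateException(
            "Offset-based record id and value record id are both set but differ.");
      }
      return offsetId;
    }
    if (hasOffsetId) {
      return offsetId;
    }
    if (hasValueId) {
      return valueRecordId;
    }
    // Neither id is set; the caller decides whether that is an error.
    return new byte[0];
  }
}
```

Failing fast on a mismatch avoids silently choosing one id over the other; whether that is the right policy here is the open question above.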
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -93,6 +93,21 @@ public boolean requiresDeduping() {
return false;
}
+ /**
+ * If offsetBasedDeduplicationEnabled returns true, then the UnboundedSource needs to provide the
+ * following:
+ *
+ * <ul>
+ * <li>UnboundedReader which provides offsets that are unique for each element and
+ * lexicographically ordered.
+ * <li>CheckpointMark which provides an offset greater than all elements read and less than or
+ * equal to the next offset that will be read.
+ * </ul>
+ */
+ public boolean offsetBasedDeduplicationEnabled() {
Review Comment:
Sorry to still nit on this, but "Enabled" could be confusing if the runner
doesn't support offset-based deduplication or doesn't use the offsets.
How about offsetBasedDeduplicationSupported? Please update the execution
context method as well.
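For reference, a minimal sketch of one way a source could satisfy the offset contract in the Javadoc above, assuming offsets are non-negative `long` values. `OffsetEncoding` and its methods are hypothetical and not part of Beam. Big-endian encoding makes unsigned lexicographic byte order agree with numeric order, and `lastReadOffset + 1` is greater than every element read while being no larger than the next offset to be read.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Beam or this PR.
final class OffsetEncoding {
  private OffsetEncoding() {}

  /** Encodes a non-negative offset as 8 big-endian bytes. */
  static byte[] encode(long offset) {
    if (offset < 0) {
      throw new IllegalArgumentException("Offset must be non-negative: " + offset);
    }
    // ByteBuffer defaults to big-endian, so the most significant byte comes first.
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  /** Compares two encoded offsets lexicographically, treating bytes as unsigned. */
  static int compare(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  /**
   * Assumed checkpoint limit: one past the last offset read, so it is greater
   * than all elements read and at most the next offset that will be read.
   */
  static byte[] offsetLimit(long lastReadOffset) {
    return encode(Math.addExact(lastReadOffset, 1));
  }
}
```

For example, `encode(255)` sorts before `encode(256)` under `compare`, which would not hold for a little-endian or decimal-string encoding of unequal length.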
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java:
##########
@@ -139,6 +146,11 @@ public void finalizeCheckpoint() throws IOException {
// nothing to do
}
}
+
+ /* Get offset limit for unbounded source split checkpoint. */
+ default byte[] getOffsetLimit() {
Review Comment:
Yup that's what I was thinking, thanks