tomstepp commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1925853600
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws
NoSuchElementException {
return curTimestamp;
}
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
Review Comment:
1. Callers to check: callers can currently check via
`UnboundedReader.getCurrentSource().offsetBasedDeduplicationEnabled()`. Are you
proposing a direct method on UnboundedReader for this check, for example
`UnboundedReader.offsetBasedDeduplicationEnabled()`?
2. Error to call if not set: I've adjusted the checks here a bit (the offset
dedup check now comes earlier), but I'm not sure I captured what you meant.
Please let me know if the approach needs further adjustment.
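
For illustration, here is a minimal sketch of the two points above: a caller checking the flag through the source, and a reader that checks offset dedup before looking at the current ID. `SketchSource`, `SketchReader`, and `setCurrentId` are hypothetical stand-ins, not the Beam classes touched by this PR, and the ordering of checks in the actual change may differ.

```java
import java.util.NoSuchElementException;

// Hypothetical stand-in for the source; NOT Beam's UnboundedSource.
class SketchSource {
  private final boolean offsetDedup;

  SketchSource(boolean offsetDedup) {
    this.offsetDedup = offsetDedup;
  }

  boolean offsetBasedDeduplicationEnabled() {
    return offsetDedup;
  }
}

// Hypothetical stand-in for the reader; NOT KafkaUnboundedReader.
class SketchReader {
  private final SketchSource source;
  private byte[] curId; // null until a record has been read

  SketchReader(SketchSource source) {
    this.source = source;
  }

  SketchSource getCurrentSource() {
    return source;
  }

  // Test helper (assumption): simulates advancing to a record with this ID.
  void setCurrentId(byte[] id) {
    this.curId = id;
  }

  byte[] getCurrentRecordId() throws NoSuchElementException {
    // Check the dedup flag first: without offset dedup there is no ID to report.
    if (!source.offsetBasedDeduplicationEnabled()) {
      return new byte[0];
    }
    // With offset dedup enabled, a missing ID means no current record.
    if (curId == null) {
      throw new NoSuchElementException();
    }
    return curId;
  }
}
```

A caller would then guard on `reader.getCurrentSource().offsetBasedDeduplicationEnabled()` before relying on the returned ID, which is the indirect check described in point 1.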
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws
NoSuchElementException {
return curTimestamp;
}
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ if (curId == null) {
+ if (this.offsetDeduplication) {
+ throw new NoSuchElementException();
+ } else {
+ return new byte[0];
+ }
+ }
+ return curId;
+ }
+
+ @Override
+ public byte[] getCurrentRecordOffset() throws NoSuchElementException {
Review Comment:
I responded above, but I'm leaving this open so that any subsequent changes
get reflected here too.