scwhittle commented on code in PR #33596:
URL: https://github.com/apache/beam/pull/33596#discussion_r1937391856


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java:
##########
@@ -137,6 +137,7 @@ Object getDefaultValue() {
         return false;
       }
     },
+    OFFSET_DEDUPLICATION(LEGACY),

Review Comment:
   this looks odd to me: shouldn't there be a `}` brace to match the enum's opening `{`? It seems it was like this before as well, but I'm not sure why it compiles.
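
   For my own reference, a minimal sketch of why this can parse (`Example`, `WITH_BODY`, and `PLAIN` are made-up names): a constant with a body closes its own brace, so a plain constant after it needs no extra `}`:

   ```java
   enum Example {
     // A constant with a body: this closing brace ends the constant, not the enum.
     WITH_BODY {
       @Override
       public String toString() {
         return "custom";
       }
     },
     // A plain constant has no body, so no brace is needed before the comma.
     PLAIN
   }
   ```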



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      // BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even for non offset
+      // deduplication cases. Therefore, Kafka reader cannot produce an exception when offset
+      // deduplication is disabled. Instead an empty record ID is provided.
+      return EMPTY_RECORD_ID;
+    }
+    if (curRecord != null) {

Review Comment:
   nit: how about checking == null here and throwing first? That way the exception is clearly tied to the check.
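
   Something along these lines, reusing the code from the hunk above (a sketch of the suggestion, not a final implementation):

   ```java
   @Override
   public byte[] getCurrentRecordId() throws NoSuchElementException {
     if (!this.offsetBasedDeduplicationSupported) {
       // Non offset-deduplication callers still get an empty ID rather than an exception.
       return EMPTY_RECORD_ID;
     }
     // Throw up front so the exception is visibly tied to the null check.
     if (curRecord == null) {
       throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
     }
     return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
         curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
   }
   ```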



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1573,6 +1586,27 @@ && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
     }
 
+    private void checkRedistributeConfiguration() {
+      if (getRedistributeNumKeys() == 0 && isRedistributed()) {
+        LOG.warn(
+            "withRedistribute without withRedistributeNumKeys will create a 
key per record, which is sub-optimal for most use cases.");
+      }
+      if (isAllowDuplicates()) {
+        checkState(

Review Comment:
   this was previously a LOG.warn, so this now fails pipeline construction instead of just warning.
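
   For reference, the behavioral difference in a minimal sketch (the condition and message here are illustrative, not the actual ones):

   ```java
   // Before: a soft warning; pipeline construction continues.
   LOG.warn("withAllowDuplicates is set without withRedistribute; it has no effect.");

   // After: Preconditions.checkState throws IllegalStateException when the
   // condition is false, failing pipeline construction instead of just logging.
   checkState(isRedistributed(), "withAllowDuplicates requires withRedistribute.");
   ```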



##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -221,6 +222,9 @@ public Row toConfigRow(Read<?, ?> transform) {
       fieldValues.put("redistribute", transform.isRedistributed());
       fieldValues.put("redistribute_num_keys", 
transform.getRedistributeNumKeys());

Review Comment:
   should these other nullable fields also have a null check?
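
   e.g. something like this, assuming the getter can return null (sketch):

   ```java
   // Skip unset optional fields instead of writing nulls into the config row.
   if (transform.getRedistributeNumKeys() != null) {
     fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
   }
   ```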



##########
sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java:
##########
@@ -349,6 +353,12 @@ public Row toConfigRow(Read<?, ?> transform) {
             }
           }
         }
+        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.63.0") >= 0) {
+          Boolean offsetDeduplication = configRow.getValue("offset_deduplication");

Review Comment:
   nit: mark the var @Nullable
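
   i.e. roughly this, assuming the checker framework annotation used elsewhere in the file (sketch):

   ```java
   @Nullable Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
   if (offsetDeduplication != null) {
     // apply the setting to the transform here
   }
   ```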



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -427,15 +441,15 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       reader = reader.withTimestampFn(timestampFn);
     }
 
-    if (redistribute) {
+    if (redistribute != null && redistribute) {
+      reader = reader.withRedistribute();
+      reader = reader.withAllowDuplicates(withAllowDuplicates);

Review Comment:
   seems like this should have a null check too?
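
   e.g. mirroring the redistribute check above (sketch):

   ```java
   if (redistribute != null && redistribute) {
     reader = reader.withRedistribute();
     if (withAllowDuplicates != null) {
       reader = reader.withAllowDuplicates(withAllowDuplicates);
     }
   }
   ```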



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -299,6 +300,34 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
     return curTimestamp;
   }
 
+  private static final byte[] EMPTY_RECORD_ID = new byte[0];
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      // BoundedReadFromUnboundedSource and tests may call getCurrentRecordId(), even for non offset
+      // deduplication cases. Therefore, Kafka reader cannot produce an exception when offset
+      // deduplication is disabled. Instead an empty record ID is provided.
+      return EMPTY_RECORD_ID;
+    }
+    if (curRecord != null) {
+      return KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
+          curRecord.getTopic(), curRecord.getPartition(), curRecord.getOffset());
+    }
+    throw new NoSuchElementException("KafkaUnboundedReader's curRecord is null.");
+  }
+
+  @Override
+  public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+    if (!this.offsetBasedDeduplicationSupported) {
+      throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
+    }
+    if (curRecord != null) {

Review Comment:
   ditto


