This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e27ecedfda5 Do not throw exceptions if commitSync fails in 
KafkaUnboundedReader. (#33402)
e27ecedfda5 is described below

commit e27ecedfda5ad83a07935d26a95849e86896885e
Author: Nick Anikin <[email protected]>
AuthorDate: Tue Dec 17 15:56:51 2024 -0500

    Do not throw exceptions if commitSync fails in KafkaUnboundedReader. (#33402)
    
    Committing consumer offsets to Kafka is not critical for KafkaIO because it 
relies on the offsets stored in KafkaCheckpointMark, but throwing an exception 
makes Dataflow retry the same work item unnecessarily.
---
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java     | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 069607955c6..ab9e26b3b74 100644
--- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -606,13 +606,20 @@ class KafkaUnboundedReader<K, V> extends 
UnboundedReader<KafkaRecord<K, V>> {
       LOG.debug("{}: Committing finalized checkpoint {}", this, 
checkpointMark);
       Consumer<byte[], byte[]> consumer = 
Preconditions.checkStateNotNull(this.consumer);
 
-      consumer.commitSync(
-          checkpointMark.getPartitions().stream()
-              .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-              .collect(
-                  Collectors.toMap(
-                      p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                      p -> new OffsetAndMetadata(p.getNextOffset()))));
+      try {
+        consumer.commitSync(
+            checkpointMark.getPartitions().stream()
+                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+                .collect(
+                    Collectors.toMap(
+                        p -> new TopicPartition(p.getTopic(), 
p.getPartition()),
+                        p -> new OffsetAndMetadata(p.getNextOffset()))));
+      } catch (Exception e) {
+        // Log but ignore the exception. Committing consumer offsets to Kafka 
is not critical for
+        // KafkaIO because it relies on the offsets stored in 
KafkaCheckpointMark.
+        LOG.warn(
+            String.format("%s: Could not commit finalized checkpoint %s", 
this, checkpointMark), e);
+      }
     }
   }
 

Reply via email to