This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e27ecedfda5 Do not throw exceptions if commitSync fails in KafkaUnboundedReader. (#33402)
e27ecedfda5 is described below
commit e27ecedfda5ad83a07935d26a95849e86896885e
Author: Nick Anikin <[email protected]>
AuthorDate: Tue Dec 17 15:56:51 2024 -0500
Do not throw exceptions if commitSync fails in KafkaUnboundedReader. (#33402)
Committing consumer offsets to Kafka is not critical for KafkaIO because it
relies on the offsets stored in KafkaCheckpointMark, but throwing an exception
makes Dataflow retry the same work item unnecessarily.
---
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 21 ++++++++++++++-------
1 file changed, 14 insertions(+), 7 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 069607955c6..ab9e26b3b74 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -606,13 +606,20 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
     Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
-    consumer.commitSync(
-        checkpointMark.getPartitions().stream()
-            .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-            .collect(
-                Collectors.toMap(
-                    p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                    p -> new OffsetAndMetadata(p.getNextOffset()))));
+    try {
+      consumer.commitSync(
+          checkpointMark.getPartitions().stream()
+              .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+              .collect(
+                  Collectors.toMap(
+                      p -> new TopicPartition(p.getTopic(), p.getPartition()),
+                      p -> new OffsetAndMetadata(p.getNextOffset()))));
+    } catch (Exception e) {
+      // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
+      // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
+      LOG.warn(
+          String.format("%s: Could not commit finalized checkpoint %s", this, checkpointMark), e);
+    }
   }
 }
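The patch above boils down to a small pattern: treat the broker-side offset commit as best-effort, catching any failure and logging it rather than letting it propagate and trigger a work-item retry. A minimal standalone sketch of that pattern (hypothetical class and method names, not the actual Beam code; a plain java.util.function.Consumer stands in for the Kafka consumer's commitSync):

```java
import java.util.Map;
import java.util.function.Consumer;

public class BestEffortCommit {

  /**
   * Attempts the commit and reports whether it succeeded. On failure the
   * exception is logged and swallowed: in the KafkaUnboundedReader case the
   * checkpoint mark, not the broker, is the source of truth for offsets.
   */
  public static boolean tryCommit(Consumer<Map<String, Long>> commitFn, Map<String, Long> offsets) {
    try {
      commitFn.accept(offsets);
      return true;
    } catch (Exception e) {
      // Log but ignore the exception; the caller continues normally.
      System.err.println("Could not commit finalized checkpoint: " + e.getMessage());
      return false;
    }
  }
}
```

The return value is only for illustration; the real method is void, and the point is simply that the failure path no longer throws.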