This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 74f90d722f7 [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory 74f90d722f7 is described below commit 74f90d722f7be5db5298b84626935a585391f0df Author: root <r...@ip-10-31-1-150.us-west-2.compute.internal> AuthorDate: Wed Jul 13 16:50:32 2022 +0000 [FLINK-28250][Connector/Kafka] Fix exactly-once Kafka Sink out of memory --- .../flink/connector/kafka/sink/KafkaCommitter.java | 5 ++++ .../connector/kafka/sink/KafkaCommitterTest.java | 27 ++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java index d2873dde40b..4dbeaf9e715 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java @@ -72,6 +72,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { .orElseGet(() -> getRecoveryProducer(committable)); producer.commitTransaction(); producer.flush(); + recyclable.ifPresent(Recyclable::close); } catch (RetriableException e) { LOG.warn( "Encountered retriable exception while committing {}.", transactionalId, e); @@ -90,6 +91,7 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), e); + recyclable.ifPresent(Recyclable::close); request.signalFailedWithKnownReason(e); } catch (InvalidTxnStateException e) { // This exception only occurs when aborting after a commit or vice versa. 
@@ -99,12 +101,15 @@ class KafkaCommitter implements Committer<KafkaCommittable>, Closeable { + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", request, e); + recyclable.ifPresent(Recyclable::close); request.signalFailedWithKnownReason(e); } catch (UnknownProducerIdException e) { LOG.error( "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE, request, e); + recyclable.ifPresent(Recyclable::close); + request.signalFailedWithKnownReason(e); } catch (Exception e) { LOG.error( "Transaction ({}) encountered error and data has been potentially lost.", diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java index 30bbdca7889..8def81a38cf 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java @@ -22,6 +22,7 @@ import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -85,6 +86,32 @@ public class KafkaCommitterTest { } } + @Test + public void testKafkaCommitterClosesProducer() throws IOException, InterruptedException { + Properties properties = getProperties(); + FlinkKafkaInternalProducer<Object, Object> producer = + new FlinkKafkaInternalProducer(properties, TRANSACTIONAL_ID) { + @Override + public void commitTransaction() throws ProducerFencedException {} + + @Override + public void flush() {} + + 
@Override + public void close() {} + }; + try (final KafkaCommitter committer = new KafkaCommitter(properties); + Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable = + new Recyclable<>(producer, p -> {})) { + final MockCommitRequest<KafkaCommittable> request = + new MockCommitRequest<>( + new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable)); + + committer.commit(Collections.singletonList(request)); + assertThat(recyclable.isRecycled()).isTrue(); + } + } + Properties getProperties() { Properties properties = new Properties(); properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1");