This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3f1a459873ca3cd0efd75819d44d1642b61836a2 Author: Rohan <[email protected]> AuthorDate: Sat Sep 2 18:14:14 2023 -0700 KAFKA-15429: reset transactionInFlight on StreamsProducer close (#14326) Resets the value of transactionInFlight to false when closing the StreamsProducer. This ensures we don't try to commit against a closed producer Reviewers: Anna Sophie Blee-Goldman <[email protected]> --- .../processor/internals/StreamsProducer.java | 5 ++-- .../processor/internals/StreamsProducerTest.java | 28 ++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index 61de201a4fa..bc8a3118fe7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -192,12 +192,11 @@ public class StreamsProducer { oldProducerTotalBlockedTime += totalBlockedTime(producer); final long start = time.nanoseconds(); - producer.close(); + close(); final long closeTime = time.nanoseconds() - start; oldProducerTotalBlockedTime += closeTime; producer = clientSupplier.getProducer(eosV2ProducerConfigs); - transactionInitialized = false; } private double getMetricValue(final Map<MetricName, ? extends Metric> metrics, @@ -371,6 +370,8 @@ public class StreamsProducer { void close() { producer.close(); + transactionInFlight = false; + transactionInitialized = false; } // for testing only diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 9470a7b166e..1787e6092f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -207,6 +207,34 @@ public class StreamsProducerTest { // functional tests + @Test + public void shouldResetTransactionInFlightOnClose() { + // given: + eosBetaStreamsProducer.send( + new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { }); + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true)); + + // when: + eosBetaStreamsProducer.close(); + + // then: + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); + } + + @Test + public void shouldResetTransactionInFlightOnReset() { + // given: + eosBetaStreamsProducer.send( + new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { }); + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true)); + + // when: + eosBetaStreamsProducer.resetProducer(); + + // then: + assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false)); + } + @Test public void shouldCreateProducer() { assertThat(mockClientSupplier.producers.size(), is(1));
