This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3f1a459873ca3cd0efd75819d44d1642b61836a2
Author: Rohan <[email protected]>
AuthorDate: Sat Sep 2 18:14:14 2023 -0700

    KAFKA-15429: reset transactionInFlight on StreamsProducer close (#14326)
    
    Resets the value of transactionInFlight to false when closing the
    StreamsProducer. This ensures we don't try to commit against a
    closed producer
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/internals/StreamsProducer.java       |  5 ++--
 .../processor/internals/StreamsProducerTest.java   | 28 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 61de201a4fa..bc8a3118fe7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -192,12 +192,11 @@ public class StreamsProducer {
 
         oldProducerTotalBlockedTime += totalBlockedTime(producer);
         final long start = time.nanoseconds();
-        producer.close();
+        close();
         final long closeTime = time.nanoseconds() - start;
         oldProducerTotalBlockedTime += closeTime;
 
         producer = clientSupplier.getProducer(eosV2ProducerConfigs);
-        transactionInitialized = false;
     }
 
     private double getMetricValue(final Map<MetricName, ? extends Metric> 
metrics,
@@ -371,6 +370,8 @@ public class StreamsProducer {
 
     void close() {
         producer.close();
+        transactionInFlight = false;
+        transactionInitialized = false;
     }
 
     // for testing only
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index 9470a7b166e..1787e6092f0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -207,6 +207,34 @@ public class StreamsProducerTest {
 
     // functional tests
 
+    @Test
+    public void shouldResetTransactionInFlightOnClose() {
+        // given:
+        eosBetaStreamsProducer.send(
+            new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { 
});
+        assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true));
+
+        // when:
+        eosBetaStreamsProducer.close();
+
+        // then:
+        assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false));
+    }
+
+    @Test
+    public void shouldResetTransactionInFlightOnReset() {
+        // given:
+        eosBetaStreamsProducer.send(
+            new ProducerRecord<>("topic", new byte[1]), (metadata, error) -> { 
});
+        assertThat(eosBetaStreamsProducer.transactionInFlight(), is(true));
+
+        // when:
+        eosBetaStreamsProducer.resetProducer();
+
+        // then:
+        assertThat(eosBetaStreamsProducer.transactionInFlight(), is(false));
+    }
+
     @Test
     public void shouldCreateProducer() {
         assertThat(mockClientSupplier.producers.size(), is(1));

Reply via email to