This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 9b69e1e  KAFKA-10665: close all kafkaStreams before 
purgeLocalStreamsState (#9674)
9b69e1e is described below

commit 9b69e1ee978a39363f7702c062a82c6f2396c17a
Author: Luke Chen <[email protected]>
AuthorDate: Fri Dec 4 22:04:50 2020 +0800

    KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState (#9674)
    
    The flaky tests are because we forgot to close the kafkaStreams before 
purgeLocalStreamsState, so that sometimes there will be some tmp files be 
created/deleted during streams running(ex: checkpoint.tmp), and caused the 
DirectoryNotEmptyException or NoSuchFileException be thrown.
    
    Reviewers:  Levani Kokhreidze, Bill Bejeck <[email protected]>
---
 .../StreamTableJoinTopologyOptimizationIntegrationTest.java       | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index aefa324..f72bd7d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -75,6 +75,7 @@ public class 
StreamTableJoinTopologyOptimizationIntegrationTest {
     private String inputTopic;
     private String outputTopic;
     private String applicationId;
+    private KafkaStreams kafkaStreams;
 
     private Properties streamsConfiguration;
 
@@ -119,6 +120,9 @@ public class 
StreamTableJoinTopologyOptimizationIntegrationTest {
 
     @After
     public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
@@ -137,7 +141,7 @@ public class 
StreamTableJoinTopologyOptimizationIntegrationTest {
             .join(table, (value1, value2) -> value2)
             .to(outputTopic);
 
-        startStreams(streamsBuilder);
+        kafkaStreams = startStreams(streamsBuilder);
 
         final long timestamp = System.currentTimeMillis();
 
@@ -149,8 +153,6 @@ public class 
StreamTableJoinTopologyOptimizationIntegrationTest {
         sendEvents(inputTopic, timestamp, expectedRecords);
         sendEvents(outputTopic, timestamp, expectedRecords);
 
-        startStreams(streamsBuilder);
-
         validateReceivedMessages(
             outputTopic,
             new IntegerDeserializer(),

Reply via email to