This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 9b69e1e KAFKA-10665: close all kafkaStreams before
purgeLocalStreamsState (#9674)
9b69e1e is described below
commit 9b69e1ee978a39363f7702c062a82c6f2396c17a
Author: Luke Chen <[email protected]>
AuthorDate: Fri Dec 4 22:04:50 2020 +0800
KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState (#9674)
The tests were flaky because we forgot to close the KafkaStreams instances
before calling purgeLocalStreamsState, so temporary files (e.g. checkpoint.tmp)
could still be created or deleted while streams were running, which caused a
DirectoryNotEmptyException or NoSuchFileException to be thrown.
Reviewers: Levani Kokhreidze, Bill Bejeck <[email protected]>
---
.../StreamTableJoinTopologyOptimizationIntegrationTest.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index aefa324..f72bd7d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -75,6 +75,7 @@ public class
StreamTableJoinTopologyOptimizationIntegrationTest {
private String inputTopic;
private String outputTopic;
private String applicationId;
+ private KafkaStreams kafkaStreams;
private Properties streamsConfiguration;
@@ -119,6 +120,9 @@ public class
StreamTableJoinTopologyOptimizationIntegrationTest {
@After
public void whenShuttingDown() throws IOException {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@@ -137,7 +141,7 @@ public class
StreamTableJoinTopologyOptimizationIntegrationTest {
.join(table, (value1, value2) -> value2)
.to(outputTopic);
- startStreams(streamsBuilder);
+ kafkaStreams = startStreams(streamsBuilder);
final long timestamp = System.currentTimeMillis();
@@ -149,8 +153,6 @@ public class
StreamTableJoinTopologyOptimizationIntegrationTest {
sendEvents(inputTopic, timestamp, expectedRecords);
sendEvents(outputTopic, timestamp, expectedRecords);
- startStreams(streamsBuilder);
-
validateReceivedMessages(
outputTopic,
new IntegerDeserializer(),