Repository: kafka Updated Branches: refs/heads/trunk f300480f8 -> 1756a22f7
KAFKA-6215: fix transient failures in KafkaStreamsTest - set streams state.dir to test-dir (default /tmp is not reliable) Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy <[email protected]>, Guozhang Wang <[email protected]>, Ted Yu <[email protected]> Closes #4221 from mjsax/minor-fix-instable-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1756a22f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1756a22f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1756a22f Branch: refs/heads/trunk Commit: 1756a22f7923766175ee4f90bed074cf1b60f932 Parents: f300480 Author: Matthias J. Sax <[email protected]> Authored: Wed Nov 15 11:40:39 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 15 11:40:39 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/KafkaStreamsTest.java | 46 ++------------------ 1 file changed, 4 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1756a22f/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 69b4584..dd3b9af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.config.ConfigException; @@ -300,7 +299,9 @@ public class KafkaStreamsTest { @Test public void testNumberDefaultMetrics() { - final KafkaStreams streams = createKafkaStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1"); + final StreamsBuilder builder = new StreamsBuilder(); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); final Map<MetricName, ? extends Metric> metrics = streams.metrics(); // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics assertEquals(23, metrics.size()); @@ -308,9 +309,6 @@ public class KafkaStreamsTest { @Test public void testIllegalMetricsConfig() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); final StreamsBuilder builder = new StreamsBuilder(); @@ -322,9 +320,6 @@ public class KafkaStreamsTest { @Test public void testLegalMetricsConfig() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); final StreamsBuilder builder1 = new StreamsBuilder(); final KafkaStreams streams1 = new KafkaStreams(builder1.build(), props); @@ -364,11 +359,6 @@ public class KafkaStreamsTest { public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception { final AtomicBoolean keepRunning = new AtomicBoolean(true); try { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - final StreamsBuilder builder = new StreamsBuilder(); final CountDownLatch latch = new CountDownLatch(1); final String topic = "input"; @@ -409,11 +399,7 @@ public class KafkaStreamsTest { @Test public void shouldReturnThreadMetadata() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString()); - props.setProperty(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); final KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata(); @@ -427,22 +413,8 @@ public class KafkaStreamsTest { streams.close(); } - - private KafkaStreams createKafkaStreams() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final StreamsBuilder builder = new StreamsBuilder(); - return new KafkaStreams(builder.build(), props); - } - @Test public void testCleanup() { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - final StreamsBuilder builder = new StreamsBuilder(); final KafkaStreams streams = new KafkaStreams(builder.build(), props); @@ -454,10 +426,6 @@ public class KafkaStreamsTest { @Test public void testCannotCleanupWhileRunning() throws InterruptedException { - final Properties props = new Properties(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning"); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - final StreamsBuilder builder = new StreamsBuilder(); final KafkaStreams streams = new KafkaStreams(builder.build(), props); @@ -492,13 +460,7 @@ public class KafkaStreamsTest { @Test public void shouldCleanupOldStateDirs() throws InterruptedException { - final Properties props = new Properties(); - final String appId = "cleanupOldStateDirs"; - final String stateDir = TestUtils.tempDirectory().getPath(); - props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId); - props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1"); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); final String topic = "topic"; @@ -518,7 +480,7 @@ public class KafkaStreamsTest { } } }); - final String appDir = stateDir + File.separator + appId; + final String appDir = props.getProperty(StreamsConfig.STATE_DIR_CONFIG) + File.separator + props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); final File oldTaskDir = new File(appDir, "10_1"); assertTrue(oldTaskDir.mkdirs()); try {
