Repository: kafka
Updated Branches:
  refs/heads/trunk cc84686a4 -> f9865d52e


KAFKA-5225; StreamsResetter doesn't allow custom Consumer properties

Author: Matthias J. Sax <matth...@confluent.io>
Author: Bharat Viswanadham <bhar...@us.ibm.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Damian Guy <damian....@gmail.com>

Closes #3970 from mjsax/kafka-5225-streams-resetter-properties


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9865d52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9865d52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9865d52

Branch: refs/heads/trunk
Commit: f9865d52e81bbdddb7889d6c3cc7be537e610826
Parents: cc84686
Author: Matthias J. Sax <matth...@confluent.io>
Authored: Mon Oct 2 13:47:45 2017 -0700
Committer: Damian Guy <damian....@gmail.com>
Committed: Mon Oct 2 13:47:45 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../main/scala/kafka/tools/StreamsResetter.java |  60 ++-
 .../AbstractResetIntegrationTest.java           | 473 +++++++++++++++++++
 .../integration/ResetIntegrationTest.java       | 352 +-------------
 .../ResetIntegrationWithSslTest.java            |  96 ++++
 .../integration/utils/KafkaEmbedded.java        |   5 +-
 6 files changed, 615 insertions(+), 372 deletions(-)
----------------------------------------------------------------------
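For context: the patch adds a --config-file option to the reset tool so that custom client properties (for example SSL settings) reach the tool's admin clients and its embedded consumer; the deprecated --zookeeper flag is still accepted for backwards compatibility. A minimal sketch of driving the tool programmatically (the application id, topic, and file path below are placeholders; the option names and the public run(String[], Properties) overload come from the patch itself):

    import java.util.Properties;
    import kafka.tools.StreamsResetter;

    public class StreamsResetterExample {
        public static void main(final String[] args) {
            // placeholder arguments; "--config-file" is the option introduced by this patch
            final String[] parameters = new String[]{
                "--application-id", "my-streams-app",
                "--bootstrap-servers", "localhost:9092",
                "--input-topics", "inputTopic",
                "--config-file", "/tmp/reset-tool.properties"
            };
            // run() returns EXIT_CODE_SUCCESS (0) or EXIT_CODE_ERROR (1)
            System.exit(new StreamsResetter().run(parameters, new Properties()));
        }
    }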


http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index cbae7b0..d7b799b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -893,6 +893,7 @@ project(':streams') {
     testCompile project(':core').sourceSets.test.output
     testCompile libs.junit
     testCompile libs.easymock
+    testCompile libs.bcpkix
 
     testRuntime libs.slf4jlog4j
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 09d0d75..5539258 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -16,7 +16,11 @@
  */
 package kafka.tools;
 
-
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
@@ -27,9 +31,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,12 +44,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import joptsimple.OptionException;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
-
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
  * <p>
@@ -71,14 +71,13 @@ public class StreamsResetter {
     private static final int EXIT_CODE_ERROR = 1;
 
     private static OptionSpec<String> bootstrapServerOption;
-    private static OptionSpecBuilder zookeeperOption;
     private static OptionSpec<String> applicationIdOption;
     private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
     private static OptionSpecBuilder dryRunOption;
+    private static OptionSpec<String> commandConfigOption;
 
     private OptionSet options = null;
-    private final Properties consumerConfig = new Properties();
     private final List<String> allTopics = new LinkedList<>();
     private boolean dryRun = false;
 
@@ -86,10 +85,8 @@ public class StreamsResetter {
         return run(args, new Properties());
     }
 
-    public int run(final String[] args, final Properties config) {
-        consumerConfig.clear();
-        consumerConfig.putAll(config);
-
+    public int run(final String[] args,
+                   final Properties config) {
         int exitCode = EXIT_CODE_SUCCESS;
 
         KafkaAdminClient kafkaAdminClient = null;
@@ -99,12 +96,14 @@
             dryRun = options.has(dryRunOption);
 
             final String groupId = options.valueOf(applicationIdOption);
+            final Properties properties = new Properties();
+            if (options.has(commandConfigOption)) {
+                properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+            }
+            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
 
-            validateNoActiveConsumers(groupId);
-
-            final Properties adminClientProperties = new Properties();
-            adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption));
-            kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties);
+            validateNoActiveConsumers(groupId, properties);
+            kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties);
 
             allTopics.clear();
             allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
@@ -112,7 +111,10 @@
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
             }
-            maybeResetInputAndSeekToEndIntermediateTopicOffsets();
+
+            final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
+            consumerConfig.putAll(properties);
+            maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig);
             maybeDeleteInternalTopics(kafkaAdminClient);
 
         } catch (final Throwable e) {
@@ -128,10 +130,11 @@
         return exitCode;
     }
 
-    private void validateNoActiveConsumers(final String groupId) {
+    private void validateNoActiveConsumers(final String groupId,
+                                           final Properties properties) {
         kafka.admin.AdminClient olderAdminClient = null;
         try {
-            olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
+            olderAdminClient = kafka.admin.AdminClient.create(properties);
             if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) {
                 throw new IllegalStateException("Consumer group '" + groupId + "' is still active. "
                                                 + "Make sure to stop all running application instances before running the reset tool.");
@@ -156,8 +159,6 @@
             .ofType(String.class)
             .defaultsTo("localhost:9092")
             .describedAs("urls");
-        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
-
         inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
             .withRequiredArg()
             .ofType(String.class)
@@ -168,8 +169,15 @@
             .ofType(String.class)
             .withValuesSeparatedBy(',')
             .describedAs("list");
+        commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
+            .withRequiredArg()
+            .ofType(String.class)
+            .describedAs("file name");
         dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
 
+        // TODO: deprecated in 1.0; can be removed eventually
+        optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
+
         try {
             options = optionParser.parse(args);
         } catch (final OptionException e) {
@@ -178,7 +186,7 @@
         }
     }
 
-    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
+    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
@@ -219,7 +227,6 @@
 
         final Properties config = new Properties();
         config.putAll(consumerConfig);
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption));
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
@@ -274,7 +281,8 @@
         System.out.println("Done.");
     }
 
-    private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, final Set<TopicPartition> intermediateTopicPartitions) {
+    private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client,
+                                final Set<TopicPartition> intermediateTopicPartitions) {
 
         final String groupId = options.valueOf(applicationIdOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
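
As the patched run() above shows, properties loaded from the --config-file are handed to both admin clients and the embedded consumer, and the --bootstrap-servers argument overrides any bootstrap.servers entry in the file. A sketch of such a properties file, mirroring the keys the SSL integration test below writes (the truststore path and password are placeholders):

    security.protocol=SSL
    ssl.truststore.location=/path/to/truststore.jks
    ssl.truststore.password=<truststore-password>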

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
new file mode 100644
index 0000000..6ab7141
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.admin.AdminClient;
+import kafka.tools.StreamsResetter;
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+abstract class AbstractResetIntegrationTest {
+    private final static Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class);
+
+    static final int NUM_BROKERS = 1;
+
+    private static final String APP_ID = "cleanup-integration-test";
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+
+    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
+    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+    private static final int TIMEOUT_MULTIPLIER = 5;
+
+    private static AdminClient adminClient = null;
+    private static KafkaAdminClient kafkaAdminClient = null;
+    private static int testNo = 0;
+
+    static EmbeddedKafkaCluster cluster;
+    static String bootstrapServers;
+    static MockTime mockTime;
+
+    private final AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed consumerGroupInactive = new AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed();
+
+    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
+        @Override
+        public boolean conditionMet() {
+            return adminClient.describeConsumerGroup(APP_ID, 0).consumers().get().isEmpty();
+        }
+    }
+
+    static void afterClassGlobalCleanup() {
+        if (adminClient != null) {
+            adminClient.close();
+            adminClient = null;
+        }
+
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close(10, TimeUnit.SECONDS);
+            kafkaAdminClient = null;
+        }
+    }
+
+    void beforePrepareTest() throws Exception {
+        ++testNo;
+        bootstrapServers = cluster.bootstrapServers();
+        mockTime = cluster.time;
+
+        Properties sslConfig = getClientSslConfig();
+        if (sslConfig == null) {
+            sslConfig = new Properties();
+            sslConfig.put("bootstrap.servers", bootstrapServers);
+        }
+
+        if (adminClient == null) {
+            adminClient = AdminClient.create(sslConfig);
+        }
+
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig);
+        }
+
+        // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
+        while (true) {
+            Thread.sleep(50);
+
+            try {
+                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+                    "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+            } catch (final TimeoutException e) {
+                continue;
+            }
+            break;
+        }
+
+        prepareInputData();
+    }
+
+    Properties getClientSslConfig() {
+        return null;
+    }
+
+    void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+        final Properties sslConfig = getClientSslConfig();
+        final Properties streamsConfiguration = prepareTest();
+
+        final Properties resultTopicConsumerConfig = new Properties();
+        if (sslConfig != null) {
+            resultTopicConsumerConfig.putAll(sslConfig);
+        }
+        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
+            bootstrapServers,
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class));
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        final KafkaStreams handlerReference = streams;
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                handlerReference.close(10, TimeUnit.SECONDS);
+                log.error("Streams application failed: ", e);
+            }
+        });
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // RESET
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        final KafkaStreams handlerReference2 = streams;
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                handlerReference2.close(10, TimeUnit.SECONDS);
+                log.error("Streams application failed: ", e);
+            }
+        });
+        streams.cleanUp();
+        cleanGlobal(null, sslConfig);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(null);
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(null, sslConfig);
+    }
+
+    void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
+        cluster.createTopic(INTERMEDIATE_USER_TOPIC);
+
+        final Properties sslConfig = getClientSslConfig();
+        final Properties streamsConfiguration = prepareTest();
+
+        final Properties resultTopicConsumerConfig = new Properties();
+        if (sslConfig != null) {
+            resultTopicConsumerConfig.putAll(sslConfig);
+        }
+        resultTopicConsumerConfig.putAll(TestUtils.consumerConfig(
+            bootstrapServers,
+            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
+            LongDeserializer.class,
+            LongDeserializer.class));
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
+        final KafkaStreams handlerReference = streams;
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                handlerReference.close(10, TimeUnit.SECONDS);
+                log.error("Streams application failed: ", e);
+            }
+        });
+        streams.start();
+        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        // receive only first values to make sure intermediate user topic is not consumed completely
+        // => required to test "seekToEnd" for intermediate topics
+        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC_2,
+            40
+        );
+
+        streams.close();
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
+            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
+
+        // insert bad record to make sure intermediate user topic gets seekToEnd()
+        mockTime.sleep(1);
+        Properties producerConfig = sslConfig;
+        if (producerConfig == null) {
+            producerConfig = new Properties();
+        }
+        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            INTERMEDIATE_USER_TOPIC,
+            Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")),
+            producerConfig,
+            mockTime.milliseconds());
+
+        // RESET
+        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
+        final KafkaStreams handlerReference2 = streams;
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                handlerReference2.close(10, TimeUnit.SECONDS);
+                log.error("Streams application failed: ", e);
+            }
+        });
+        streams.cleanUp();
+        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+
+        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
+
+        // RE-RUN
+        streams.start();
+        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC,
+            10);
+        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+            resultTopicConsumerConfig,
+            OUTPUT_TOPIC_2_RERUN,
+            40
+        );
+        streams.close();
+
+        assertThat(resultRerun, equalTo(result));
+        assertThat(resultRerun2, equalTo(result2));
+
+        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
+            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
+        cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig);
+
+        cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
+    }
+
+    private Properties prepareTest() throws IOException {
+        Properties streamsConfiguration = getClientSslConfig();
+        if (streamsConfiguration == null) {
+            streamsConfiguration = new Properties();
+        }
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        return streamsConfiguration;
+    }
+
+    private void prepareInputData() throws Exception {
+        cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        Properties producerConfig = getClientSslConfig();
+        if (producerConfig == null) {
+            producerConfig = new Properties();
+        }
+        producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class));
+
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(10);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
+        mockTime.sleep(1);
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
+    }
+
+    private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
+            @Override
+            public KeyValue<Long, String> apply(final Long key, final String value) {
+                return new KeyValue<>(key, value);
+            }
+        })
+            .groupByKey()
+            .count()
+            .toStream()
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
+
+        input.through(INTERMEDIATE_USER_TOPIC)
+            .groupByKey()
+            .windowedBy(TimeWindows.of(35).advanceBy(10))
+            .count()
+            .toStream()
+            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
+                @Override
+                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
+                    return new KeyValue<>(key.window().start() + key.window().end(), value);
+                }
+            })
+            .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));
+
+        return builder.build();
+    }
+
+    private Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
+            @Override
+            public KeyValue<Long, Long> apply(final Long key, final String value) {
+                return new KeyValue<>(key, key);
+            }
+        }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
+
+        return builder.build();
+    }
+
+    private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception {
+        // leaving --zookeeper arg here to ensure tool works if users add it
+        final String[] parameters;
+        if (intermediateUserTopic != null) {
+            parameters = new String[]{
+                "--application-id", APP_ID + testNo,
+                "--bootstrap-servers", bootstrapServers,
+                "--input-topics", INPUT_TOPIC,
+                "--intermediate-topics", INTERMEDIATE_USER_TOPIC,
+                "--zookeeper", "localhost:2181"
+            };
+        } else {
+            if (sslConfig != null) {
+                final File configFile = TestUtils.tempFile();
+                final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile));
+                writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n");
+                writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n");
+                writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n");
+                writer.close();
+
+                parameters = new String[]{
+                    "--application-id", APP_ID + testNo,
+                    "--bootstrap-servers", bootstrapServers,
+                    "--input-topics", INPUT_TOPIC,
+                    "--config-file", configFile.getAbsolutePath()
+                };
+            } else {
+                parameters = new String[]{
+                    "--application-id", APP_ID + testNo,
+                    "--bootstrap-servers", bootstrapServers,
+                    "--input-topics", INPUT_TOPIC
+                };
+            }
+        }
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(0, exitCode);
+    }
+
+    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
+        final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
+        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
+        if (intermediateUserTopic != null) {
+            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
+        }
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
+        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
+        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
+
+        final Set<String> allTopics = new HashSet<>();
+
+        final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
+        listTopicsOptions.listInternal(true);
+        allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
+        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d76f5da..549f8f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,57 +16,22 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListTopicsOptions;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
+import kafka.server.KafkaConfig$;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
-import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import kafka.admin.AdminClient;
-import kafka.server.KafkaConfig$;
-import kafka.tools.StreamsResetter;
-import kafka.utils.MockTime;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests local state store and global application cleanup.
  */
 @Category({IntegrationTest.class})
-public class ResetIntegrationTest {
-    private static final int NUM_BROKERS = 1;
+public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
@@ -80,327 +45,26 @@ public class ResetIntegrationTest {
         // otherwise, input records could fall into different windows for different runs depending on the initial mock time
         final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
         CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
+        cluster = CLUSTER;
     }
 
-    private static final String APP_ID = "cleanup-integration-test";
-    private static final String INPUT_TOPIC = "inputTopic";
-    private static final String OUTPUT_TOPIC = "outputTopic";
-    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
-    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
-    private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
-
-    private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
-    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
-    private static final int TIMEOUT_MULTIPLIER = 5;
-
-    private static int testNo = 0;
-    private static AdminClient adminClient = null;
-    private static KafkaAdminClient kafkaAdminClient = null;
-
-    private final MockTime mockTime = CLUSTER.time;
-    private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
-
     @AfterClass
     public static void globalCleanup() {
-        if (adminClient != null) {
-            adminClient.close();
-            adminClient = null;
-        }
-
-        if (kafkaAdminClient != null) {
-            kafkaAdminClient.close(10, TimeUnit.SECONDS);
-            kafkaAdminClient = null;
-        }
+        afterClassGlobalCleanup();
    }
 
     @Before
-    public void cleanup() throws Exception {
-        ++testNo;
-
-        if (adminClient == null) {
-            adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
-        }
-
-        if (kafkaAdminClient == null) {
-            Properties props = new Properties();
-            props.put("bootstrap.servers", CLUSTER.bootstrapServers());
-            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props);
-        }
-
-        // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
-        while (true) {
-            Thread.sleep(50);
-
-            try {
-                TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                        "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-            } catch (final TimeoutException e) {
-                continue;
-            }
-            break;
-        }
-
-        prepareInputData();
+    public void before() throws Exception {
+        beforePrepareTest();
     }
 
     @Test
     public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception {
-        CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC);
-
-        final Properties streamsConfiguration = prepareTest(4);
-        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
-            CLUSTER.bootstrapServers(),
-            APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-            LongDeserializer.class,
-            LongDeserializer.class);
-
-        // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration);
-        streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
-        // receive only first values to make sure intermediate user topic is not consumed completely
-        // => required to test "seekToEnd" for intermediate topics
-        final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC_2,
-            40
-        );
-
-        streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-            "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
-
-        // insert bad record to make sure intermediate user topic gets seekToEnd()
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                INTERMEDIATE_USER_TOPIC,
-                Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), mockTime.milliseconds());
-
-        // RESET
-        streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration);
-        streams.cleanUp();
-        cleanGlobal(INTERMEDIATE_USER_TOPIC);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-            "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-
-        assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC);
-
-        // RE-RUN
-        streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC,
-            10);
-        final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            resultTopicConsumerConfig,
-            OUTPUT_TOPIC_2_RERUN,
-            40
-        );
-        streams.close();
-
-        assertThat(resultRerun, equalTo(result));
-        assertThat(resultRerun2, equalTo(result2));
-
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(INTERMEDIATE_USER_TOPIC);
-
-        CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
+        super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic();
     }
 
     @Test
     public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
-        final Properties streamsConfiguration = prepareTest(1);
-        final Properties resultTopicConsumerConfig = TestUtils.consumerConfig(
-                CLUSTER.bootstrapServers(),
-                APP_ID + "-standard-consumer-" + OUTPUT_TOPIC,
-                LongDeserializer.class,
-                LongDeserializer.class);
-
-        // RUN
-        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
-        streams.start();
-        final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                resultTopicConsumerConfig,
-                OUTPUT_TOPIC,
-                10);
-
-        streams.close();
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT,
-                "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms.");
-
-        // RESET
-        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
-        streams.cleanUp();
-        cleanGlobal(null);
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-
-        assertInternalTopicsGotDeleted(null);
-
-        // RE-RUN
-        streams.start();
-        final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                resultTopicConsumerConfig,
-                OUTPUT_TOPIC,
-                10);
-        streams.close();
-
-        assertThat(resultRerun, equalTo(result));
-
-        TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT,
-                "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms.");
-        cleanGlobal(null);
+        super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
     }
-
-    private Properties prepareTest(final int threads) throws IOException {
-        final Properties streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-
-        return streamsConfiguration;
-    }
-
-    private void prepareInputData() throws Exception {
-        CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
-
-        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);
-
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(10);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
-        mockTime.sleep(1);
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
-    }
-
-    private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
-
-        // use map to trigger internal re-partitioning before groupByKey
-        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
-                @Override
-                public KeyValue<Long, String> apply(final Long key, final String value) {
-                    return new KeyValue<>(key, value);
-                }
-            })
-            .groupByKey()
-            .count("global-count")
-            .to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
-
-        input.through(INTERMEDIATE_USER_TOPIC)
-            .groupByKey()
-            .count(TimeWindows.of(35).advanceBy(10), "count")
-            .toStream()
-            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
-                @Override
-                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
-                    return new KeyValue<>(key.window().start() + key.window().end(), value);
-                }
-            })
-            .to(Serdes.Long(), Serdes.Long(), outputTopic2);
-
-        return builder.build();
-    }
-
-    private Topology setupTopologyWithoutIntermediateUserTopic() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
-
-        // use map to trigger internal re-partitioning before groupByKey
-        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
-            @Override
-            public KeyValue<Long, Long> apply(final Long key, final String value) {
-                return new KeyValue<>(key, key);
-            }
-        }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
-
-        return builder.build();
-    }
-
-    private void cleanGlobal(final String intermediateUserTopic) {
-        // leaving --zookeeper arg here to ensure tool works if users add it
-        final String[] parameters;
-        if (intermediateUserTopic != null) {
-            parameters = new String[]{
-                "--application-id", APP_ID + testNo,
-                "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--input-topics", INPUT_TOPIC,
-                "--intermediate-topics", INTERMEDIATE_USER_TOPIC,
-                "--zookeeper", "localhost:2181"
-            };
-        } else {
-            parameters = new String[]{
-                "--application-id", APP_ID + testNo,
-                "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--input-topics", INPUT_TOPIC
-            };
-        }
-        final Properties cleanUpConfig = new Properties();
-        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
-        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
-
-        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
-        Assert.assertEquals(0, exitCode);
-    }
-
-    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
-        final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
-        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
-        if (intermediateUserTopic != null) {
-            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
-        }
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
-        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
-
-        final Set<String> allTopics = new HashSet<>();
-
-        final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
-        listTopicsOptions.listInternal(true);
-        allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS));
-        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
-
-    }
-
-    private class WaitUntilConsumerGroupGotClosed implements TestCondition {
-        @Override
-        public boolean conditionMet() {
-            return adminClient.describeConsumerGroup(APP_ID + testNo, 0).consumers().get().isEmpty();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
new file mode 100644
index 0000000..d018225
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig$;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests command line SSL setup for reset tool.
+ */
+@Category({IntegrationTest.class})
+public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest {
+
+    private static Map<String, Object> sslConfig;
+    static {
+        try {
+            sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER;
+    static {
+        final Properties props = new Properties();
+        // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable
+        // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
+        // very long sleep times
+        props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L);
+        props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:9092");
+        props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
+        props.putAll(sslConfig);
+        // we align time to seconds to get clean window boundaries and thus ensure the same result for each run
+        // otherwise, input records could fall into different windows for different runs depending on the initial mock time
+        final long alignedTime = (System.currentTimeMillis() / 1000) * 1000;
+        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime);
+        cluster = CLUSTER;
+    }
+
+    @AfterClass
+    public static void globalCleanup() {
+        afterClassGlobalCleanup();
+    }
+
+    @Before
+    public void before() throws Exception {
+        beforePrepareTest();
+    }
+
+    Properties getClientSslConfig() {
+        final Properties props = new Properties();
+
+        props.put("bootstrap.servers", CLUSTER.bootstrapServers());
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+
+        return props;
+    }
+
+    @Test
+    public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception {
+        super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
+    }
+
+}
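
The round trip this SSL test exercises can also be sketched stand-alone: write the client truststore settings to a temporary file and hand it to the reset tool through --config-file, as cleanGlobal() in AbstractResetIntegrationTest does. All ids, topics, and SSL values below are placeholders:

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.util.Properties;
    import kafka.tools.StreamsResetter;

    public class SslResetSketch {
        public static void main(final String[] args) throws Exception {
            // write client-side SSL settings to a temporary config file
            final File configFile = File.createTempFile("reset-tool", ".properties");
            try (final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile))) {
                writer.write("security.protocol=SSL\n");
                writer.write("ssl.truststore.location=/path/to/truststore.jks\n"); // placeholder
                writer.write("ssl.truststore.password=<truststore-password>\n");   // placeholder
            }
            // hand the file to the reset tool via the new --config-file option
            final int exitCode = new StreamsResetter().run(new String[]{
                "--application-id", "my-streams-app",     // placeholder
                "--bootstrap-servers", "localhost:9092",  // must point at the broker's SSL listener
                "--input-topics", "inputTopic",           // placeholder
                "--config-file", configFile.getAbsolutePath()
            }, new Properties());
            System.exit(exitCode);
        }
    }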

http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 18c1995..1863484 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -29,7 +29,6 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +109,9 @@ public class KafkaEmbedded {
      * You can use this to tell Kafka producers and consumers how to connect to this instance.
      */
     public String brokerList() {
-        return kafka.config().hostName() + ":" + kafka.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
+        Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        return kafka.config().hostName() + ":" + kafka.boundPort(
+            new ListenerName(listenerConfig != null ? listenerConfig.toString() : "PLAINTEXT"));
     }
 
 
