Repository: kafka Updated Branches: refs/heads/trunk cc84686a4 -> f9865d52e
KAFKA-5225; StreamsResetter doesn't allow custom Consumer properties Author: Matthias J. Sax <matth...@confluent.io> Author: Bharat Viswanadham <bhar...@us.ibm.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Damian Guy <damian....@gmail.com> Closes #3970 from mjsax/kafka-5225-streams-resetter-properties Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f9865d52 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f9865d52 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f9865d52 Branch: refs/heads/trunk Commit: f9865d52e81bbdddb7889d6c3cc7be537e610826 Parents: cc84686 Author: Matthias J. Sax <matth...@confluent.io> Authored: Mon Oct 2 13:47:45 2017 -0700 Committer: Damian Guy <damian....@gmail.com> Committed: Mon Oct 2 13:47:45 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../main/scala/kafka/tools/StreamsResetter.java | 60 ++- .../AbstractResetIntegrationTest.java | 473 +++++++++++++++++++ .../integration/ResetIntegrationTest.java | 352 +------------- .../ResetIntegrationWithSslTest.java | 96 ++++ .../integration/utils/KafkaEmbedded.java | 5 +- 6 files changed, 615 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index cbae7b0..d7b799b 100644 --- a/build.gradle +++ b/build.gradle @@ -893,6 +893,7 @@ project(':streams') { testCompile project(':core').sourceSets.test.output testCompile libs.junit testCompile libs.easymock + testCompile libs.bcpkix testRuntime libs.slf4jlog4j } http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 09d0d75..5539258 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -16,7 +16,11 @@ */ package kafka.tools; - +import joptsimple.OptionException; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; @@ -27,9 +31,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -38,12 +44,6 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; -import joptsimple.OptionException; -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import joptsimple.OptionSpec; -import joptsimple.OptionSpecBuilder; - /** * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch. * <p> @@ -71,14 +71,13 @@ public class StreamsResetter { private static final int EXIT_CODE_ERROR = 1; private static OptionSpec<String> bootstrapServerOption; - private static OptionSpecBuilder zookeeperOption; private static OptionSpec<String> applicationIdOption; private static OptionSpec<String> inputTopicsOption; private static OptionSpec<String> intermediateTopicsOption; private static OptionSpecBuilder dryRunOption; + private static OptionSpec<String> commandConfigOption; private OptionSet options = null; - private final Properties consumerConfig = new Properties(); private final List<String> allTopics = new LinkedList<>(); private boolean dryRun = false; @@ -86,10 +85,8 @@ public class StreamsResetter { return run(args, new Properties()); } - public int run(final String[] args, final Properties config) { - consumerConfig.clear(); - consumerConfig.putAll(config); - + public int run(final String[] args, + final Properties config) { int exitCode = EXIT_CODE_SUCCESS; KafkaAdminClient kafkaAdminClient = null; @@ -99,12 +96,14 @@ public class StreamsResetter { dryRun = options.has(dryRunOption); final String groupId = options.valueOf(applicationIdOption); + final Properties properties = new Properties(); + if (options.has(commandConfigOption)) { + properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); + } + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); - validateNoActiveConsumers(groupId); - - final Properties adminClientProperties = new Properties(); - adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption)); - kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties); + validateNoActiveConsumers(groupId, properties); + kafkaAdminClient = (KafkaAdminClient) AdminClient.create(properties); allTopics.clear(); allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS)); @@ -112,7 +111,10 @@ public class StreamsResetter { if (dryRun) { System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----"); } - maybeResetInputAndSeekToEndIntermediateTopicOffsets(); + + final HashMap<Object, Object> consumerConfig = new HashMap<>(config); + consumerConfig.putAll(properties); + maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig); maybeDeleteInternalTopics(kafkaAdminClient); } catch (final Throwable e) { @@ -128,10 +130,11 @@ public class StreamsResetter { return exitCode; } - private void validateNoActiveConsumers(final String groupId) { + private void validateNoActiveConsumers(final String groupId, + final Properties properties) { kafka.admin.AdminClient olderAdminClient = null; try { - olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption)); + olderAdminClient = kafka.admin.AdminClient.create(properties); if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty()) { throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + "Make sure to stop all running application instances before running the reset tool."); @@ -156,8 +159,6 @@ public class StreamsResetter { .ofType(String.class) .defaultsTo("localhost:9092") .describedAs("urls"); - zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly."); - inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.") .withRequiredArg() .ofType(String.class) @@ -168,8 +169,15 @@ public class StreamsResetter { .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); + commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.") + .withRequiredArg() + .ofType(String.class) + .describedAs("file name"); dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands."); + // TODO: deprecated in 1.0; can be removed eventually + optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly."); + try { options = optionParser.parse(args); } catch (final OptionException e) { @@ -178,7 +186,7 @@ public class StreamsResetter { } } - private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() { + private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig) { final List<String> inputTopics = options.valuesOf(inputTopicsOption); final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption); @@ -219,7 +227,6 @@ public class StreamsResetter { final Properties config = new Properties(); config.putAll(consumerConfig); - config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOption)); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); @@ -274,7 +281,8 @@ public class StreamsResetter { System.out.println("Done."); } - private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, final Set<TopicPartition> intermediateTopicPartitions) { + private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, + final Set<TopicPartition> intermediateTopicPartitions) { final String groupId = options.valueOf(applicationIdOption); final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption); http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java new file mode 100644 index 0000000..6ab7141 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.admin.AdminClient; +import kafka.tools.StreamsResetter; +import kafka.utils.MockTime; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +abstract class AbstractResetIntegrationTest { + private final static Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class); + + static final int NUM_BROKERS = 1; + + private static final String APP_ID = "cleanup-integration-test"; + private static final String INPUT_TOPIC = "inputTopic"; + private static final String OUTPUT_TOPIC = "outputTopic"; + private static final String OUTPUT_TOPIC_2 = "outputTopic2"; + private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; + private static final String INTERMEDIATE_USER_TOPIC = "userTopic"; + + private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; + private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; + private static final int TIMEOUT_MULTIPLIER = 5; + + private static AdminClient adminClient = null; + private static KafkaAdminClient kafkaAdminClient = null; + private static int testNo = 0; + + static EmbeddedKafkaCluster cluster; + static String bootstrapServers; + static MockTime mockTime; + + private final AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed consumerGroupInactive = new AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed(); + + private class WaitUntilConsumerGroupGotClosed implements TestCondition { + @Override + public boolean conditionMet() { + return adminClient.describeConsumerGroup(APP_ID, 0).consumers().get().isEmpty(); + } + } + + static void afterClassGlobalCleanup() { + if (adminClient != null) { + adminClient.close(); + adminClient = null; + } + + if (kafkaAdminClient != null) { + kafkaAdminClient.close(10, TimeUnit.SECONDS); + kafkaAdminClient = null; + } + } + + void beforePrepareTest() throws Exception { + ++testNo; + bootstrapServers = cluster.bootstrapServers(); + mockTime = cluster.time; + + Properties sslConfig = getClientSslConfig(); + if (sslConfig == null) { + sslConfig = new Properties(); + sslConfig.put("bootstrap.servers", bootstrapServers); + } + + if (adminClient == null) { + adminClient = AdminClient.create(sslConfig); + } + + if (kafkaAdminClient == null) { + kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig); + } + + // busy wait until cluster (ie, ConsumerGroupCoordinator) is available + while (true) { + Thread.sleep(50); + + try { + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + } catch (final TimeoutException e) { + continue; + } + break; + } + + prepareInputData(); + } + + Properties getClientSslConfig() { + return null; + } + + void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { + final Properties sslConfig = getClientSslConfig(); + final Properties streamsConfiguration = prepareTest(); + + final Properties resultTopicConsumerConfig = new Properties(); + if (sslConfig != null) { + resultTopicConsumerConfig.putAll(sslConfig); + } + resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( + bootstrapServers, + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class)); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + final KafkaStreams handlerReference = streams; + streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + handlerReference.close(10, TimeUnit.SECONDS); + log.error("Streams application failed: ", e); + } + }); + streams.start(); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // RESET + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + final KafkaStreams handlerReference2 = streams; + streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + handlerReference2.close(10, TimeUnit.SECONDS); + log.error("Streams application failed: ", e); + } + }); + streams.cleanUp(); + cleanGlobal(null, sslConfig); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(null); + + // RE-RUN + streams.start(); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(null, sslConfig); + } + + void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { + cluster.createTopic(INTERMEDIATE_USER_TOPIC); + + final Properties sslConfig = getClientSslConfig(); + final Properties streamsConfiguration = prepareTest(); + + final Properties resultTopicConsumerConfig = new Properties(); + if (sslConfig != null) { + resultTopicConsumerConfig.putAll(sslConfig); + } + resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( + bootstrapServers, + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class)); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration); + final KafkaStreams handlerReference = streams; + streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + handlerReference.close(10, TimeUnit.SECONDS); + log.error("Streams application failed: ", e); + } + }); + streams.start(); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + // receive only first values to make sure intermediate user topic is not consumed completely + // => required to test "seekToEnd" for intermediate topics + final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC_2, + 40 + ); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // insert bad record to make sure intermediate user topic gets seekToEnd() + mockTime.sleep(1); + Properties producerConfig = sslConfig; + if (producerConfig == null) { + producerConfig = new Properties(); + } + producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class)); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + INTERMEDIATE_USER_TOPIC, + Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), + producerConfig, + mockTime.milliseconds()); + + // RESET + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); + final KafkaStreams handlerReference2 = streams; + streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + handlerReference2.close(10, TimeUnit.SECONDS); + log.error("Streams application failed: ", e); + } + }); + streams.cleanUp(); + cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); + + // RE-RUN + streams.start(); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10); + final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC_2_RERUN, + 40 + ); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + assertThat(resultRerun2, equalTo(result2)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(INTERMEDIATE_USER_TOPIC, sslConfig); + + cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); + } + + private Properties prepareTest() throws IOException { + Properties streamsConfiguration = getClientSslConfig(); + if (streamsConfiguration == null) { + streamsConfiguration = new Properties(); + } + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + return streamsConfiguration; + } + + private void prepareInputData() throws Exception { + cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); + + Properties producerConfig = getClientSslConfig(); + if (producerConfig == null) { + producerConfig = new Properties(); + } + producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class)); + + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds()); + mockTime.sleep(1); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); + } + + private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Long, String> input = builder.stream(INPUT_TOPIC); + + // use map to trigger internal re-partitioning before groupByKey + input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { + @Override + public KeyValue<Long, String> apply(final Long key, final String value) { + return new KeyValue<>(key, value); + } + }) + .groupByKey() + .count() + .toStream() + .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); + + input.through(INTERMEDIATE_USER_TOPIC) + .groupByKey() + .windowedBy(TimeWindows.of(35).advanceBy(10)) + .count() + .toStream() + .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() { + @Override + public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) { + return new KeyValue<>(key.window().start() + key.window().end(), value); + } + }) + .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long())); + + return builder.build(); + } + + private Topology setupTopologyWithoutIntermediateUserTopic() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Long, String> input = builder.stream(INPUT_TOPIC); + + // use map to trigger internal re-partitioning before groupByKey + input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() { + @Override + public KeyValue<Long, Long> apply(final Long key, final String value) { + return new KeyValue<>(key, key); + } + }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long())); + + return builder.build(); + } + + private void cleanGlobal(final String intermediateUserTopic, final Properties sslConfig) throws Exception { + // leaving --zookeeper arg here to ensure tool works if users add it + final String[] parameters; + if (intermediateUserTopic != null) { + parameters = new String[]{ + "--application-id", APP_ID + testNo, + "--bootstrap-servers", bootstrapServers, + "--input-topics", INPUT_TOPIC, + "--intermediate-topics", INTERMEDIATE_USER_TOPIC, + "--zookeeper", "localhost:2181" + }; + } else { + if (sslConfig != null) { + final File configFile = TestUtils.tempFile(); + final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); + writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n"); + writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n"); + writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n"); + writer.close(); + + parameters = new String[]{ + "--application-id", APP_ID + testNo, + "--bootstrap-servers", bootstrapServers, + "--input-topics", INPUT_TOPIC, + "--config-file", configFile.getAbsolutePath() + }; + } else { + parameters = new String[]{ + "--application-id", APP_ID + testNo, + "--bootstrap-servers", bootstrapServers, + "--input-topics", INPUT_TOPIC + }; + } + } + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(0, exitCode); + } + + private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { + final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); + expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); + if (intermediateUserTopic != null) { + expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic); + } + expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); + expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); + expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); + expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); + + final Set<String> allTopics = new HashSet<>(); + + final ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); + listTopicsOptions.listInternal(true); + allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS)); + assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index d76f5da..549f8f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -16,57 +16,22 @@ */ package org.apache.kafka.streams.integration; -import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.ListTopicsOptions; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.Topology; +import kafka.server.KafkaConfig$; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestCondition; -import org.apache.kafka.test.TestUtils; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import kafka.admin.AdminClient; -import kafka.server.KafkaConfig$; -import kafka.tools.StreamsResetter; -import kafka.utils.MockTime; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; /** * Tests local state store and global application cleanup. */ @Category({IntegrationTest.class}) -public class ResetIntegrationTest { - private static final int NUM_BROKERS = 1; +public class ResetIntegrationTest extends AbstractResetIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER; @@ -80,327 +45,26 @@ public class ResetIntegrationTest { // otherwise, input records could fall into different windows for different runs depending on the initial mock time final long alignedTime = (System.currentTimeMillis() / 1000) * 1000; CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime); + cluster = CLUSTER; } - private static final String APP_ID = "cleanup-integration-test"; - private static final String INPUT_TOPIC = "inputTopic"; - private static final String OUTPUT_TOPIC = "outputTopic"; - private static final String OUTPUT_TOPIC_2 = "outputTopic2"; - private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; - private static final String INTERMEDIATE_USER_TOPIC = "userTopic"; - - private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; - private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; - private static final int TIMEOUT_MULTIPLIER = 5; - - private static int testNo = 0; - private static AdminClient adminClient = null; - private static KafkaAdminClient kafkaAdminClient = null; - - private final MockTime mockTime = CLUSTER.time; - private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); - @AfterClass public static void globalCleanup() { - if (adminClient != null) { - adminClient.close(); - adminClient = null; - } - - if (kafkaAdminClient != null) { - kafkaAdminClient.close(10, TimeUnit.SECONDS); - kafkaAdminClient = null; - } + afterClassGlobalCleanup(); } @Before - public void cleanup() throws Exception { - ++testNo; - - if (adminClient == null) { - adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); - } - - if (kafkaAdminClient == null) { - Properties props = new Properties(); - props.put("bootstrap.servers", CLUSTER.bootstrapServers()); - kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props); - } - - // busy wait until cluster (ie, ConsumerGroupCoordinator) is available - while (true) { - Thread.sleep(50); - - try { - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - } catch (final TimeoutException e) { - continue; - } - break; - } - - prepareInputData(); + public void before() throws Exception { + beforePrepareTest(); } @Test public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { - CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); - - final Properties streamsConfiguration = prepareTest(4); - final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( - CLUSTER.bootstrapServers(), - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class); - - // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration); - streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); - // receive only first values to make sure intermediate user topic is not consumed completely - // => required to test "seekToEnd" for intermediate topics - final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC_2, - 40 - ); - - streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); - - // insert bad record to make sure intermediate user topic gets seekToEnd() - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( - INTERMEDIATE_USER_TOPIC, - Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class), mockTime.milliseconds()); - - // RESET - streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); - streams.cleanUp(); - cleanGlobal(INTERMEDIATE_USER_TOPIC); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - - assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); - - // RE-RUN - streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); - final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC_2_RERUN, - 40 - ); - streams.close(); - - assertThat(resultRerun, equalTo(result)); - assertThat(resultRerun2, equalTo(result2)); - - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(INTERMEDIATE_USER_TOPIC); - - CLUSTER.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); + super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(); } @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { - final Properties streamsConfiguration = prepareTest(1); - final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( - CLUSTER.bootstrapServers(), - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class); - - // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); - streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); - - streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); - - // RESET - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); - streams.cleanUp(); - cleanGlobal(null); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - - assertInternalTopicsGotDeleted(null); - - // RE-RUN - streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); - streams.close(); - - assertThat(resultRerun, equalTo(result)); - - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(null); + super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); } - - private Properties prepareTest(final int threads) throws IOException { - final Properties streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - - return streamsConfiguration; - } - - private void prepareInputData() throws Exception { - CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); - - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class); - - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); - } - - private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) { - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream<Long, String> input = builder.stream(INPUT_TOPIC); - - // use map to trigger internal re-partitioning before groupByKey - input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() { - @Override - public KeyValue<Long, String> apply(final Long key, final String value) { - return new KeyValue<>(key, value); - } - }) - .groupByKey() - .count("global-count") - .to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); - - input.through(INTERMEDIATE_USER_TOPIC) - .groupByKey() - .count(TimeWindows.of(35).advanceBy(10), "count") - .toStream() - .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() { - @Override - public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) { - return new KeyValue<>(key.window().start() + key.window().end(), value); - } - }) - .to(Serdes.Long(), Serdes.Long(), outputTopic2); - - return builder.build(); - } - - private Topology setupTopologyWithoutIntermediateUserTopic() { - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream<Long, String> input = builder.stream(INPUT_TOPIC); - - // use map to trigger internal re-partitioning before groupByKey - input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() { - @Override - public KeyValue<Long, Long> apply(final Long key, final String value) { - return new KeyValue<>(key, key); - } - }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); - - return builder.build(); - } - - private void cleanGlobal(final String intermediateUserTopic) { - // leaving --zookeeper arg here to ensure tool works if users add it - final String[] parameters; - if (intermediateUserTopic != null) { - parameters = new String[]{ - "--application-id", APP_ID + testNo, - "--bootstrap-servers", CLUSTER.bootstrapServers(), - "--input-topics", INPUT_TOPIC, - "--intermediate-topics", INTERMEDIATE_USER_TOPIC, - "--zookeeper", "localhost:2181" - }; - } else { - parameters = new String[]{ - "--application-id", APP_ID + testNo, - "--bootstrap-servers", CLUSTER.bootstrapServers(), - "--input-topics", INPUT_TOPIC - }; - } - final Properties cleanUpConfig = new Properties(); - cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - - final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); - Assert.assertEquals(0, exitCode); - } - - private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { - final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); - expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); - if (intermediateUserTopic != null) { - expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic); - } - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); - expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); - - final Set<String> allTopics = new HashSet<>(); - - final ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); - listTopicsOptions.listInternal(true); - allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000, TimeUnit.MILLISECONDS)); - assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); - - } - - private class WaitUntilConsumerGroupGotClosed implements TestCondition { - @Override - public boolean conditionMet() { - return adminClient.describeConsumerGroup(APP_ID + testNo, 0).consumers().get().isEmpty(); - } - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java new file mode 100644 index 0000000..d018225 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.server.KafkaConfig$; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Map; +import java.util.Properties; + +/** + * Tests command line SSL setup for reset tool. + */ +@Category({IntegrationTest.class}) +public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { + + private static Map<String, Object> sslConfig; + static { + try { + sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER; + static { + final Properties props = new Properties(); + // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable + // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially + // very long sleep times + props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); + props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:9092"); + props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); + props.putAll(sslConfig); + // we align time to seconds to get clean window boundaries and thus ensure the same result for each run + // otherwise, input records could fall into different windows for different runs depending on the initial mock time + final long alignedTime = (System.currentTimeMillis() / 1000) * 1000; + CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props, alignedTime); + cluster = CLUSTER; + } + + @AfterClass + public static void globalCleanup() { + afterClassGlobalCleanup(); + } + + @Before + public void before() throws Exception { + beforePrepareTest(); + } + + Properties getClientSslConfig() { + final Properties props = new Properties(); + + props.put("bootstrap.servers", CLUSTER.bootstrapServers()); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + + return props; + } + + @Test + public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { + super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/f9865d52/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index 18c1995..1863484 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -29,7 +29,6 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.protocol.SecurityProtocol; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,7 +109,9 @@ public class KafkaEmbedded { * You can use this to tell Kafka producers and consumers how to connect to this instance. */ public String brokerList() { - return kafka.config().hostName() + ":" + kafka.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); + Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp()); + return kafka.config().hostName() + ":" + kafka.boundPort( + new ListenerName(listenerConfig != null ? listenerConfig.toString() : "PLAINTEXT")); }