KAFKA-4331: Kafka Streams resetter is slow because it joins the same group for each topic
- bug-fix follow up - Resetter fails if no intermediate topic is used because seekToEnd(), when called with an empty partition list, commits ALL partitions to EOL (end of log) Author: Matthias J. Sax <[email protected]> Reviewers: Michael G. Noll, Roger Hoover, Guozhang Wang Closes #2138 from mjsax/kafka-4331-streams-resetter-bugfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecb51680 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecb51680 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecb51680 Branch: refs/heads/0.10.1 Commit: ecb51680a982120691becc37c302aa78135dcfdf Parents: 9d3003b Author: Matthias J. Sax <[email protected]> Authored: Wed Nov 23 17:31:34 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 23 20:58:18 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/tools/StreamsResetter.java | 8 +- .../integration/ResetIntegrationTest.java | 195 +++++++++++++++---- 2 files changed, 164 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 1bb63f7..828eeef 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -218,8 +218,12 @@ public class StreamsResetter { } } - client.seekToBeginning(inputAndInternalTopicPartitions); - client.seekToEnd(intermediateTopicPartitions); + if (inputAndInternalTopicPartitions.size() > 0) { + client.seekToBeginning(inputAndInternalTopicPartitions); + } + if (intermediateTopicPartitions.size() > 0) { + client.seekToEnd(intermediateTopicPartitions); + } for (final TopicPartition p : partitions) { 
client.position(p); http://git-wip-us.apache.org/repos/asf/kafka/blob/ecb51680/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index d07970f..5f85536 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -21,6 +21,7 @@ import kafka.tools.StreamsResetter; import kafka.utils.MockTime; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; @@ -40,7 +41,7 @@ import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -75,10 +76,11 @@ public class ResetIntegrationTest { private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; + private static final int TIMEOUT_MULTIPLYER = 5; private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed(); - - private AdminClient adminClient = null; + private static int testNo = 0; + private static AdminClient adminClient = null; @BeforeClass public static void startKafkaCluster() throws Exception { @@ -86,24 +88,48 @@ public class ResetIntegrationTest { 
CLUSTER.createTopic(OUTPUT_TOPIC); CLUSTER.createTopic(OUTPUT_TOPIC_2); CLUSTER.createTopic(OUTPUT_TOPIC_2_RERUN); - CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); - } - - @Before - public void prepare() { - adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); } - @After - public void cleanup() { + @AfterClass + public static void globalCleanup() { if (adminClient != null) { adminClient.close(); adminClient = null; } } + @Before + public void cleanup() throws Exception { + ++testNo; + + if (adminClient == null) { + adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); + } + + // busy wait until cluster (ie, ConsumerGroupCoordinator) is available + while (true) { + Thread.sleep(50); + + try { + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, + "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + } catch (GroupCoordinatorNotAvailableException e) { + continue; + } catch (IllegalArgumentException e) { + continue; + } + break; + } + + if (testNo == 1) { + prepareInputData(); + } + } + @Test - public void testReprocessingFromScratchAfterReset() throws Exception { + public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { + CLUSTER.createTopic(INTERMEDIATE_USER_TOPIC); + final Properties streamsConfiguration = prepareTest(); final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( CLUSTER.bootstrapServers(), @@ -111,11 +137,8 @@ public class ResetIntegrationTest { LongDeserializer.class, LongDeserializer.class); - prepareInputData(); - final KStreamBuilder builder = setupTopology(OUTPUT_TOPIC_2); - // RUN - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration); streams.start(); final List<KeyValue<Long, Long>> result = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( resultTopicConsumerConfig, @@ -131,17 +154,17 @@ public class ResetIntegrationTest { ).get(0); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, 5 * STREAMS_CONSUMER_TIMEOUT, - "Streams Application consumer group did not time out after " + (5 * STREAMS_CONSUMER_TIMEOUT) + " ms."); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET - streams = new KafkaStreams(setupTopology(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); streams.cleanUp(); - cleanGlobal(); - TestUtils.waitForCondition(consumerGroupInactive, 5 * CLEANUP_CONSUMER_TIMEOUT, - "Reset Tool consumer group did not time out after " + (5 * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(INTERMEDIATE_USER_TOPIC); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - assertInternalTopicsGotDeleted(); + assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); // RE-RUN streams.start(); @@ -159,11 +182,82 @@ public class ResetIntegrationTest { assertThat(resultRerun, equalTo(result)); assertThat(resultRerun2, equalTo(result2)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(INTERMEDIATE_USER_TOPIC); + + CLUSTER.deleteTopic(INTERMEDIATE_USER_TOPIC); + Set<String> allTopics; + ZkUtils zkUtils = null; + try { + zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), + 30000, + 30000, + 
JaasUtils.isZkSecurityEnabled()); + + do { + Utils.sleep(100); + allTopics = new HashSet<>(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + } while (allTopics.contains(INTERMEDIATE_USER_TOPIC)); + } finally { + if (zkUtils != null) { + zkUtils.close(); + } + } + } + + @Test + public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { + final Properties streamsConfiguration = prepareTest(); + final Properties resultTopicConsumerConfig = TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, + LongDeserializer.class, + LongDeserializer.class); + + // RUN + KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.start(); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10, + 60000); + + streams.close(); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT, + "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLYER * STREAMS_CONSUMER_TIMEOUT) + " ms."); + + // RESET + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams.cleanUp(); + cleanGlobal(null); + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + + assertInternalTopicsGotDeleted(null); + + // RE-RUN + streams.start(); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( + resultTopicConsumerConfig, + OUTPUT_TOPIC, + 10, + 60000); + streams.close(); + + assertThat(resultRerun, equalTo(result)); + + TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLYER * 
CLEANUP_CONSUMER_TIMEOUT, + "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLYER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); + cleanGlobal(null); } private Properties prepareTest() throws Exception { final Properties streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); @@ -205,7 +299,7 @@ public class ResetIntegrationTest { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); } - private KStreamBuilder setupTopology(final String outputTopic2) { + private KStreamBuilder setupTopologyWithIntermediateUserTopic(final String outputTopic2) { final KStreamBuilder builder = new KStreamBuilder(); final KStream<Long, String> input = builder.stream(INPUT_TOPIC); @@ -251,27 +345,54 @@ public class ResetIntegrationTest { return builder; } - private void cleanGlobal() { - final Properties cleanUpConfig = new Properties(); - cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + private KStreamBuilder setupTopologyWithoutIntermediateUserTopic() { + final KStreamBuilder builder = new KStreamBuilder(); + + final KStream<Long, String> input = builder.stream(INPUT_TOPIC); - final int exitCode = new StreamsResetter().run( - new String[]{ - "--application-id", APP_ID, + // use map to trigger internal re-partitioning before groupByKey + input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() { + @Override + public 
KeyValue<Long, Long> apply(final Long key, final String value) { + return new KeyValue<>(key, key); + } + }).to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC); + + return builder; + } + + private void cleanGlobal(final String intermediateUserTopic) { + final String[] parameters; + if (intermediateUserTopic != null) { + parameters = new String[]{ + "--application-id", APP_ID + testNo, "--bootstrap-server", CLUSTER.bootstrapServers(), "--zookeeper", CLUSTER.zKConnectString(), "--input-topics", INPUT_TOPIC, "--intermediate-topics", INTERMEDIATE_USER_TOPIC - }, - cleanUpConfig); + }; + } else { + parameters = new String[]{ + "--application-id", APP_ID + testNo, + "--bootstrap-server", CLUSTER.bootstrapServers(), + "--zookeeper", CLUSTER.zKConnectString(), + "--input-topics", INPUT_TOPIC + }; + } + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); Assert.assertEquals(0, exitCode); } - private void assertInternalTopicsGotDeleted() { + private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) { final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); - expectedRemainingTopicsAfterCleanup.add(INTERMEDIATE_USER_TOPIC); + if (intermediateUserTopic != null) { + expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic); + } expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); @@ -301,7 +422,7 @@ public class ResetIntegrationTest { private class WaitUntilConsumerGroupGotClosed implements TestCondition { @Override public boolean conditionMet() { - return adminClient.describeGroup(APP_ID).members().isEmpty(); + return 
adminClient.describeGroup(APP_ID + testNo).members().isEmpty(); } }
