This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2d8049b KAFKA-5697: issue Consumer#wakeup during Streams shutdown 2d8049b is described below commit 2d8049b713f2b9982bd43e7340a0e0f302f30d6b Author: John Roesler <j...@confluent.io> AuthorDate: Fri May 4 09:02:50 2018 -0700 KAFKA-5697: issue Consumer#wakeup during Streams shutdown Wakeup consumers during shutdown to break them out of any internally blocking calls. Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully. The existing tests should be sufficient to verify no regressions. Author: John Roesler <j...@confluent.io> Reviewers: Bill Bejeck <bbej...@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #4930 from vvcephei/streams-client-wakeup-on-shutdown minor javadoc updates --- .../org/apache/kafka/streams/KafkaStreams.java | 13 +++++- .../kafka/streams/errors/ShutdownException.java | 31 ++++++++++++++ .../streams/processor/internals/ConsumerUtils.java | 38 +++++++++++++++++ .../internals/GlobalStateManagerImpl.java | 35 ++++++++++++---- .../processor/internals/GlobalStreamThread.java | 46 ++++++++++++++++++-- .../processor/internals/StoreChangelogReader.java | 4 +- .../streams/processor/internals/StreamThread.java | 19 ++++++++- .../org/apache/kafka/streams/KafkaStreamsTest.java | 49 ++++++++++++++++------ .../integration/InternalTopicIntegrationTest.java | 6 ++- .../integration/utils/IntegrationTestUtils.java | 40 +++++++++++++++++- .../apache/kafka/streams/perf/SimpleBenchmark.java | 6 ++- .../internals/GlobalStateManagerImplTest.java | 37 ++++++++++++---- .../processor/internals/StandbyTaskTest.java | 5 ++- .../streams/tests/BrokerCompatibilityTest.java | 4 +- .../apache/kafka/streams/tests/EosTestDriver.java | 8 ++-- .../kafka/streams/tests/SmokeTestDriver.java | 4 +- .../kafka/streams/tools/StreamsResetterTest.java | 21 +++++----- .../apache/kafka/streams/TopologyTestDriver.java | 8 +++- 18 files changed, 315 insertions(+), 59 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 66a8934..eed12f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -67,6 +67,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -377,12 +378,20 @@ public class KafkaStreams { } /** - * Get read-only handle on global metrics registry. + * Get read-only handle on global metrics registry, including streams client's own metrics plus + * its embedded consumer clients' metrics. * * @return Map of all metrics. */ + // TODO: we can add metrics for producer and admin client as well public Map<MetricName, ? extends Metric> metrics() { - return Collections.unmodifiableMap(metrics.metrics()); + final Map<MetricName, Metric> result = new LinkedHashMap<>(); + for (final StreamThread thread : threads) { + result.putAll(thread.consumerMetrics()); + } + if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics()); + result.putAll(metrics.metrics()); + return Collections.unmodifiableMap(result); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java new file mode 100644 index 0000000..d404642 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class ShutdownException extends StreamsException { + public ShutdownException(final String message) { + super(message); + } + + public ShutdownException(final String message, final Throwable throwable) { + super(message, throwable); + } + + public ShutdownException(final Throwable throwable) { + super(throwable); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java new file mode 100644 index 0000000..8b91257 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; + +import java.util.Collections; +import java.util.List; + +public final class ConsumerUtils { + private ConsumerUtils() {} + + public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) { + try { + return consumer.poll(maxDurationMs); + } catch (final WakeupException e) { + return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap()); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index e8ec5e9..017f2da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -23,12 +23,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.ShutdownException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreCallback; @@ -46,6 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + /** * This class is responsible for the initialization, restoration, closing, flushing etc * of Global State Stores. There is only ever 1 instance of this class per Application Instance. @@ -60,13 +64,15 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob private InternalProcessorContext processorContext; private final int retries; private final long retryBackoffMs; + private final IsRunning isRunning; public GlobalStateManagerImpl(final LogContext logContext, final ProcessorTopology topology, final Consumer<byte[], byte[]> globalConsumer, final StateDirectory stateDirectory, final StateRestoreListener stateRestoreListener, - final StreamsConfig config) { + final StreamsConfig config, + final IsRunning isRunning) { super(stateDirectory.globalStateDir()); this.log = logContext.logger(GlobalStateManagerImpl.class); @@ -76,6 +82,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob this.stateRestoreListener = stateRestoreListener; this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG); this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG); + this.isRunning = isRunning; + } + + public interface IsRunning { + boolean check(); } @Override @@ -200,6 +211,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob try { partitionInfos = globalConsumer.partitionsFor(sourceTopic); break; + } catch (final WakeupException wakeupException) { + if (isRunning.check()) { + // note we may decide later that this condition is ok and just let the retry loop continue + throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException); + } else { + throw new ShutdownException("Shutting down from fetching partitions"); + } } catch (final TimeoutException retryableException) { if (++attempts > retries) { log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " + @@ -250,19 +268,20 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); - BatchingStateRestoreCallback - stateRestoreAdapter = - (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof - BatchingStateRestoreCallback) - ? stateRestoreCallback - : new WrappedBatchingStateRestoreCallback(stateRestoreCallback)); + final BatchingStateRestoreCallback stateRestoreAdapter = + (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback) + ? stateRestoreCallback + : new WrappedBatchingStateRestoreCallback(stateRestoreCallback)); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; while (offset < highWatermark) { + if (!isRunning.check()) { + throw new ShutdownException("Streams is not running (any more)"); + } try { - final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100); + final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100); final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(); for (ConsumerRecord<byte[], byte[]> record : records) { if (record.key() != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 1c34897..4b6bfb1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.InvalidOffsetException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; @@ -27,6 +29,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.errors.ShutdownException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -35,10 +38,12 @@ import org.slf4j.Logger; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD; import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN; @@ -103,6 +108,10 @@ public class GlobalStreamThread extends Thread { return equals(RUNNING); } + public boolean isStarting() { + return equals(CREATED); + } + @Override public boolean isValidTransition(final ThreadStateTransitionValidator newState) { final State tmpState = (State) newState; @@ -170,6 +179,12 @@ public class GlobalStreamThread extends Thread { } } + private boolean stillStarting() { + synchronized (stateLock) { + return state.isStarting(); + } + } + public GlobalStreamThread(final ProcessorTopology topology, final StreamsConfig config, final Consumer<byte[], byte[]> globalConsumer, @@ -232,7 +247,7 @@ public class GlobalStreamThread extends Thread { void pollAndUpdate() { try { - final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs); + final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs); for (final ConsumerRecord<byte[], byte[]> record : received) { stateMaintainer.update(record); } @@ -263,7 +278,19 @@ public class GlobalStreamThread extends Thread { @Override public void run() { - final StateConsumer stateConsumer = initialize(); + final StateConsumer stateConsumer; + try { + stateConsumer = initialize(); + } catch (final ShutdownException e) { + log.info("Shutting down from initialization"); + // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to + // just make sure + setState(State.PENDING_SHUTDOWN); + setState(State.DEAD); + streamsMetrics.removeAllThreadLevelSensors(); + log.info("Shutdown complete"); + return; + } if (stateConsumer == null) { // during initialization, the caller thread would wait for the state consumer @@ -275,6 +302,7 @@ public class GlobalStreamThread extends Thread { setState(State.DEAD); log.warn("Error happened during initialization of the global state store; this thread has shutdown"); + streamsMetrics.removeAllThreadLevelSensors(); return; } @@ -314,7 +342,14 @@ public class GlobalStreamThread extends Thread { globalConsumer, stateDirectory, stateRestoreListener, - config); + config, + new GlobalStateManagerImpl.IsRunning() { + @Override + public boolean check() { + return stillStarting() || stillRunning(); + } + } + ); final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl( config, @@ -367,5 +402,10 @@ public class GlobalStreamThread extends Thread { // one could call shutdown() multiple times, so ignore subsequent calls // if already shutting down or dead setState(PENDING_SHUTDOWN); + globalConsumer.wakeup(); + } + + public Map<MetricName, Metric> consumerMetrics() { + return Collections.unmodifiableMap(globalConsumer.metrics()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 5fcba76..9b67fc4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -40,6 +40,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + public class StoreChangelogReader implements ChangelogReader { private final Logger log; @@ -81,7 +83,7 @@ public class StoreChangelogReader implements ChangelogReader { final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet()); try { - final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10); + final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10); for (final TopicPartition partition : restoringPartitions) { restorePartition(allRecords, partition, active.restoringTaskFor(partition)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index cc5a07f..32993f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -54,6 +56,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -62,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; public class StreamThread extends Thread { @@ -824,7 +828,7 @@ public class StreamThread extends Thread { ConsumerRecords<byte[], byte[]> records = null; try { - records = consumer.poll(pollTimeMs); + records = poll(consumer, pollTimeMs); } catch (final InvalidOffsetException e) { resetInvalidOffsets(e); } @@ -1051,7 +1055,7 @@ public class StreamThread extends Thread { } try { - final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0); + final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0); if (!records.isEmpty()) { for (final TopicPartition partition : records.partitions()) { @@ -1116,6 +1120,8 @@ public class StreamThread extends Thread { public void shutdown() { log.info("Informed to shut down"); final State oldState = setState(State.PENDING_SHUTDOWN); + consumer.wakeup(); + restoreConsumer.wakeup(); if (oldState == State.CREATED) { // The thread may not have been started. Take responsibility for shutting down completeShutdown(true); @@ -1210,4 +1216,13 @@ public class StreamThread extends Thread { Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() { return standbyRecords; } + + public Map<MetricName, Metric> consumerMetrics() { + final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics(); + final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics(); + final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); + result.putAll(consumerMetrics); + result.putAll(restoreConsumerMetrics); + return result; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 1dc8602..1c33f64 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -18,8 +18,6 @@ package org.apache.kafka.streams; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.config.ConfigException; @@ -43,6 +41,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -232,7 +231,43 @@ public class KafkaStreamsTest { streams.close(); assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + } + + @Ignore // this test cannot pass as long as GST blocks KS.start() + @Test + public void testGlobalThreadCloseWithoutConnectingToBroker() { + final Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); + props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); + final StreamsBuilder builder = new StreamsBuilder(); + // make sure we have the global state thread running too + builder.globalTable("anyTopic"); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + streams.start(); + streams.close(); + // There's nothing to assert... We're testing that this operation actually completes. + } + + @Test + public void testLocalThreadCloseWithoutConnectingToBroker() { + final Properties props = new Properties(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1"); + props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); + + final StreamsBuilder builder = new StreamsBuilder(); + // make sure we have the global state thread running too + builder.table("anyTopic"); + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + streams.start(); + streams.close(); + // There's nothing to assert... We're testing that this operation actually completes. } @@ -328,16 +363,6 @@ public class KafkaStreamsTest { } @Test - public void testNumberDefaultMetrics() { - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1"); - final StreamsBuilder builder = new StreamsBuilder(); - final KafkaStreams streams = new KafkaStreams(builder.build(), props); - final Map<MetricName, ? extends Metric> metrics = streams.metrics(); - // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics - assertEquals(23, metrics.size()); - } - - @Test public void testIllegalMetricsConfig() { props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 2d9c8c4..e8ee507 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -55,6 +55,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -118,8 +119,9 @@ public class InternalTopicIntegrationTest { final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) { - if (topicConfig.getKey().equals(changelog)) + if (topicConfig.getKey().equals(changelog)) { return topicConfig.getValue(); + } } return new Properties(); @@ -157,6 +159,7 @@ public class InternalTopicIntegrationTest { // // Step 3: Verify the state changelog topics are compact // + waitForCompletion(streams, 2, 5000); streams.close(); final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts")); @@ -201,6 +204,7 @@ public class InternalTopicIntegrationTest { // // Step 3: Verify the state changelog topics are compact // + waitForCompletion(streams, 2, 5000); streams.close(); final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows")); final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(",")); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index e8cd59e..ec97f12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -28,10 +28,12 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.requests.UpdateMetadataRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestCondition; @@ -49,6 +51,8 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + /** * Utility functions to make integration testing more convenient. */ @@ -158,6 +162,40 @@ public class IntegrationTestUtils { produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions); } + /** + * Wait for streams to "finish", based on the consumer lag metric. + * + * Caveats: + * - Inputs must be finite, fully loaded, and flushed before this method is called + * - expectedPartitions is the total number of partitions to watch the lag on, including both input and internal. + * It's somewhat ok to get this wrong, as the main failure case would be an immediate return due to the clients + * not being initialized, which you can avoid with any non-zero value. But it's probably better to get it right ;) + */ + public static void waitForCompletion(final KafkaStreams streams, + final int expectedPartitions, + final int timeoutMilliseconds) { + final long start = System.currentTimeMillis(); + while (true) { + int lagMetrics = 0; + double totalLag = 0.0; + for (final Metric metric : streams.metrics().values()) { + if (metric.metricName().name().equals("records-lag")) { + lagMetrics++; + totalLag += ((Number) metric.metricValue()).doubleValue(); + } + } + if (lagMetrics >= expectedPartitions && totalLag == 0.0) { + return; + } + if (System.currentTimeMillis() - start >= timeoutMilliseconds) { + throw new RuntimeException(String.format( + "Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]", + lagMetrics, expectedPartitions, totalLag + )); + } + } + } + public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, final String topic, final int expectedNumRecords) throws InterruptedException { @@ -352,7 +390,7 @@ public class IntegrationTestUtils { while (totalPollTimeMs < waitTime && continueConsuming(consumedValues.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; - final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); + final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs); for (final ConsumerRecord<K, V> record : records) { consumedValues.add(new KeyValue<>(record.key(), record.value())); diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index d956f27..9cd6857 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -62,6 +62,8 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + /** * Class that provides support for a series of benchmarks. It is usually driven by * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py. @@ -334,7 +336,7 @@ public class SimpleBenchmark { consumer.seekToBeginning(partitions); while (true) { - final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS); + final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS); if (records.isEmpty()) { if (processedRecords == numRecords) { break; @@ -372,7 +374,7 @@ public class SimpleBenchmark { consumer.seekToBeginning(partitions); while (true) { - final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS); + final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS); if (records.isEmpty()) { if (processedRecords == numRecords) { break; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index d19e63e..7935271 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -79,6 +79,14 @@ public class GlobalStateManagerImplTest { private final TopicPartition t2 = new TopicPartition("t2", 1); private final TopicPartition t3 = new TopicPartition("t3", 1); private final TopicPartition t4 = new TopicPartition("t4", 1); + + private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() { + @Override + public boolean check() { + return true; + } + }; + private GlobalStateManagerImpl stateManager; private StateDirectory stateDirectory; private StreamsConfig streamsConfig; @@ -119,7 +127,8 @@ public class GlobalStateManagerImplTest { consumer, stateDirectory, stateRestoreListener, - streamsConfig); + streamsConfig, + alwaysRunning); processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig); stateManager.setGlobalProcessorContext(processorContext); checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME); @@ -496,12 +505,20 @@ public class GlobalStateManagerImplTest { @Test public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() { - stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) { - @Override - public boolean lockGlobalState() throws IOException { - throw new IOException("KABOOM!"); - } - }, stateRestoreListener, streamsConfig); + stateManager = new GlobalStateManagerImpl( + new LogContext("mock"), + topology, + consumer, + new StateDirectory(streamsConfig, time) { + @Override + public boolean lockGlobalState() throws IOException { + throw new IOException("KABOOM!"); + } + }, + stateRestoreListener, + streamsConfig, + alwaysRunning + ); try { stateManager.initialize(); @@ -538,7 +555,8 @@ public class GlobalStateManagerImplTest { consumer, stateDirectory, stateRestoreListener, - streamsConfig); + streamsConfig, + alwaysRunning); } catch (final StreamsException expected) { assertEquals(numberOfCalls.get(), retries); } @@ -571,7 +589,8 @@ public class GlobalStateManagerImplTest { consumer, stateDirectory, stateRestoreListener, - streamsConfig); + streamsConfig, + alwaysRunning); } catch (final StreamsException expected) { assertEquals(numberOfCalls.get(), retries); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 605ab33..54ea1ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -63,6 +63,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -189,7 +190,7 @@ public class StandbyTaskTest { } restoreStateConsumer.seekToBeginning(partition); - task.update(partition2, restoreStateConsumer.poll(100).records(partition2)); + task.update(partition2, poll(restoreStateConsumer, 100).records(partition2)); StandbyContextImpl context = (StandbyContextImpl) task.context(); MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1); @@ -246,7 +247,7 @@ public class StandbyTaskTest { } // The commit offset is at 0L. Records should not be processed - List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition)); + List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition)); assertEquals(5, remaining.size()); committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index e897088..c420d98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -42,6 +42,8 @@ import java.util.Locale; import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + public class BrokerCompatibilityTest { private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic"; @@ -153,7 +155,7 @@ public class BrokerCompatibilityTest { consumer.subscribe(Collections.singletonList(SINK_TOPIC)); while (true) { - final ConsumerRecords<String, String> records = consumer.poll(100); + final ConsumerRecords<String, String> records = poll(consumer, 100); for (final ConsumerRecord<String, String> record : records) { if (record.key().equals("key") && record.value().equals("1")) { return; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 752cdd6..513d592 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -52,6 +52,8 @@ import java.util.Properties; import java.util.Random; import java.util.Set; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + public class EosTestDriver extends SmokeTestUtil { private static final int MAX_NUMBER_OF_KEYS = 100; @@ -254,7 +256,7 @@ public class EosTestDriver extends SmokeTestUtil { topics.add("repartition"); } consumer.subscribe(topics); - consumer.poll(0); + poll(consumer, 0); final Set<TopicPartition> partitions = new HashSet<>(); for (final String topic : topics) { @@ -284,7 +286,7 @@ public class EosTestDriver extends SmokeTestUtil { long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; boolean allRecordsReceived = false; while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100); + final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100); for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) { maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; @@ -591,7 +593,7 @@ public class EosTestDriver extends SmokeTestUtil { long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords<byte[], byte[]> records = consumer.poll(100); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100); if (records.isEmpty()) { System.out.println("No data received."); for (final TopicPartition tp : partitions) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 50330a0..74eac3f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -47,6 +47,8 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; + public class SmokeTestDriver extends SmokeTestUtil { public static final int MAX_RECORD_EMPTY_RETRIES = 60; @@ -289,7 +291,7 @@ public class SmokeTestDriver extends SmokeTestUtil { int retry = 0; final long start = System.currentTimeMillis(); while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) { - ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { if (verifyMin(min, allData, false) && verifyMax(max, allData, false) diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index ad19f32..49f699e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -74,7 +75,7 @@ public class StreamsResetterTest { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(3, records.count()); } @@ -90,7 +91,7 @@ public class StreamsResetterTest { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -106,7 +107,7 @@ public class StreamsResetterTest { streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -122,7 +123,7 @@ public class StreamsResetterTest { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -138,7 +139,7 @@ public class StreamsResetterTest { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(5, records.count()); } @@ -154,7 +155,7 @@ public class StreamsResetterTest { streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -172,7 +173,7 @@ public class StreamsResetterTest { topicPartitionsAndOffset.put(topicPartition, 3L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -190,7 +191,7 @@ public class StreamsResetterTest { topicPartitionsAndOffset.put(topicPartition, 1L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -208,7 +209,7 @@ public class StreamsResetterTest { topicPartitionsAndOffset.put(topicPartition, 5L); streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } @@ -226,7 +227,7 @@ public class StreamsResetterTest { intermediateTopicPartitions.add(topicPartition); streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions); - final ConsumerRecords<byte[], byte[]> records = consumer.poll(500); + final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500); assertEquals(2, records.count()); } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 7343d59..f6bbc4b 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -309,7 +309,13 @@ public class TopologyTestDriver implements Closeable { consumer, stateDirectory, stateRestoreListener, - streamsConfig); + streamsConfig, + new GlobalStateManagerImpl.IsRunning() { + @Override + public boolean check() { + return true; + } + }); final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache); -- To stop receiving notification emails like this one, please contact guozh...@apache.org.