This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d8049b  KAFKA-5697: issue Consumer#wakeup during Streams shutdown
2d8049b is described below

commit 2d8049b713f2b9982bd43e7340a0e0f302f30d6b
Author: John Roesler <j...@confluent.io>
AuthorDate: Fri May 4 09:02:50 2018 -0700

    KAFKA-5697: issue Consumer#wakeup during Streams shutdown
    
    Wake up consumers during shutdown to break them out of any internally blocking calls.
    
    Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully.
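    
    As a minimal sketch of that pattern (the helper name below is illustrative; the actual utility this patch adds is ConsumerUtils#poll in the diff that follows):
    
        // assumes org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}
        // and org.apache.kafka.common.errors.WakeupException are imported
        public static <K, V> ConsumerRecords<K, V> pollOrEmpty(final Consumer<K, V> consumer, final long timeoutMs) {
            try {
                return consumer.poll(timeoutMs);
            } catch (final WakeupException e) {
                // a wakeup during shutdown just means "no records this round";
                // the polling loop re-checks the thread state on its next pass
                // and exits gracefully once it sees PENDING_SHUTDOWN
                return ConsumerRecords.empty();
            }
        }
    
    The shutting-down thread pairs with this by calling consumer.wakeup() right after transitioning the state to PENDING_SHUTDOWN.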
    
    The existing tests should be sufficient to verify no regressions.
    
    Author: John Roesler <j...@confluent.io>
    
    Reviewers: Bill Bejeck <bbej...@gmail.com>, Guozhang Wang <wangg...@gmail.com>
    
    Closes #4930 from vvcephei/streams-client-wakeup-on-shutdown
    
    minor javadoc updates
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 13 +++++-
 .../kafka/streams/errors/ShutdownException.java    | 31 ++++++++++++++
 .../streams/processor/internals/ConsumerUtils.java | 38 +++++++++++++++++
 .../internals/GlobalStateManagerImpl.java          | 35 ++++++++++++----
 .../processor/internals/GlobalStreamThread.java    | 46 ++++++++++++++++++--
 .../processor/internals/StoreChangelogReader.java  |  4 +-
 .../streams/processor/internals/StreamThread.java  | 19 ++++++++-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 49 ++++++++++++++++------
 .../integration/InternalTopicIntegrationTest.java  |  6 ++-
 .../integration/utils/IntegrationTestUtils.java    | 40 +++++++++++++++++-
 .../apache/kafka/streams/perf/SimpleBenchmark.java |  6 ++-
 .../internals/GlobalStateManagerImplTest.java      | 37 ++++++++++++----
 .../processor/internals/StandbyTaskTest.java       |  5 ++-
 .../streams/tests/BrokerCompatibilityTest.java     |  4 +-
 .../apache/kafka/streams/tests/EosTestDriver.java  |  8 ++--
 .../kafka/streams/tests/SmokeTestDriver.java       |  4 +-
 .../kafka/streams/tools/StreamsResetterTest.java   | 21 +++++-----
 .../apache/kafka/streams/TopologyTestDriver.java   |  8 +++-
 18 files changed, 315 insertions(+), 59 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 66a8934..eed12f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -67,6 +67,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -377,12 +378,20 @@ public class KafkaStreams {
     }
 
     /**
-     * Get read-only handle on global metrics registry.
+     * Get read-only handle on global metrics registry, including streams client's own metrics plus
+     * its embedded consumer clients' metrics.
      *
      * @return Map of all metrics.
      */
+    // TODO: we can add metrics for producer and admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(metrics.metrics());
+        final Map<MetricName, Metric> result = new LinkedHashMap<>();
+        for (final StreamThread thread : threads) {
+            result.putAll(thread.consumerMetrics());
+        }
+        if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+        result.putAll(metrics.metrics());
+        return Collections.unmodifiableMap(result);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
new file mode 100644
index 0000000..d404642
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class ShutdownException extends StreamsException {
+    public ShutdownException(final String message) {
+        super(message);
+    }
+
+    public ShutdownException(final String message, final Throwable throwable) {
+        super(message, throwable);
+    }
+
+    public ShutdownException(final Throwable throwable) {
+        super(throwable);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
new file mode 100644
index 0000000..8b91257
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class ConsumerUtils {
+    private ConsumerUtils() {}
+
+    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
+        try {
+            return consumer.poll(maxDurationMs);
+        } catch (final WakeupException e) {
+            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9..017f2da 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,12 +23,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -46,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -60,13 +64,15 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final IsRunning isRunning;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config) {
+                                  final StreamsConfig config,
+                                  final IsRunning isRunning) {
         super(stateDirectory.globalStateDir());
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -76,6 +82,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
+        this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.isRunning = isRunning;
+    }
+
+    public interface IsRunning {
+        boolean check();
     }
 
     @Override
@@ -200,6 +211,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             try {
                 partitionInfos = globalConsumer.partitionsFor(sourceTopic);
                 break;
+            } catch (final WakeupException wakeupException) {
+                if (isRunning.check()) {
+                    // note we may decide later that this condition is ok and just let the retry loop continue
+                    throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
+                } else {
+                    throw new ShutdownException("Shutting down from fetching partitions");
+                }
             } catch (final TimeoutException retryableException) {
                 if (++attempts > retries) {
                     log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
@@ -250,19 +268,20 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            BatchingStateRestoreCallback
-                stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
-                                                     BatchingStateRestoreCallback)
-                                                ? stateRestoreCallback
-                                                : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            final BatchingStateRestoreCallback stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
+                    ? stateRestoreCallback
+                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
+                if (!isRunning.check()) {
+                    throw new ShutdownException("Streams is not running (any more)");
+                }
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+                    final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 1c34897..4b6bfb1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -27,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -35,10 +38,12 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
 
@@ -103,6 +108,10 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }
 
+        public boolean isStarting() {
+            return equals(CREATED);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -170,6 +179,12 @@ public class GlobalStreamThread extends Thread {
         }
     }
 
+    private boolean stillStarting() {
+        synchronized (stateLock) {
+            return state.isStarting();
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -232,7 +247,7 @@ public class GlobalStreamThread extends Thread {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -263,7 +278,19 @@ public class GlobalStreamThread extends Thread {
 
     @Override
     public void run() {
-        final StateConsumer stateConsumer = initialize();
+        final StateConsumer stateConsumer;
+        try {
+            stateConsumer = initialize();
+        } catch (final ShutdownException e) {
+            log.info("Shutting down from initialization");
+            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
+            // just make sure
+            setState(State.PENDING_SHUTDOWN);
+            setState(State.DEAD);
+            streamsMetrics.removeAllThreadLevelSensors();
+            log.info("Shutdown complete");
+            return;
+        }
 
         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -275,6 +302,7 @@ public class GlobalStreamThread extends Thread {
             setState(State.DEAD);
 
             log.warn("Error happened during initialization of the global state store; this thread has shutdown");
+            streamsMetrics.removeAllThreadLevelSensors();
 
             return;
         }
@@ -314,7 +342,14 @@ public class GlobalStreamThread extends Thread {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                config,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return stillStarting() || stillRunning();
+                    }
+                }
+            );
 
             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -367,5 +402,10 @@ public class GlobalStreamThread extends Thread {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
+        globalConsumer.wakeup();
+    }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        return Collections.unmodifiableMap(globalConsumer.metrics());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5fcba76..9b67fc4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
@@ -81,7 +83,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cc5a07f..32993f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -54,6 +56,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 
 public class StreamThread extends Thread {
 
@@ -824,7 +828,7 @@ public class StreamThread extends Thread {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = consumer.poll(pollTimeMs);
+            records = poll(consumer, pollTimeMs);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1051,7 +1055,7 @@ public class StreamThread extends Thread {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+                final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
@@ -1116,6 +1120,8 @@ public class StreamThread extends Thread {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        consumer.wakeup();
+        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1210,4 +1216,13 @@ public class StreamThread extends Thread {
     Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
         return standbyRecords;
     }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(consumerMetrics);
+        result.putAll(restoreConsumerMetrics);
+        return result;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1dc8602..1c33f64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -18,8 +18,6 @@ package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
@@ -43,6 +41,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -232,7 +231,43 @@ public class KafkaStreamsTest {
 
         streams.close();
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
+    }
+
+    @Ignore // this test cannot pass as long as GST blocks KS.start()
+    @Test
+    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
+
+    @Test
+    public void testLocalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.table("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
     }
 
 
@@ -328,16 +363,6 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testNumberDefaultMetrics() {
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
-        // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
-        assertEquals(23, metrics.size());
-    }
-
-    @Test
     public void testIllegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final StreamsBuilder builder = new StreamsBuilder();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 2d9c8c4..e8ee507 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -118,8 +119,9 @@ public class InternalTopicIntegrationTest {
             final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
 
             for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
-                if (topicConfig.getKey().equals(changelog))
+                if (topicConfig.getKey().equals(changelog)) {
                     return topicConfig.getValue();
+                }
             }
 
             return new Properties();
@@ -157,6 +159,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();
 
         final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
@@ -201,6 +204,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();
         final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e8cd59e..ec97f12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -28,10 +28,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestCondition;
@@ -49,6 +51,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -158,6 +162,40 @@ public class IntegrationTestUtils {
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }
 
+    /**
+     * Wait for streams to "finish", based on the consumer lag metric.
+     *
+     * Caveats:
+     * - Inputs must be finite, fully loaded, and flushed before this method is called
+     * - expectedPartitions is the total number of partitions to watch the lag on, including both input and internal.
+     *   It's somewhat ok to get this wrong, as the main failure case would be an immediate return due to the clients
+     *   not being initialized, which you can avoid with any non-zero value. But it's probably better to get it right ;)
+     */
+    public static void waitForCompletion(final KafkaStreams streams,
+                                         final int expectedPartitions,
+                                         final int timeoutMilliseconds) {
+        final long start = System.currentTimeMillis();
+        while (true) {
+            int lagMetrics = 0;
+            double totalLag = 0.0;
+            for (final Metric metric : streams.metrics().values()) {
+                if (metric.metricName().name().equals("records-lag")) {
+                    lagMetrics++;
+                    totalLag += ((Number) metric.metricValue()).doubleValue();
+                }
+            }
+            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
+                return;
+            }
+            if (System.currentTimeMillis() - start >= timeoutMilliseconds) {
+                throw new RuntimeException(String.format(
+                    "Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]",
+                    lagMetrics, expectedPartitions, totalLag
+                ));
+            }
+        }
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                    final String topic,
                                                                                    final int expectedNumRecords) throws InterruptedException {
@@ -352,7 +390,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index d956f27..9cd6857 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -62,6 +62,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -334,7 +336,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -372,7 +374,7 @@ public class SimpleBenchmark {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index d19e63e..7935271 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -79,6 +79,14 @@ public class GlobalStateManagerImplTest {
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
+
+    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
+        @Override
+        public boolean check() {
+            return true;
+        }
+    };
+
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -119,7 +127,8 @@ public class GlobalStateManagerImplTest {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig);
+            streamsConfig,
+            alwaysRunning);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -496,12 +505,20 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) {
-            @Override
-            public boolean lockGlobalState() throws IOException {
-                throw new IOException("KABOOM!");
-            }
-        }, stateRestoreListener, streamsConfig);
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            topology,
+            consumer,
+            new StateDirectory(streamsConfig, time) {
+                @Override
+                public boolean lockGlobalState() throws IOException {
+                    throw new IOException("KABOOM!");
+                }
+            },
+            stateRestoreListener,
+            streamsConfig,
+            alwaysRunning
+        );
 
         try {
             stateManager.initialize();
@@ -538,7 +555,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -571,7 +589,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 605ab33..54ea1ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +190,7 @@ public class StandbyTaskTest {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +247,7 @@ public class StandbyTaskTest {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088..c420d98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -42,6 +42,8 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class BrokerCompatibilityTest {
 
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -153,7 +155,7 @@ public class BrokerCompatibilityTest {
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = consumer.poll(100);
+                final ConsumerRecords<String, String> records = poll(consumer, 100);
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && record.value().equals("1")) {
                         return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd6..513d592 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -52,6 +52,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -254,7 +256,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 topics.add("repartition");
             }
             consumer.subscribe(topics);
-            consumer.poll(0);
+            poll(consumer, 0);
 
             final Set<TopicPartition> partitions = new HashSet<>();
             for (final String topic : topics) {
@@ -284,7 +286,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -591,7 +593,7 @@ public class EosTestDriver extends SmokeTestUtil {
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a0..74eac3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -47,6 +47,8 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class SmokeTestDriver extends SmokeTestUtil {
 
     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -289,7 +291,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32..49f699e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -74,7 +75,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(3, records.count());
     }
 
@@ -90,7 +91,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -106,7 +107,7 @@ public class StreamsResetterTest {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -122,7 +123,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -138,7 +139,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(5, records.count());
     }
 
@@ -154,7 +155,7 @@ public class StreamsResetterTest {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -172,7 +173,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -190,7 +191,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -208,7 +209,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
@@ -226,7 +227,7 @@ public class StreamsResetterTest {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7343d59..f6bbc4b 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -309,7 +309,13 @@ public class TopologyTestDriver implements Closeable {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return true;
+                    }
+                });
 
             final GlobalProcessorContextImpl globalProcessorContext
                 = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);
