[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480778#comment-16480778 ]

ASF GitHub Bot commented on KAFKA-5697:
---------------------------------------

guozhangwang closed pull request #5035: KAFKA-5697: Revert streams wakeup
URL: https://github.com/apache/kafka/pull/5035

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
deleted file mode 100644
index d404642793c..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-public class ShutdownException extends StreamsException {
-    public ShutdownException(final String message) {
-        super(message);
-    }
-
-    public ShutdownException(final String message, final Throwable throwable) {
-        super(message, throwable);
-    }
-
-    public ShutdownException(final Throwable throwable) {
-        super(throwable);
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
deleted file mode 100644
index 8b912579b9a..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import java.util.Collections;
-import java.util.List;
-
-public final class ConsumerUtils {
-    private ConsumerUtils() {}
-
-    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
-        try {
-            return consumer.poll(maxDurationMs);
-        } catch (final WakeupException e) {
-            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
-        }
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 017f2da198f..e8ec5e9fe5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,14 +23,12 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -48,8 +46,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -64,15 +60,13 @@
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
-    private final IsRunning isRunning;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config,
-                                  final IsRunning isRunning) {
+                                  final StreamsConfig config) {
         super(stateDirectory.globalStateDir());
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -82,11 +76,6 @@ public GlobalStateManagerImpl(final LogContext logContext,
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.isRunning = isRunning;
-    }
-
-    public interface IsRunning {
-        boolean check();
     }
 
     @Override
@@ -211,13 +200,6 @@ public void register(final StateStore store,
             try {
                 partitionInfos = globalConsumer.partitionsFor(sourceTopic);
                 break;
-            } catch (final WakeupException wakeupException) {
-                if (isRunning.check()) {
-                    // note we may decide later that this condition is ok and just let the retry loop continue
-                    throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
-                } else {
-                    throw new ShutdownException("Shutting down from fetching partitions");
-                }
             } catch (final TimeoutException retryableException) {
                 if (++attempts > retries) {
                     log.error("Failed to get partitions for topic {} after {} 
retry attempts due to timeout. " +
@@ -268,20 +250,19 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            final BatchingStateRestoreCallback stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
-                    ? stateRestoreCallback
-                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            BatchingStateRestoreCallback
+                stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
+                                                     BatchingStateRestoreCallback)
+                                                ? stateRestoreCallback
+                                                : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;
 
             while (offset < highWatermark) {
-                if (!isRunning.check()) {
-                    throw new ShutdownException("Streams is not running (any 
more)");
-                }
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
+                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 4b6bfb1fd65..112011f47b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -29,7 +29,6 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
-import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -43,7 +42,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
 
@@ -108,10 +106,6 @@ public boolean isRunning() {
             return equals(RUNNING);
         }
 
-        public boolean isStarting() {
-            return equals(CREATED);
-        }
-
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -179,12 +173,6 @@ public boolean stillRunning() {
         }
     }
 
-    private boolean stillStarting() {
-        synchronized (stateLock) {
-            return state.isStarting();
-        }
-    }
-
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -247,7 +235,7 @@ void initialize() {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
+                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -278,19 +266,7 @@ public void close() throws IOException {
 
     @Override
     public void run() {
-        final StateConsumer stateConsumer;
-        try {
-            stateConsumer = initialize();
-        } catch (final ShutdownException e) {
-            log.info("Shutting down from initialization");
-            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
-            // just make sure
-            setState(State.PENDING_SHUTDOWN);
-            setState(State.DEAD);
-            streamsMetrics.removeAllThreadLevelSensors();
-            log.info("Shutdown complete");
-            return;
-        }
+        final StateConsumer stateConsumer = initialize();
 
         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -342,14 +318,7 @@ private StateConsumer initialize() {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config,
-                new GlobalStateManagerImpl.IsRunning() {
-                    @Override
-                    public boolean check() {
-                        return stillStarting() || stillRunning();
-                    }
-                }
-            );
+                config);
 
             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -402,7 +371,6 @@ public void shutdown() {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
-        globalConsumer.wakeup();
     }
 
     public Map<MetricName, Metric> consumerMetrics() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9b67fc4263c..5fcba76570e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -40,8 +40,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class StoreChangelogReader implements ChangelogReader {
 
     private final Logger log;
@@ -83,7 +81,7 @@ public void register(final StateRestorer restorer) {
 
         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
+            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ddefd9ce0a5..3080d2e1583 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -65,7 +65,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 
 public class StreamThread extends Thread {
 
@@ -832,7 +831,7 @@ long runOnce(final long recordsProcessedBeforeCommit) {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = poll(consumer, pollTimeMs);
+            records = consumer.poll(pollTimeMs);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1059,7 +1058,7 @@ private void maybeUpdateStandbyTasks(final long now) {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);
+                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : records.partitions()) {
@@ -1124,8 +1123,6 @@ private long computeLatency() {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
-        consumer.wakeup();
-        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1229,10 +1226,10 @@ TaskManager taskManager() {
                 result.putAll(producerMetrics);
             }
         } else {
-            // When EOS is turned on, each task will has its own producer client
+            // When EOS is turned on, each task will have its own producer client
             // and the producer object passed in here will be null. We would then iterate through
             // all the active tasks and add their metrics to the output metrics map.
-            for (StreamTask task: taskManager.activeTasks().values()) {
+            for (final StreamTask task: taskManager.activeTasks().values()) {
                 final Map<MetricName, ? extends Metric> taskProducerMetrics = task.getProducer().metrics();
                 result.putAll(taskProducerMetrics);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index cdfc470c51b..904ebe2a611 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -253,6 +253,7 @@ public void testGlobalThreadCloseWithoutConnectingToBroker() {
         // There's nothing to assert... We're testing that this operation actually completes.
     }
 
+    @Ignore // this test cannot pass until we implement KIP-266
     @Test
     public void testLocalThreadCloseWithoutConnectingToBroker() {
         final Properties props = new Properties();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index ec97f125ece..d306ee4607e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,8 +51,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -390,7 +388,7 @@ public boolean conditionMet() {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);
+            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index a660e6770bc..8187467aaa6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -62,8 +62,6 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -336,7 +334,7 @@ private void consumeAndProduce(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -374,7 +372,7 @@ private void consume(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 7935271cdf9..2ca9c211c1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -79,14 +79,6 @@
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
-
-    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
-        @Override
-        public boolean check() {
-            return true;
-        }
-    };
-
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -127,8 +119,7 @@ public void before() throws IOException {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig,
-            alwaysRunning);
+            streamsConfig);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -516,8 +507,7 @@ public boolean lockGlobalState() throws IOException {
                 }
             },
             stateRestoreListener,
-            streamsConfig,
-            alwaysRunning
+            streamsConfig
         );
 
         try {
@@ -555,8 +545,7 @@ public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                alwaysRunning);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -589,8 +578,7 @@ public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                alwaysRunning);
+                streamsConfig);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 486b35e7ea9..93d6a0d931b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -62,7 +62,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singleton;
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +188,7 @@ public void testUpdate() throws IOException {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));
+        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +245,7 @@ public void testUpdateKTable() throws IOException {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index c420d9878a4..e897088beca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -42,8 +42,6 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class BrokerCompatibilityTest {
 
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -155,7 +153,7 @@ private static void loopUntilRecordReceived(final String kafka, final boolean eo
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = poll(consumer, 100);
+                final ConsumerRecords<String, String> records = consumer.poll(100);
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && record.value().equals("1")) {
                         return;
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 513d592fa38..752cdd696ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -52,8 +52,6 @@
 import java.util.Random;
 import java.util.Set;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -256,7 +254,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
                 topics.add("repartition");
             }
             consumer.subscribe(topics);
-            poll(consumer, 0);
+            consumer.poll(0);
 
             final Set<TopicPartition> partitions = new HashSet<>();
             for (final String topic : topics) {
@@ -286,7 +284,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -593,7 +591,7 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
+            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 74eac3f156c..50330a08e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -47,8 +47,6 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
-
 public class SmokeTestDriver extends SmokeTestUtil {
 
     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -291,7 +289,7 @@ public static void verify(String kafka, Map<String, Set<Integer>> allData, int m
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index 49f699ec658..ad19f32fd1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -40,7 +40,6 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -75,7 +74,7 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(3, records.count());
     }
 
@@ -91,7 +90,7 @@ public void testResetToSpecificOffsetWhenBeforeBeginningOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -107,7 +106,7 @@ public void testResetToSpecificOffsetWhenAfterEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -123,7 +122,7 @@ public void testShiftOffsetByWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -139,7 +138,7 @@ public void testShiftOffsetByWhenBeforeBeginningOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(5, records.count());
     }
 
@@ -155,7 +154,7 @@ public void testShiftOffsetByWhenAfterEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -173,7 +172,7 @@ public void testResetUsingPlanWhenBetweenBeginningAndEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -191,7 +190,7 @@ public void testResetUsingPlanWhenBeforeBeginningOffset() {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -209,7 +208,7 @@ public void testResetUsingPlanWhenAfterEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
@@ -227,7 +226,7 @@ public void shouldSeekToEndOffset() {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
+        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
         assertEquals(2, records.count());
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7b8bdd5664d..c237ca77e1d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -309,13 +309,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig,
-                new GlobalStateManagerImpl.IsRunning() {
-                    @Override
-                    public boolean check() {
-                        return true;
-                    }
-                });
+                streamsConfig);
 
             final GlobalProcessorContextImpl globalProcessorContext
                 = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5697
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: John Roesler
>            Priority: Major
>              Labels: newbie
>             Fix For: 2.0.0
>
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state,
> hoping the stream thread may eventually check it and shut itself down.
> However, under certain scenarios the thread may get blocked within a single
> loop and hence will never check on this state enum. For example, its
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block
> until the coordinator can be found. If the coordinator broker is never up and
> running, the Streams instance will be blocked forever.
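> For illustration, here is a minimal sketch of the problem and the remedy this
> PR reverts (a hypothetical {{PollLoop}} class, not the actual {{StreamThread}}
> code): a shutdown flag alone is never observed while {{poll()}} is blocked,
> whereas {{Consumer#wakeup()}}, the one thread-safe consumer method, forces the
> blocked call to throw a {{WakeupException}} and so breaks the loop:
> {code:java}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
>
> public class PollLoop implements Runnable {
>     private final KafkaConsumer<byte[], byte[]> consumer;
>     private volatile boolean running = true;
>
>     public PollLoop(final KafkaConsumer<byte[], byte[]> consumer) {
>         this.consumer = consumer;
>     }
>
>     @Override
>     public void run() {
>         try {
>             while (running) {       // the flag is only checked between polls...
>                 consumer.poll(100); // ...but poll() itself can block indefinitely,
>             }                       // e.g. inside ensureCoordinatorReady()
>         } catch (final WakeupException e) {
>             // expected on shutdown: wakeup() interrupts a blocked poll()
>         } finally {
>             consumer.close();
>         }
>     }
>
>     // called from another thread
>     public void shutdown() {
>         running = false;    // on its own, never observed while poll() is blocked
>         consumer.wakeup();  // forces the blocked poll() to throw WakeupException
>     }
> }
> {code}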
> A simple way to reproduce this issue is to start the word count demo without
> starting the ZK / Kafka broker; it will then get stuck in a single loop, and
> even `ctrl-C` will not stop it, since the state it sets will never be read by
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
