[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506329#comment-16506329 ]

ASF GitHub Bot commented on KAFKA-5697:
---------------------------------------

guozhangwang closed pull request #5107: KAFKA-5697: Use nonblocking poll in Streams
URL: https://github.com/apache/kafka/pull/5107

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index e8ec5e9fe5f..4fd7a591eb6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -38,6 +38,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -60,6 +61,7 @@
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final Duration pollTime;
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -76,6 +78,7 @@ public GlobalStateManagerImpl(final LogContext logContext,
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = 
config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
     }
 
     @Override
@@ -262,7 +265,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
             while (offset < highWatermark) {
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(100);
+                    final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 112011f47b8..9d529c5455c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -36,6 +36,7 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -200,7 +201,7 @@ public GlobalStreamThread(final ProcessorTopology topology,
         private final Consumer<byte[], byte[]> globalConsumer;
         private final GlobalStateMaintainer stateMaintainer;
         private final Time time;
-        private final long pollMs;
+        private final Duration pollTime;
         private final long flushInterval;
         private final Logger log;
 
@@ -210,13 +211,13 @@ public GlobalStreamThread(final ProcessorTopology 
topology,
                       final Consumer<byte[], byte[]> globalConsumer,
                       final GlobalStateMaintainer stateMaintainer,
                       final Time time,
-                      final long pollMs,
+                      final Duration pollTime,
                       final long flushInterval) {
             this.log = logContext.logger(getClass());
             this.globalConsumer = globalConsumer;
             this.stateMaintainer = stateMaintainer;
             this.time = time;
-            this.pollMs = pollMs;
+            this.pollTime = pollTime;
             this.flushInterval = flushInterval;
         }
 
@@ -235,7 +236,7 @@ void initialize() {
 
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = 
globalConsumer.poll(pollTime);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -338,8 +339,9 @@ private StateConsumer initialize() {
                     logContext
                 ),
                 time,
-                config.getLong(StreamsConfig.POLL_MS_CONFIG),
-                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
+                
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
+                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)
+            );
             stateConsumer.initialize();
 
             return stateConsumer;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bb0ed069670..07af8019aef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -49,11 +50,14 @@
     private final Map<TopicPartition, StateRestorer> stateRestorers = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new 
HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new 
HashMap<>();
+    private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
+                                final Duration pollTime,
                                 final StateRestoreListener 
userStateRestoreListener,
                                 final LogContext logContext) {
         this.restoreConsumer = restoreConsumer;
+        this.pollTime = pollTime;
         this.log = logContext.logger(getClass());
         this.userStateRestoreListener = userStateRestoreListener;
     }
@@ -76,7 +80,7 @@ public void register(final StateRestorer restorer) {
         }
 
         try {
-            final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(pollTime);
             final Iterator<TopicPartition> iterator = 
needsRestoring.keySet().iterator();
             while (iterator.hasNext()) {
                 final TopicPartition partition = iterator.next();
@@ -295,6 +299,7 @@ private boolean hasPartition(final TopicPartition 
topicPartition) {
                 return true;
             }
         }
+
         return false;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e72c4a5de94..a159e7b6c7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -50,6 +50,7 @@
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -212,7 +213,7 @@ State setState(final State newState) {
             if (newState == State.RUNNING) {
                 updateThreadMetadata(taskManager.activeTasks(), 
taskManager.standbyTasks());
             } else {
-                updateThreadMetadata(Collections.<TaskId, 
StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
+                updateThreadMetadata(Collections.emptyMap(), 
Collections.emptyMap());
             }
         }
 
@@ -555,7 +556,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> 
consumer,
     }
 
     private final Time time;
-    private final long pollTimeMs;
+    private final Duration pollTime;
     private final long commitTimeMs;
     private final Object stateLock;
     private final Logger log;
@@ -602,7 +603,8 @@ public static StreamThread create(final 
InternalTopologyBuilder builder,
         log.info("Creating restore consumer client");
         final Map<String, Object> restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(threadClientId);
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
+        final Duration pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
+        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreConsumer, pollTime, userStateRestoreListener, 
logContext);
 
         Producer<byte[], byte[]> threadProducer = null;
         final boolean eosEnabled = 
StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
@@ -710,10 +712,10 @@ public StreamThread(final Time time,
         this.originalReset = originalReset;
         this.versionProbingFlag = versionProbingFlag;
 
-        this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+        this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.commitTimeMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
-        updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), 
Collections.<TaskId, StandbyTask>emptyMap());
+        updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
     }
 
     /**
@@ -801,14 +803,14 @@ long runOnce(final long recordsProcessedBeforeCommit) {
         if (state == State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with zero poll millis
             // to unblock the restoration as soon as possible
-            records = pollRequests(0L);
+            records = pollRequests(Duration.ZERO);
 
             if (taskManager.updateNewAndRestoringTasks()) {
                 setState(State.RUNNING);
             }
         } else {
             // try to fetch some records if necessary
-            records = pollRequests(pollTimeMs);
+            records = pollRequests(pollTime);
 
             // if state changed after the poll call,
             // try to initialize the assigned tasks again
@@ -843,15 +845,15 @@ long runOnce(final long recordsProcessedBeforeCommit) {
     /**
      * Get the next batch of records by polling.
      *
-     * @param pollTimeMs poll time millis parameter for the consumer poll
+     * @param pollTime how long to block in Consumer#poll
      * @return Next batch of records or null if no records available.
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private ConsumerRecords<byte[], byte[]> pollRequests(final long 
pollTimeMs) {
+    private ConsumerRecords<byte[], byte[]> pollRequests(final Duration 
pollTime) {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {
-            records = consumer.poll(pollTimeMs);
+            records = consumer.poll(pollTime);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1078,7 +1080,11 @@ private void maybeUpdateStandbyTasks(final long now) {
             }
 
             try {
-                final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(0);
+                // poll(0): Since this is during the normal processing, not 
during restoration.
+                // We can afford to have slower restore (because we don't wait 
inside poll for results).
+                // Instead, we want to proceed to the next iteration to call 
the main consumer#poll()
+                // as soon as possible so as to not be kicked out of the group.
+                final ConsumerRecords<byte[], byte[]> records = 
restoreConsumer.poll(Duration.ZERO);
 
                 if (!records.isEmpty()) {
                     for (final TopicPartition partition : 
records.partitions()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 297b2434c06..8635b94544e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
@@ -25,6 +27,7 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,7 +45,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -234,9 +236,8 @@ public boolean conditionMet() {
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }
 
-    @Ignore // this test cannot pass as long as GST blocks KS.start()
     @Test
-    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+    public void 
globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:1");
@@ -244,16 +245,26 @@ public void 
testGlobalThreadCloseWithoutConnectingToBroker() {
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        // We want to configure request.timeout.ms, but it must be larger than 
a
+        // few other configs.
+        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 200);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 201);
+        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, 202);
+
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        streams.start();
-        streams.close();
+        try {
+            streams.start();
+            fail("expected start() to time out and throw an exception.");
+        } catch (final StreamsException expected) {
+            // This is a result of not being able to connect to the broker.
+        }
         // There's nothing to assert... We're testing that this operation 
actually completes.
     }
 
-    @Ignore // this test cannot pass until we implement KIP-266
     @Test
     public void testLocalThreadCloseWithoutConnectingToBroker() {
         final Properties props = new Properties();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index fe897c7ac30..86cb331956c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -44,6 +44,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -464,7 +465,7 @@ public boolean conditionMet() {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumerRecords.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = 
consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(pollIntervalMs));
 
             for (final ConsumerRecord<K, V> record : records) {
                 consumerRecords.add(record);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 8187467aaa6..7179293200e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -334,7 +335,7 @@ private void consumeAndProduce(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = 
consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
@@ -372,7 +373,7 @@ private void consume(final String topic) {
             consumer.seekToBeginning(partitions);
 
             while (true) {
-                final ConsumerRecords<Integer, byte[]> records = 
consumer.poll(POLL_MS);
+                final ConsumerRecords<Integer, byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_MS));
                 if (records.isEmpty()) {
                     if (processedRecords == numRecords) {
                         break;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 347e9c4fd75..4ed44be47f2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -42,6 +42,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -233,7 +234,7 @@ private AbstractTask createTask(final Consumer consumer,
                                 storeTopicPartitions,
                                 ProcessorTopology.withLocalStores(new 
ArrayList<>(stateStoresToChangelogTopics.keySet()), 
storeNamesToChangelogTopics),
                                 consumer,
-                                new StoreChangelogReader(consumer, new 
MockStateRestoreListener(), new LogContext("stream-task-test ")),
+                                new StoreChangelogReader(consumer, 
Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test 
")),
                                 false,
                                 stateDirectory,
                                 config) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 93d6a0d931b..05d0e3d04ee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -50,6 +50,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -122,7 +123,12 @@ private StreamsConfig createConfig(final File baseDir) 
throws IOException {
 
     private final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new 
MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new 
LogContext("standby-task-test "));
+    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(
+        restoreStateConsumer,
+        Duration.ZERO,
+        stateRestoreListener,
+        new LogContext("standby-task-test ")
+    );
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -188,7 +194,7 @@ public void testUpdate() throws IOException {
         }
 
         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, 
restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, 
restoreStateConsumer.poll(Duration.ofMillis(100)).records(partition2));
 
         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) 
context.getStateMgr().getStore(storeName1);
@@ -245,7 +251,7 @@ public void testUpdateKTable() throws IOException {
         }
 
         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(globalTopicPartition, 
restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = 
task.update(globalTopicPartition, 
restoreStateConsumer.poll(Duration.ofMillis(100)).records(globalTopicPartition));
         assertEquals(5, remaining.size());
 
         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), 
globalTopicPartition.partition()), new OffsetAndMetadata(10L));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 725211dd268..140f7056199 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -27,6 +27,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -52,7 +53,7 @@ public void setUp() {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, 
consumer, stateMaintainer, time, 10L, FLUSH_INTERVAL);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, 
consumer, stateMaintainer, time, Duration.ofMillis(10L), FLUSH_INTERVAL);
     }
 
     @Test
@@ -109,7 +110,7 @@ public void 
shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() {
 
     @Test
     public void shouldNotFlushWhenFlushIntervalIsZero() {
-        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, 
consumer, stateMaintainer, time, 10L, -1);
+        stateConsumer = new GlobalStreamThread.StateConsumer(logContext, 
consumer, stateMaintainer, time, Duration.ofMillis(10L), -1);
         stateConsumer.initialize();
         time.sleep(100);
         stateConsumer.pollAndUpdate();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index aabe7ff6313..90abf32477f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -39,6 +39,7 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -71,7 +72,7 @@
     private final StateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 
0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
 
     @Before
     public void setUp() {
@@ -89,7 +90,7 @@ public void shouldRequestTopicsAndHandleTimeoutException() {
             }
         };
 
-        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, stateRestoreListener, logContext);
+        final StoreChangelogReader changelogReader = new 
StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
         changelogReader.register(new StateRestorer(topicPartition, 
restoreListener, null, Long.MAX_VALUE, true, "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3a0fc4eb1cd..5537335b221 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -56,6 +56,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -116,7 +117,7 @@ public void close() {
     private final MockProducer<byte[], byte[]> producer = new 
MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreStateConsumer, stateRestoreListener, new 
LogContext("stream-task-test ")) {
+    private final StoreChangelogReader changelogReader = new 
StoreChangelogReader(restoreStateConsumer, Duration.ZERO, stateRestoreListener, 
new LogContext("stream-task-test ")) {
         @Override
         public Map<TopicPartition, Long> restoredOffsets() {
             return Collections.singletonMap(changelogPartition, offset);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index c24122abd13..66ea3c42779 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -49,6 +49,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -177,7 +178,7 @@ private StreamTask createStreamsTask(final StreamsConfig 
streamsConfig,
             Collections.singletonList(new TopicPartition(topicName, 
taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, new 
MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, 
Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task 
")),
             streamsConfig,
             new MockStreamsMetrics(metrics),
             stateDirectory,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index e897088beca..3c8446ca466 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Properties;
@@ -153,7 +154,7 @@ private static void loopUntilRecordReceived(final String 
kafka, final boolean eo
             consumer.subscribe(Collections.singletonList(SINK_TOPIC));
 
             while (true) {
-                final ConsumerRecords<String, String> records = 
consumer.poll(100);
+                final ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
                 for (final ConsumerRecord<String, String> record : records) {
                     if (record.key().equals("key") && 
record.value().equals("1")) {
                         return;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 752cdd696ed..0b18864d4ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -16,16 +16,18 @@
  */
 package org.apache.kafka.streams.tests;
 
-import kafka.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
@@ -40,17 +42,18 @@
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
-import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class EosTestDriver extends SmokeTestUtil {
 
@@ -59,22 +62,19 @@
 
     private static boolean isRunning = true;
 
-    static int numRecordsProduced = 0;
+    private static int numRecordsProduced = 0;
 
-    static synchronized void updateNumRecordsProduces(final int delta) {
+    private static synchronized void updateNumRecordsProduces(final int delta) 
{
         numRecordsProduced += delta;
     }
 
     static void generate(final String kafka) {
 
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("Terminating");
-                System.out.flush();
-                isRunning = false;
-            }
-        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            System.out.println("Terminating");
+            System.out.flush();
+            isRunning = false;
+        }));
 
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
@@ -93,19 +93,16 @@ public void run() {
 
             final ProducerRecord<String, Integer> record = new 
ProducerRecord<>("data", key, value);
 
-            producer.send(record, new Callback() {
-                @Override
-                public void onCompletion(final RecordMetadata metadata, final 
Exception exception) {
-                    if (exception != null) {
-                        exception.printStackTrace(System.err);
-                        System.err.flush();
-                        if (exception instanceof TimeoutException) {
-                            try {
-                                // message == 
org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for 
data-0: 30004 ms has passed since last attempt plus backoff time
-                                final int expired = 
Integer.parseInt(exception.getMessage().split(" ")[2]);
-                                updateNumRecordsProduces(-expired);
-                            } catch (Exception ignore) { }
-                        }
+            producer.send(record, (metadata, exception) -> {
+                if (exception != null) {
+                    exception.printStackTrace(System.err);
+                    System.err.flush();
+                    if (exception instanceof TimeoutException) {
+                        try {
+                            // message == 
org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for 
data-0: 30004 ms has passed since last attempt plus backoff time
+                            final int expired = 
Integer.parseInt(exception.getMessage().split(" ")[2]);
+                            updateNumRecordsProduces(-expired);
+                        } catch (final Exception ignore) { }
                     }
                 }
             });
@@ -141,10 +138,6 @@ public void onCompletion(final RecordMetadata metadata, 
final Exception exceptio
     }
 
     public static void verify(final String kafka, final boolean 
withRepartitioning) {
-        ensureStreamsApplicationDown(kafka);
-
-        final Map<TopicPartition, Long> committedOffsets = 
getCommittedOffsets(kafka, withRepartitioning);
-
         final Properties props = new Properties();
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
@@ -152,6 +145,13 @@ public static void verify(final String kafka, final 
boolean withRepartitioning)
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
 
+        final Map<TopicPartition, Long> committedOffsets;
+        try (final AdminClient adminClient = KafkaAdminClient.create(props)) {
+            ensureStreamsApplicationDown(adminClient);
+
+            committedOffsets = getCommittedOffsets(adminClient, 
withRepartitioning);
+        }
+
         final String[] allInputTopics;
         final String[] allOutputTopics;
         if (withRepartitioning) {
@@ -218,54 +218,42 @@ public static void verify(final String kafka, final 
boolean withRepartitioning)
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final String kafka) {
-        AdminClient adminClient = null;
-        try {
-            adminClient = AdminClient.createSimplePlaintext(kafka);
+    private static void ensureStreamsApplicationDown(final AdminClient 
adminClient) {
 
-            final long maxWaitTime = System.currentTimeMillis() + 
MAX_IDLE_TIME_MS;
-            while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 
10000).consumers().get().isEmpty()) {
-                if (System.currentTimeMillis() > maxWaitTime) {
-                    throw new RuntimeException("Streams application not down 
after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
-                }
-                sleep(1000);
-            }
-        } finally {
-            if (adminClient != null) {
-                adminClient.close();
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        ConsumerGroupDescription description;
+        do {
+            description = getConsumerGroupDescription(adminClient);
+
+            if (System.currentTimeMillis() > maxWaitTime && 
!description.members().isEmpty()) {
+                throw new RuntimeException(
+                    "Streams application not down after " + (MAX_IDLE_TIME_MS 
/ 1000) + " seconds. " +
+                        "Group: " + description
+                );
             }
-        }
+            sleep(1000);
+        } while (!description.members().isEmpty());
     }
 
-    private static Map<TopicPartition, Long> getCommittedOffsets(final String 
kafka,
+
+    private static Map<TopicPartition, Long> getCommittedOffsets(final 
AdminClient adminClient,
                                                                  final boolean 
withRepartitioning) {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+        final Map<TopicPartition, OffsetAndMetadata> 
topicPartitionOffsetAndMetadataMap;
 
-        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
-        try (final KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(props)) {
-            final Set<String> topics = new HashSet<>();
-            topics.add("data");
-            if (withRepartitioning) {
-                topics.add("repartition");
-            }
-            consumer.subscribe(topics);
-            consumer.poll(0);
+        try {
+            final ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult = 
adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
+            topicPartitionOffsetAndMetadataMap = 
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, 
TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
 
-            final Set<TopicPartition> partitions = new HashSet<>();
-            for (final String topic : topics) {
-                for (final PartitionInfo partition : 
consumer.partitionsFor(topic)) {
-                    partitions.add(new TopicPartition(partition.topic(), 
partition.partition()));
-                }
-            }
+        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
 
-            for (final TopicPartition tp : partitions) {
-                final long offset = consumer.position(tp);
-                committedOffsets.put(tp, offset);
+        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : 
topicPartitionOffsetAndMetadataMap.entrySet()) {
+            final String topic = entry.getKey().topic();
+            if (topic.equals("data") || withRepartitioning && 
topic.equals("repartition")) {
+                committedOffsets.put(entry.getKey(), 
entry.getValue().offset());
             }
         }
 
@@ -284,7 +272,7 @@ private static void ensureStreamsApplicationDown(final 
String kafka) {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < 
maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = 
consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = 
consumer.poll(Duration.ofMillis(100));
 
             for (final ConsumerRecord<byte[], byte[]> record : 
receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -327,19 +315,12 @@ private static void addRecord(final 
ConsumerRecord<byte[], byte[]> record,
         final TopicPartition partition = new TopicPartition(topic, 
record.partition());
 
         if (verifyTopic(topic, withRepartitioning)) {
-            Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
topicRecordsPerPartition
-                = recordPerTopicPerPartition.get(topic);
+            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
topicRecordsPerPartition =
+                recordPerTopicPerPartition.computeIfAbsent(topic, k -> new 
HashMap<>());
 
-            if (topicRecordsPerPartition == null) {
-                topicRecordsPerPartition = new HashMap<>();
-                recordPerTopicPerPartition.put(topic, 
topicRecordsPerPartition);
-            }
+            final List<ConsumerRecord<byte[], byte[]>> records =
+                topicRecordsPerPartition.computeIfAbsent(partition, k -> new 
ArrayList<>());
 
-            List<ConsumerRecord<byte[], byte[]>> records = 
topicRecordsPerPartition.get(partition);
-            if (records == null) {
-                records = new ArrayList<>();
-                topicRecordsPerPartition.put(partition, records);
-            }
             records.add(record);
         } else {
             throw new RuntimeException("FAIL: received data from unexpected 
topic: " + record);
@@ -397,7 +378,7 @@ private static void verifyMin(final Map<TopicPartition, 
List<ConsumerRecord<byte
 
             if (partitionInput.size() != partitionMin.size()) {
                 throw new RuntimeException("Result verification failed: 
expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + 
partitionMin.size());
+                    + partitionRecords.getKey() + " but received " + 
partitionMin.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = 
partitionInput.iterator();
@@ -439,7 +420,7 @@ private static void verifySum(final Map<TopicPartition, 
List<ConsumerRecord<byte
 
             if (partitionInput.size() != partitionSum.size()) {
                 throw new RuntimeException("Result verification failed: 
expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + 
partitionSum.size());
+                    + partitionRecords.getKey() + " but received " + 
partitionSum.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = 
partitionInput.iterator();
@@ -480,7 +461,7 @@ private static void verifyMax(final Map<TopicPartition, 
List<ConsumerRecord<byte
 
             if (partitionInput.size() != partitionMax.size()) {
                 throw new RuntimeException("Result verification failed: 
expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + 
partitionMax.size());
+                    + partitionRecords.getKey() + " but received " + 
partitionMax.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = 
partitionInput.iterator();
@@ -501,7 +482,7 @@ private static void verifyMax(final Map<TopicPartition, 
List<ConsumerRecord<byte
                 max = Math.max(max, value);
                 currentMinPerKey.put(key, max);
 
-                if (!receivedKey.equals(key) || receivedValue != 
max.intValue()) {
+                if (!receivedKey.equals(key) || receivedValue != max) {
                     throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + max + "> but was <" + 
receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -521,7 +502,7 @@ private static void verifyCnt(final Map<TopicPartition, 
List<ConsumerRecord<byte
 
             if (partitionInput.size() != partitionCnt.size()) {
                 throw new RuntimeException("Result verification failed: 
expected " + partitionInput.size() + " records for "
-                    +  partitionRecords.getKey() + " but received " + 
partitionCnt.size());
+                    + partitionRecords.getKey() + " but received " + 
partitionCnt.size());
             }
 
             final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = 
partitionInput.iterator();
@@ -539,7 +520,7 @@ private static void verifyCnt(final Map<TopicPartition, 
List<ConsumerRecord<byte
                 }
                 currentSumPerKey.put(key, ++cnt);
 
-                if (!receivedKey.equals(key) || receivedValue != 
cnt.longValue()) {
+                if (!receivedKey.equals(key) || receivedValue != cnt) {
                     throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + 
receivedKey + "," + receivedValue + ">");
                 }
             }
@@ -574,14 +555,11 @@ private static void verifyAllTransactionFinished(final 
KafkaConsumer<byte[], byt
             for (final TopicPartition tp : partitions) {
                 final ProducerRecord<String, String> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
 
-                producer.send(record, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, 
final Exception exception) {
-                        if (exception != null) {
-                            exception.printStackTrace(System.err);
-                            System.err.flush();
-                            Exit.exit(1);
-                        }
+                producer.send(record, (metadata, exception) -> {
+                    if (exception != null) {
+                        exception.printStackTrace(System.err);
+                        System.err.flush();
+                        Exit.exit(1);
                     }
                 });
             }
@@ -591,7 +569,7 @@ public void onCompletion(final RecordMetadata metadata, 
final Exception exceptio
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < 
maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
@@ -638,4 +616,18 @@ public void onCompletion(final RecordMetadata metadata, 
final Exception exceptio
         return partitions;
     }
 
+
+    private static ConsumerGroupDescription getConsumerGroupDescription(final 
AdminClient adminClient) {
+        final ConsumerGroupDescription description;
+        try {
+            description = 
adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
+                .describedGroups()
+                .get(EosTestClient.APP_ID)
+                .get(10, TimeUnit.SECONDS);
+        } catch (final InterruptedException | ExecutionException | 
java.util.concurrent.TimeoutException e) {
+            e.printStackTrace();
+            throw new RuntimeException("Unexpected Exception getting group 
description", e);
+        }
+        return description;
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 50330a08e61..7533fdd0858 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.test.TestUtils;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -289,7 +290,7 @@ public static void verify(String kafka, Map<String, 
Set<Integer>> allData, int m
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < 
TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index ad19f32fd1d..33cf1fa34bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -32,6 +32,7 @@
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -74,7 +75,7 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(3, records.count());
     }
 
@@ -90,7 +91,7 @@ public void 
testResetToSpecificOffsetWhenBeforeBeginningOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -106,7 +107,7 @@ public void testResetToSpecificOffsetWhenAfterEndOffset() {
 
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -122,7 +123,7 @@ public void 
testShiftOffsetByWhenBetweenBeginningAndEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -138,7 +139,7 @@ public void testShiftOffsetByWhenBeforeBeginningOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(5, records.count());
     }
 
@@ -154,7 +155,7 @@ public void testShiftOffsetByWhenAfterEndOffset() {
 
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -172,7 +173,7 @@ public void 
testResetUsingPlanWhenBetweenBeginningAndEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, 
inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -190,7 +191,7 @@ public void testResetUsingPlanWhenBeforeBeginningOffset() {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, 
inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -208,7 +209,7 @@ public void testResetUsingPlanWhenAfterEndOffset() {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, 
inputTopicPartitions, topicPartitionsAndOffset);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
@@ -226,7 +227,7 @@ public void shouldSeekToEndOffset() {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, 
intermediateTopicPartitions);
 
-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(500));
         assertEquals(2, records.count());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java 
b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index 3070e36482f..00788fd2f98 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -85,9 +86,8 @@ public synchronized void assign(Collection<TopicPartition> 
partitions) {
         super.assign(partitions);
     }
 
-    @Deprecated
     @Override
-    public ConsumerRecords<byte[], byte[]> poll(long timeout) {
+    public ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
         // add buffered records to MockConsumer
         for (ConsumerRecord<byte[], byte[]> record : recordBuffer) {
             super.addRecord(record);
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 773cbb4c323..7f752652da4 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -66,6 +66,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -327,6 +328,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
                 consumer,
                 new StoreChangelogReader(
                     
createRestoreConsumer(processorTopology.storeToChangelogTopic()),
+                    Duration.ZERO,
                     stateRestoreListener,
                     new LogContext("topology-test-driver ")),
                 streamsConfig,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread.shutdown() needs to interrupt the stream threads to break the loop
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5697
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5697
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: John Roesler
>            Priority: Major
>              Labels: newbie
>             Fix For: 2.1.0
>
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread will eventually check it and shut itself down. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which blocks 
> until the coordinator can be found. If the coordinator broker never comes up, 
> the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker: it gets stuck in a single loop, and even 
> `ctrl-C` will not stop it, since the state set by {{shutdown()}} is never read 
> by the thread (see the poll-loop sketch after the log below):
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
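
As a complement to the description above, here is a minimal, hypothetical sketch (not the actual StreamThread code; the topic name, timeout, and consumer configuration are placeholders) of the pattern this ticket moves toward: polling with a bounded Duration so the loop re-checks a shutdown flag on every iteration instead of blocking indefinitely inside poll while the coordinator is unreachable.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class BoundedPollLoop {

    private final AtomicBoolean running = new AtomicBoolean(true);

    // Called from another thread (e.g. a shutdown hook); the loop below
    // notices the flag within roughly one poll timeout.
    public void shutdown() {
        running.set(false);
    }

    public void run(final Properties consumerConfig) {
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (running.get()) {
                // poll(Duration) returns after at most ~100 ms even if the
                // broker/coordinator never becomes available, so shutdown()
                // actually takes effect.
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    // process the record
                });
            }
        }
    }
}
{code}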



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
