Repository: kafka
Updated Branches:
  refs/heads/0.11.0 33640106b -> 472c8974f


HOTFIX: poll with zero millis during restoration

Mirror of #4096 for 0.11.0.

During the restoration phase, when the thread state is still PARTITIONS_ASSIGNED 
and not yet RUNNING, call poll() on the normal consumer with a 0 millisecond timeout, 
to unblock the restoration of other tasks as soon as possible.

Author: Guozhang Wang <[email protected]>

Reviewers: Bill Bejeck <[email protected]>, Damian Guy <[email protected]>, 
Matthias J. Sax <[email protected]>, Xavier Léauté <[email protected]>

Closes #4085 from guozhangwang/KHotfix-0110-restore-only


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/472c8974
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/472c8974
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/472c8974

Branch: refs/heads/0.11.0
Commit: 472c8974f2c67924350beaf2812593c3ce61e59a
Parents: 3364010
Author: Guozhang Wang <[email protected]>
Authored: Mon Oct 23 22:52:35 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Mon Oct 23 22:52:35 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |  5 ++
 .../internals/InternalTopicManager.java         | 15 +++---
 .../internals/RecordCollectorImpl.java          |  3 ++
 .../processor/internals/RecordQueue.java        |  3 ++
 .../internals/StoreChangelogReader.java         |  2 +-
 .../processor/internals/StreamThread.java       | 52 ++++++++++++++------
 .../integration/ResetIntegrationTest.java       | 43 ++++------------
 .../integration/utils/IntegrationTestUtils.java |  1 -
 8 files changed, 67 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index cb3404f..60de4a3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -113,6 +113,11 @@ class AssignedTasks<T extends AbstractTask> {
                 transitionToRunning(task);
                 resume.addAll(task.partitions());
                 it.remove();
+                log.trace("{} {} {} completed restoration as all its changelog 
partitions {} have been applied to restore state",
+                        logPrefix,
+                        taskTypeName,
+                        task.id(),
+                        task.changelogPartitions());
             } else {
                 if (log.isTraceEnabled()) {
                     final HashSet<TopicPartition> outstandingPartitions = new 
HashSet<>(task.changelogPartitions());

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 9eabf2e..54e98b0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,8 +37,10 @@ public class InternalTopicManager {
     public static final String RETENTION_MS = "retention.ms";
     private static final int MAX_TOPIC_READY_TRY = 5;
 
-    private final Logger log;
+    private static final Logger log = 
LoggerFactory.getLogger(InternalTopicManager.class);
+
     private final Time time;
+    private final String logPrefix;
     private final long windowChangeLogAdditionalRetention;
 
     private final int replicationFactor;
@@ -53,8 +55,7 @@ public class InternalTopicManager {
         this.windowChangeLogAdditionalRetention = 
windowChangeLogAdditionalRetention;
         this.time = time;
 
-        LogContext logContext = new LogContext(String.format("stream-thread 
[%s] ", Thread.currentThread().getName()));
-        this.log = logContext.logger(getClass());
+        this.logPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
     }
 
     /**
@@ -81,7 +82,7 @@ public class InternalTopicManager {
                 }
                 return;
             } catch (StreamsException ex) {
-                log.warn("Could not create internal topics: " + 
ex.getMessage() + " Retry #" + i);
+                log.warn(logPrefix + "Could not create internal topics: " + 
ex.getMessage() + " Retry #" + i);
             }
             // backoff
             time.sleep(100L);
@@ -101,7 +102,7 @@ public class InternalTopicManager {
 
                 return existingTopicPartitions;
             } catch (StreamsException ex) {
-                log.warn("Could not get number of partitions: " + 
ex.getMessage() + " Retry #" + i);
+                log.warn(logPrefix + "Could not get number of partitions: " + 
ex.getMessage() + " Retry #" + i);
             }
             // backoff
             time.sleep(100L);
@@ -113,7 +114,7 @@ public class InternalTopicManager {
         try {
             streamsKafkaClient.close();
         } catch (IOException e) {
-            log.warn("Could not close StreamsKafkaClient.");
+            log.warn(logPrefix + "Could not close StreamsKafkaClient.");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d49cf58..02b07f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -116,6 +116,9 @@ public class RecordCollectorImpl implements RecordCollector 
{
                         }
                     }
                 });
+
+                System.out.println("Send record " + serializedRecord);
+
                 return;
             } catch (final TimeoutException e) {
                 if (attempt == MAX_SEND_ATTEMPTS) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 0902614..61d8205 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -82,6 +82,9 @@ public class RecordQueue {
     public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> 
rawRecords) {
         for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
             ConsumerRecord<Object, Object> record = 
recordDeserializer.deserialize(rawRecord);
+
+            System.out.println("Got record " + record);
+
             long timestamp = timestampExtractor.extract(record, 
timeTracker.get());
             log.trace("Source node {} extracted timestamp {} for record {}", 
source.name(), timestamp, record);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 003ded8..34dcb75 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -186,7 +186,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
     private Collection<TopicPartition> completed() {
         final Set<TopicPartition> completed = new 
HashSet<>(stateRestorers.keySet());
         completed.removeAll(needsRestoring.keySet());
-        log.debug("{} completed partitions {}", logPrefix, completed);
+        log.trace("{} completed partitions {}", logPrefix, completed);
         return completed;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c0acae5..210b070 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -488,24 +488,22 @@ public class StreamThread extends Thread {
     long runOnce(long recordsProcessedBeforeCommit) {
         timerStartedMs = time.milliseconds();
 
-        // try to fetch some records if necessary
-        final ConsumerRecords<byte[], byte[]> records = pollRequests();
+        ConsumerRecords<byte[], byte[]> records;
 
         if (state == State.PARTITIONS_ASSIGNED) {
-            active.initializeNewTasks();
-            standby.initializeNewTasks();
+            // try to fetch some records with zero poll millis
+            // to unblock the restoration as soon as possible
+            records = pollRequests(0L);
 
-            final Collection<TopicPartition> restored = 
storeChangelogReader.restore();
-            final Set<TopicPartition> resumed = 
active.updateRestored(restored);
-
-            if (!resumed.isEmpty()) {
-                log.trace("{} resuming partitions {}", logPrefix, resumed);
-                consumer.resume(resumed);
-            }
+            tryTransitToRunning();
+        } else {
+            // try to fetch some records if necessary
+            records = pollRequests(pollTimeMs);
 
-            if (active.allTasksRunning()) {
-                assignStandbyPartitions();
-                setState(State.RUNNING);
+            // if state changed after the poll call,
+            // try to initialize the assigned tasks again
+            if (state == State.PARTITIONS_ASSIGNED) {
+                tryTransitToRunning();
             }
         }
 
@@ -528,10 +526,34 @@ public class StreamThread extends Thread {
     }
 
     /**
+     * Retry to restore the assigned records and transit to RUNNING state if 
all restoration is done
+     */
+    private void tryTransitToRunning() {
+        active.initializeNewTasks();
+        standby.initializeNewTasks();
+
+        final Collection<TopicPartition> restored = 
storeChangelogReader.restore();
+        final Set<TopicPartition> resumed = active.updateRestored(restored);
+
+        if (!resumed.isEmpty()) {
+            log.trace("{} resuming partitions {}", logPrefix, resumed);
+            consumer.resume(resumed);
+        }
+
+        if (active.allTasksRunning()) {
+            assignStandbyPartitions();
+            setState(State.RUNNING);
+        }
+    }
+
+    /**
      * Get the next batch of records by polling.
+     *
+     * @param pollTimeMs poll time millis parameter for the consumer poll
+     *
      * @return Next batch of records or null if no records available.
      */
-    private ConsumerRecords<byte[], byte[]> pollRequests() {
+    private ConsumerRecords<byte[], byte[]> pollRequests(final long 
pollTimeMs) {
         ConsumerRecords<byte[], byte[]> records = null;
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index c8ba2bb..6e7b22f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -20,15 +20,12 @@ import kafka.admin.AdminClient;
 import kafka.server.KafkaConfig$;
 import kafka.tools.StreamsResetter;
 import kafka.utils.MockTime;
-import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -50,10 +47,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -106,6 +101,11 @@ public class ResetIntegrationTest {
         ++testNo;
         mockTime = CLUSTER.time;
 
+        // we align time to seconds to get clean window boundaries and thus 
ensure the same result for each run
+        // otherwise, input records could fall into different windows for 
different runs depending on the initial mock time
+        final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 
1000;
+        mockTime.setCurrentTimeMs(alignedTime);
+
         if (adminClient == null) {
             adminClient = 
AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
         }
@@ -359,36 +359,13 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
-    private void assertInternalTopicsGotDeleted(final String 
intermediateUserTopic) {
-        final Set<String> expectedRemainingTopicsAfterCleanup = new 
HashSet<>();
-        expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
+    private void assertInternalTopicsGotDeleted(final String 
intermediateUserTopic) throws Exception {
+        // do not use list topics request, but read from the embedded 
cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
-            expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic);
-        }
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2);
-        expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
-        expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
-
-        Set<String> allTopics;
-        ZkUtils zkUtils = null;
-        try {
-            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
-
-            do {
-                Utils.sleep(100);
-                allTopics = new HashSet<>();
-                
allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            } while (allTopics.size() != 
expectedRemainingTopicsAfterCleanup.size());
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
+            CLUSTER.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, 
OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, 
intermediateUserTopic);
+        } else {
+            CLUSTER.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, 
OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME);
         }
-        assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
     }
 
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {

http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6f387c0..00dd542 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -313,7 +313,6 @@ public class IntegrationTestUtils {
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
             final ConsumerRecords<K, V> records = 
consumer.poll(pollIntervalMs);
-
             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), 
record.value()));
             }

Reply via email to