Repository: kafka Updated Branches: refs/heads/0.11.0 33640106b -> 472c8974f
HOTFIX: poll with zero millis during restoration Mirror of #4096 for 0.11.0. 1. During the restoration phase, while the thread state is still PARTITIONS_ASSIGNED rather than RUNNING, call poll() on the normal consumer with a 0 millisecond timeout, to unblock the restoration of other tasks as soon as possible. Author: Guozhang Wang <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Damian Guy <[email protected]>, Matthias J. Sax <[email protected]>, Xavier Léauté <[email protected]> Closes #4085 from guozhangwang/KHotfix-0110-restore-only Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/472c8974 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/472c8974 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/472c8974 Branch: refs/heads/0.11.0 Commit: 472c8974f2c67924350beaf2812593c3ce61e59a Parents: 3364010 Author: Guozhang Wang <[email protected]> Authored: Mon Oct 23 22:52:35 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Oct 23 22:52:35 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/AssignedTasks.java | 5 ++ .../internals/InternalTopicManager.java | 15 +++--- .../internals/RecordCollectorImpl.java | 3 ++ .../processor/internals/RecordQueue.java | 3 ++ .../internals/StoreChangelogReader.java | 2 +- .../processor/internals/StreamThread.java | 52 ++++++++++++++------ .../integration/ResetIntegrationTest.java | 43 ++++------------ .../integration/utils/IntegrationTestUtils.java | 1 - 8 files changed, 67 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index cb3404f..60de4a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -113,6 +113,11 @@ class AssignedTasks<T extends AbstractTask> { transitionToRunning(task); resume.addAll(task.partitions()); it.remove(); + log.trace("{} {} {} completed restoration as all its changelog partitions {} have been applied to restore state", + logPrefix, + taskTypeName, + task.id(), + task.changelogPartitions()); } else { if (log.isTraceEnabled()) { final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions()); http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 9eabf2e..54e98b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -17,10 +17,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -37,8 +37,10 @@ public class InternalTopicManager { public static final String RETENTION_MS = 
"retention.ms"; private static final int MAX_TOPIC_READY_TRY = 5; - private final Logger log; + private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class); + private final Time time; + private final String logPrefix; private final long windowChangeLogAdditionalRetention; private final int replicationFactor; @@ -53,8 +55,7 @@ public class InternalTopicManager { this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention; this.time = time; - LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); - this.log = logContext.logger(getClass()); + this.logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); } /** @@ -81,7 +82,7 @@ public class InternalTopicManager { } return; } catch (StreamsException ex) { - log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i); + log.warn(logPrefix + "Could not create internal topics: " + ex.getMessage() + " Retry #" + i); } // backoff time.sleep(100L); @@ -101,7 +102,7 @@ public class InternalTopicManager { return existingTopicPartitions; } catch (StreamsException ex) { - log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i); + log.warn(logPrefix + "Could not get number of partitions: " + ex.getMessage() + " Retry #" + i); } // backoff time.sleep(100L); @@ -113,7 +114,7 @@ public class InternalTopicManager { try { streamsKafkaClient.close(); } catch (IOException e) { - log.warn("Could not close StreamsKafkaClient."); + log.warn(logPrefix + "Could not close StreamsKafkaClient."); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index d49cf58..02b07f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -116,6 +116,9 @@ public class RecordCollectorImpl implements RecordCollector { } } }); + + System.out.println("Send record " + serializedRecord); + return; } catch (final TimeoutException e) { if (attempt == MAX_SEND_ATTEMPTS) { http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 0902614..61d8205 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -82,6 +82,9 @@ public class RecordQueue { public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) { ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord); + + System.out.println("Got record " + record); + long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record); http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 003ded8..34dcb75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -186,7 +186,7 @@ public class StoreChangelogReader implements ChangelogReader { private Collection<TopicPartition> completed() { final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet()); completed.removeAll(needsRestoring.keySet()); - log.debug("{} completed partitions {}", logPrefix, completed); + log.trace("{} completed partitions {}", logPrefix, completed); return completed; } http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index c0acae5..210b070 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -488,24 +488,22 @@ public class StreamThread extends Thread { long runOnce(long recordsProcessedBeforeCommit) { timerStartedMs = time.milliseconds(); - // try to fetch some records if necessary - final ConsumerRecords<byte[], byte[]> records = pollRequests(); + ConsumerRecords<byte[], byte[]> records; if (state == State.PARTITIONS_ASSIGNED) { - active.initializeNewTasks(); - standby.initializeNewTasks(); + // try to fetch some records with zero poll millis + // to unblock the restoration as soon as possible + records = pollRequests(0L); - final Collection<TopicPartition> restored = storeChangelogReader.restore(); - final Set<TopicPartition> 
resumed = active.updateRestored(restored); - - if (!resumed.isEmpty()) { - log.trace("{} resuming partitions {}", logPrefix, resumed); - consumer.resume(resumed); - } + tryTransitToRunning(); + } else { + // try to fetch some records if necessary + records = pollRequests(pollTimeMs); - if (active.allTasksRunning()) { - assignStandbyPartitions(); - setState(State.RUNNING); + // if state changed after the poll call, + // try to initialize the assigned tasks again + if (state == State.PARTITIONS_ASSIGNED) { + tryTransitToRunning(); } } @@ -528,10 +526,34 @@ public class StreamThread extends Thread { } /** + * Retry to restore the assigned records and transit to RUNNING state if all restoration is done + */ + private void tryTransitToRunning() { + active.initializeNewTasks(); + standby.initializeNewTasks(); + + final Collection<TopicPartition> restored = storeChangelogReader.restore(); + final Set<TopicPartition> resumed = active.updateRestored(restored); + + if (!resumed.isEmpty()) { + log.trace("{} resuming partitions {}", logPrefix, resumed); + consumer.resume(resumed); + } + + if (active.allTasksRunning()) { + assignStandbyPartitions(); + setState(State.RUNNING); + } + } + + /** * Get the next batch of records by polling. + * + * @param pollTimeMs poll time millis parameter for the consumer poll + * * @return Next batch of records or null if no records available. 
*/ - private ConsumerRecords<byte[], byte[]> pollRequests() { + private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) { ConsumerRecords<byte[], byte[]> records = null; try { http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index c8ba2bb..6e7b22f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -20,15 +20,12 @@ import kafka.admin.AdminClient; import kafka.server.KafkaConfig$; import kafka.tools.StreamsResetter; import kafka.utils.MockTime; -import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -50,10 +47,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Properties; -import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -106,6 +101,11 @@ public class ResetIntegrationTest { ++testNo; mockTime = CLUSTER.time; + // we 
align time to seconds to get clean window boundaries and thus ensure the same result for each run + // otherwise, input records could fall into different windows for different runs depending on the initial mock time + final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000; + mockTime.setCurrentTimeMs(alignedTime); + if (adminClient == null) { adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers()); } @@ -359,36 +359,13 @@ public class ResetIntegrationTest { Assert.assertEquals(0, exitCode); } - private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) { - final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>(); - expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC); + private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { + // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm if (intermediateUserTopic != null) { - expectedRemainingTopicsAfterCleanup.add(intermediateUserTopic); - } - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC); - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2); - expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN); - expectedRemainingTopicsAfterCleanup.add("__consumer_offsets"); - - Set<String> allTopics; - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), - 30000, - 30000, - JaasUtils.isZkSecurityEnabled()); - - do { - Utils.sleep(100); - allTopics = new HashSet<>(); - allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); - } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size()); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } + CLUSTER.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME, intermediateUserTopic); + } else { + CLUSTER.waitForRemainingTopics(30000, 
INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME); } - assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup)); } private class WaitUntilConsumerGroupGotClosed implements TestCondition { http://git-wip-us.apache.org/repos/asf/kafka/blob/472c8974/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 6f387c0..00dd542 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -313,7 +313,6 @@ public class IntegrationTestUtils { continueConsuming(consumedValues.size(), maxMessages)) { totalPollTimeMs += pollIntervalMs; final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); - for (final ConsumerRecord<K, V> record : records) { consumedValues.add(new KeyValue<>(record.key(), record.value())); }
