This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new d1cc722 KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511) d1cc722 is described below commit d1cc722b65c5e6fa770d0909b139d385a2544803 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Wed Feb 14 15:43:57 2018 -0500 KAFKA-6364: Second check for ensuring changelog topic not changed during restore (#4511) Added a second check for race condition where store changelog topic updated during restore, but not if a KTable changelog topic. This will be tricky to test, but I wanted to push the PR to get feedback on the approach. Reviewers: Guozhang Wang <wangg...@gmail.com>, Matthias J. Sax <matth...@confluent.io> --- .../kafka/clients/consumer/MockConsumer.java | 35 +++++++++++++++--- .../processor/internals/StoreChangelogReader.java | 9 +++++ .../internals/StoreChangelogReaderTest.java | 43 +++++++++++++++++++++- 3 files changed, 81 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c492995..ceb7024 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -51,7 +51,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { private final Map<String, List<PartitionInfo>> partitions; private final SubscriptionState subscriptions; private final Map<TopicPartition, Long> beginningOffsets; - private final Map<TopicPartition, Long> endOffsets; + private final Map<TopicPartition, List<Long>> endOffsets; private final Map<TopicPartition, OffsetAndMetadata> committed; private final Queue<Runnable> pollTasks; private final Set<TopicPartition> paused; @@ -290,8 +290,26 @@ public class MockConsumer<K, V> implements Consumer<K, V> { subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST); } - public 
synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) { - endOffsets.putAll(newOffsets); + // needed for cases where you make a second call to endOffsets + public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) { + innerUpdateEndOffsets(newOffsets, false); + } + + public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) { + innerUpdateEndOffsets(newOffsets, true); + } + + private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets, + final boolean replace) { + + for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) { + List<Long> offsets = endOffsets.get(entry.getKey()); + if (replace || offsets == null) { + offsets = new ArrayList<>(); + } + offsets.add(entry.getValue()); + endOffsets.put(entry.getKey(), offsets); + } } @Override @@ -354,7 +372,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { Map<TopicPartition, Long> result = new HashMap<>(); for (TopicPartition tp : partitions) { - Long endOffset = endOffsets.get(tp); + Long endOffset = getEndOffset(endOffsets.get(tp)); if (endOffset == null) throw new IllegalStateException("The partition " + tp + " does not have an end offset."); result.put(tp, endOffset); @@ -430,7 +448,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { if (offset == null) throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning"); } else if (strategy == OffsetResetStrategy.LATEST) { - offset = endOffsets.get(tp); + offset = getEndOffset(endOffsets.get(tp)); if (offset == null) throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end"); } else { @@ -438,4 +456,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } seek(tp, offset); } + + private Long getEndOffset(List<Long> offsets) { + 
if (offsets == null || offsets.isEmpty()) { + return null; + } + return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index b11c45b..5fcba76 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -255,6 +255,15 @@ public class StoreChangelogReader implements ChangelogReader { throw new TaskMigratedException(task, topicPartition, endOffset, pos); } + // need to check for changelog topic + if (restorer.offsetLimit() == Long.MAX_VALUE) { + final Long updatedEndOffset = restoreConsumer.endOffsets(Collections.singletonList(topicPartition)).get(topicPartition); + if (!restorer.hasCompleted(pos, updatedEndOffset)) { + throw new TaskMigratedException(task, topicPartition, updatedEndOffset, pos); + } + } + + log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", topicPartition, restorer.restoredNumRecords(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index e69cede..c65d4ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -390,9 +390,50 @@ public class StoreChangelogReaderTest { try { changelogReader.restore(active); fail("Should have thrown TaskMigratedException"); - } catch (final TaskMigratedException expected) { /* ignore */ } + } catch (final TaskMigratedException expected) { + /* ignore */ + } } + + @Test + public void 
shouldThrowTaskMigratedExceptionIfChangelogTopicUpdatedDuringRestoreProcessFoundInSecondCheck() { + final int messages = 10; + setupConsumer(messages, topicPartition); + // in this case first call to endOffsets returns correct value, but a second thread has updated the changelog topic + // so a subsequent call to endOffsets returns a value exceeding the expected end value + consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L)); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName")); + + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + replay(active); + + try { + changelogReader.restore(active); + fail("Should have thrown TaskMigratedException"); + } catch (final TaskMigratedException expected) { + // verifies second block threw exception with updated end offset + assertTrue(expected.getMessage().contains("end offset 15, current offset 10")); + } + } + + + @Test + public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() { + final int messages = 10; + setupConsumer(messages, topicPartition); + // in this case first call to endOffsets returns correct value, but a second thread has updated the source topic + // but since it's a source topic, the second check should not fire hence no exception + consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L)); + changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName")); + + expect(active.restoringTaskFor(topicPartition)).andReturn(task); + replay(active); + + changelogReader.restore(active); + } + + @Test public void shouldThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForChangelogTopicEOSEnabled() { final int totalMessages = 10; -- To stop receiving notification emails like this one, please contact guozh...@apache.org.