Repository: kafka
Updated Branches:
  refs/heads/trunk d98ca230a -> 5819b06fa


KAFKA-4403; Update KafkaBasedLog to use new endOffsets consumer API

ewencp plz review

Author: Balint Molnar <balintmolna...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2176 from baluchicken/KAFKA-4403


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5819b06f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5819b06f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5819b06f

Branch: refs/heads/trunk
Commit: 5819b06fafd8a1a32276018560e0ff731191caf5
Parents: d98ca23
Author: Balint Molnar <balintmolna...@gmail.com>
Authored: Tue Nov 29 19:04:24 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 29 19:04:24 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/util/KafkaBasedLog.java       | 29 ++------------------
 .../kafka/connect/util/KafkaBasedLogTest.java   | 14 +++-------
 2 files changed, 6 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5819b06f/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 290f8a2..96141a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +45,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 
-import static java.util.Collections.singleton;
 
 /**
  * <p>
@@ -263,31 +261,8 @@ public class KafkaBasedLog<K, V> {
         log.trace("Reading to end of offset log");
 
         Set<TopicPartition> assignment = consumer.assignment();
-
-        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : assignment) {
-            long offset = consumer.position(tp);
-            offsets.put(tp, offset);
-            consumer.seekToEnd(singleton(tp));
-        }
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        try {
-            poll(0);
-        } finally {
-            // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
-            // the consumers position is reset or it'll get into an inconsistent state.
-            for (TopicPartition tp : assignment) {
-                long startOffset = offsets.get(tp);
-                long endOffset = consumer.position(tp);
-                if (endOffset > startOffset) {
-                    endOffsets.put(tp, endOffset);
-                    consumer.seek(tp, startOffset);
-                }
-                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
-            }
-        }
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
+        log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
             poll(Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5819b06f/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index ec58cb6..d97e56e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -279,17 +279,11 @@ public class KafkaBasedLogTest {
                 // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
                 // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
                 // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 2L);
+                newEndOffsets.put(TP1, 2L);
+                consumer.updateEndOffsets(newEndOffsets);
                 store.readToEnd(readEndFutureCallback);
-                // Needs to seek to end to find end offsets
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
-                        newEndOffsets.put(TP0, 2L);
-                        newEndOffsets.put(TP1, 2L);
-                        consumer.updateEndOffsets(newEndOffsets);
-                    }
-                });
 
                 // Should keep polling until it reaches current log end offset for all partitions
                 consumer.scheduleNopPollTask();