Repository: kafka
Updated Branches:
  refs/heads/trunk d98ca230a -> 5819b06fa


KAFKA-4403; Update KafkaBasedLog to use new endOffsets consumer API

ewencp plz review

Author: Balint Molnar <balintmolna...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2176 from baluchicken/KAFKA-4403


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5819b06f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5819b06f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5819b06f

Branch: refs/heads/trunk
Commit: 5819b06fafd8a1a32276018560e0ff731191caf5
Parents: d98ca23
Author: Balint Molnar <balintmolna...@gmail.com>
Authored: Tue Nov 29 19:04:24 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 29 19:04:24 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/util/KafkaBasedLog.java       | 29 ++------------------
 .../kafka/connect/util/KafkaBasedLogTest.java   | 14 +++-------
 2 files changed, 6 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5819b06f/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 290f8a2..96141a5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,7 +45,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Future;
 
-import static java.util.Collections.singleton;
 
 /**
  * <p>
@@ -263,31 +261,8 @@ public class KafkaBasedLog<K, V> {
         log.trace("Reading to end of offset log");
 
         Set<TopicPartition> assignment = consumer.assignment();
-
-        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : assignment) {
-            long offset = consumer.position(tp);
-            offsets.put(tp, offset);
-            consumer.seekToEnd(singleton(tp));
-        }
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        try {
-            poll(0);
-        } finally {
-            // If there is an exception, even a possibly expected one like WakeupException, we need to make sure
-            // the consumers position is reset or it'll get into an inconsistent state.
-            for (TopicPartition tp : assignment) {
-                long startOffset = offsets.get(tp);
-                long endOffset = consumer.position(tp);
-                if (endOffset > startOffset) {
-                    endOffsets.put(tp, endOffset);
-                    consumer.seek(tp, startOffset);
-                }
-                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
-            }
-        }
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
+        log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
             poll(Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5819b06f/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index ec58cb6..d97e56e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -279,17 +279,11 @@ public class KafkaBasedLogTest {
                 // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
                 // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
                 // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 2L);
+                newEndOffsets.put(TP1, 2L);
+                consumer.updateEndOffsets(newEndOffsets);
                 store.readToEnd(readEndFutureCallback);
-                // Needs to seek to end to find end offsets
-                consumer.schedulePollTask(new Runnable() {
-                    @Override
-                    public void run() {
-                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
-                        newEndOffsets.put(TP0, 2L);
-                        newEndOffsets.put(TP1, 2L);
-                        consumer.updateEndOffsets(newEndOffsets);
-                    }
-                });
 
                 // Should keep polling until it reaches current log end offset for all partitions
                 consumer.scheduleNopPollTask();

Reply via email to