Repository: kafka Updated Branches: refs/heads/trunk 958e10c87 -> dce06766d
KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty This contribution is my original work, and I license it under the project's open source license. CC jkreps Author: Drausin Wulsin <[email protected]> Author: John Doe <[email protected]> Reviewers: Jason Gustafson Closes #1055 from drausin/bugfix/consumer-records-iterator Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dce06766 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dce06766 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dce06766 Branch: refs/heads/trunk Commit: dce06766da245ca95951c9c7e82d6a113db7cb13 Parents: 958e10c Author: Drausin Wulsin <[email protected]> Authored: Thu Mar 17 10:52:33 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 17 10:52:33 2016 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerRecords.java | 2 +- .../clients/consumer/ConsumerRecordsTest.java | 58 ++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 3d7ec60..5b83f0c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -103,7 +103,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> { Iterator<ConsumerRecord<K, V>> current; public ConsumerRecord<K, V> makeNext() { - if (current == null || !current.hasNext()) { + while (current == null || !current.hasNext()) { if (iters.hasNext()) current = iters.next().iterator(); else http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java new file mode 100644 index 0000000..d68a341 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.clients.consumer; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +public class ConsumerRecordsTest { + + @Test + public void iterator() throws Exception { + + Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>(); + + String topic = "topic"; + records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>()); + ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1"); + ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2"); + records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); + records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>()); + + ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records); + Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator(); + + int c = 0; + for (; iter.hasNext(); c++) { + ConsumerRecord<Integer, String> record = iter.next(); + assertEquals(1, record.partition()); + assertEquals(topic, record.topic()); + assertEquals(c, record.offset()); + } + assertEquals(2, c); + } +} \ No newline at end of file
