chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632032964


##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+        for (int count = 0; iterator.hasNext(); count++) {
+            if (count % emptyPartitionInterval != 0) {
+                int offset = 0;
+                for (; offset < recordSize; offset++) {
+                    ConsumerRecord<Integer, String> record = iterator.next();
+                    assertEquals(count, record.partition());
+                    assertEquals(topic, record.topic());
+                    assertEquals(offset, record.offset());

Review Comment:
   please verify the "key" and "value"



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+        for (int count = 0; iterator.hasNext(); count++) {
+            if (count % emptyPartitionInterval != 0) {
+                int offset = 0;
+                for (; offset < recordSize; offset++) {
+                    ConsumerRecord<Integer, String> record = iterator.next();
+                    assertEquals(count, record.partition());
+                    assertEquals(topic, record.topic());
+                    assertEquals(offset, record.offset());
+                }
+                assertEquals(recordSize, offset);
+            }
+        }
+    }
 
+    @Test
+    public void testRecordsWithNullTopic() {
+        String nullTopic = null;
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
+    @Test
+    public void testRecords() {
+        String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, 
topics);
 
-        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topic);
+            Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+            for (int count = 0; iterator.hasNext(); count++) {

Review Comment:
   Could you verify the number of records from the iterable/iterator?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+        for (int count = 0; iterator.hasNext(); count++) {
+            if (count % emptyPartitionInterval != 0) {
+                int offset = 0;
+                for (; offset < recordSize; offset++) {
+                    ConsumerRecord<Integer, String> record = iterator.next();
+                    assertEquals(count, record.partition());
+                    assertEquals(topic, record.topic());
+                    assertEquals(offset, record.offset());
+                }
+                assertEquals(recordSize, offset);
+            }
+        }
+    }
 
+    @Test
+    public void testRecordsWithNullTopic() {
+        String nullTopic = null;
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);

Review Comment:
   Maybe we can use `ConsumerRecords.empty();`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
     @Test
     public void iterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+        for (int count = 0; iterator.hasNext(); count++) {
+            if (count % emptyPartitionInterval != 0) {
+                int offset = 0;
+                for (; offset < recordSize; offset++) {
+                    ConsumerRecord<Integer, String> record = iterator.next();
+                    assertEquals(count, record.partition());
+                    assertEquals(topic, record.topic());
+                    assertEquals(offset, record.offset());
+                }
+                assertEquals(recordSize, offset);
+            }
+        }
+    }
 
+    @Test
+    public void testRecordsWithNullTopic() {
+        String nullTopic = null;
         Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
+    @Test
+    public void testRecords() {
+        String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionInterval = 3;
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, 
topics);
 
-        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topic);
+            Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+
+            for (int count = 0; iterator.hasNext(); count++) {
+                if (count % emptyPartitionInterval != 0) {
+                    int offset = 0;
+                    for (; offset < recordSize; offset++) {
+                        ConsumerRecord<Integer, String> record = 
iterator.next();
+                        assertEquals(count, record.partition());
+                        assertEquals(topic, record.topic());
+                        assertEquals(offset, record.offset());
+                    }
+                    assertEquals(recordSize, offset);
+                }
+            }
         }
-        assertEquals(2, c);
+    }
+
+    private ConsumerRecords<Integer, String> buildTopicTestRecords(int 
recordSize,
+                                                                   int 
partitionSize,
+                                                                   int 
emptyPartitionInterval,
+                                                                   String... 
topics) {

Review Comment:
   Could you please use `Collection<String>` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to