frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1634144659


##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -31,32 +31,155 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
     @Test
-    public void iterator() throws Exception {
+    public void testIterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+            validateEmptyPartition(record, emptyPartitionIndex);
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new 
partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
+
+            validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+            recordCount++;
+        }
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsByPartition() {
+        List<String> topics = Arrays.asList("topic1", "topic2");
+        int recordSize = 3;
+        int partitionSize = 5;
+        int emptyPartitionIndex = 2;
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            for (int partition = 0; partition < partitionSize; partition++) {
+                TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+                List<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topicPartition);
+
+                if (partition == emptyPartitionIndex) {
+                    assertTrue(records.isEmpty());
+                } else {
+                    assertEquals(recordSize, records.size());
+                    for (int i = 0; i < records.size(); i++) {
+                        ConsumerRecord<Integer, String> record = 
records.get(i);
+                        validateRecordPayload(topic, record, partition, i, 
recordSize);
+                    }
+                }
+            }
         }
-        assertEquals(2, c);
+    }
+
+    @Test
+    public void testRecordsByNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = 
ConsumerRecords.empty();
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+
+    @Test
+    public void testRecordsByTopic() {
+        List<String> topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionIndex = 6;
+        int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topic);
+            Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
+            int recordCount = 0;
+            int partitionCount = 0;
+            int currentPartition = -1;
+
+            while (iterator.hasNext()) {

Review Comment:
   Indeed, it will be more easy to read than using iterator. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to