frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635734783


##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
     @Test
-    public void iterator() throws Exception {
+    public void testIterator() throws Exception {
+        String topic = "topic";
+        int recordSize = 10;
+        int partitionSize = 15;
+        int emptyPartitionIndex = 3;
+        ConsumerRecords<Integer, String> records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+        Iterator<ConsumerRecord<Integer, String>> iterator = 
records.iterator();
 
-        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = 
new LinkedHashMap<>();
+        int recordCount = 0;
+        int partitionCount = 0;
+        int currentPartition = -1;
 
-        String topic = "topic";
-        records.put(new TopicPartition(topic, 0), new ArrayList<>());
-        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-            0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-            0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-        records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-        ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
-        Iterator<ConsumerRecord<Integer, String>> iter = 
consumerRecords.iterator();
-
-        int c = 0;
-        for (; iter.hasNext(); c++) {
-            ConsumerRecord<Integer, String> record = iter.next();
-            assertEquals(1, record.partition());
-            assertEquals(topic, record.topic());
-            assertEquals(c, record.offset());
+        while (iterator.hasNext()) {
+            ConsumerRecord<Integer, String> record = iterator.next();
+            validateEmptyPartition(record, emptyPartitionIndex);
+
+            // Check if we have moved to a new partition
+            if (currentPartition != record.partition()) {
+                // Increment the partition count as we have encountered a new 
partition
+                partitionCount++;
+                // Update the current partition to the new partition
+                currentPartition = record.partition();
+            }
+
+            validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+            recordCount++;
+        }
+
+        // Including empty partition
+        assertEquals(partitionSize, partitionCount + 1);
+    }
+
+    @Test
+    public void testRecordsByPartition() {
+        List<String> topics = Arrays.asList("topic1", "topic2");
+        int recordSize = 3;
+        int partitionSize = 5;
+        int emptyPartitionIndex = 2;
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            for (int partition = 0; partition < partitionSize; partition++) {
+                TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+                List<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topicPartition);
+
+                if (partition == emptyPartitionIndex) {
+                    assertTrue(records.isEmpty());
+                } else {
+                    assertEquals(recordSize, records.size());
+                    for (int i = 0; i < records.size(); i++) {
+                        ConsumerRecord<Integer, String> record = 
records.get(i);
+                        validateRecordPayload(topic, record, partition, i, 
recordSize);
+                    }
+                }
+            }
         }
-        assertEquals(2, c);
+    }
+
+    @Test
+    public void testRecordsByNullTopic() {
+        String nullTopic = null;
+        ConsumerRecords<Integer, String> consumerRecords = 
ConsumerRecords.empty();
+        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+        assertEquals("Topic must be non-null.", exception.getMessage());
+    }
+
+
+    @Test
+    public void testRecordsByTopic() {
+        List<String> topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+        int recordSize = 3;
+        int partitionSize = 10;
+        int emptyPartitionIndex = 6;
+        int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+        ConsumerRecords<Integer, String> consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+        for (String topic : topics) {
+            Iterable<ConsumerRecord<Integer, String>> records = 
consumerRecords.records(topic);
+            int recordCount = 0;
+            int partitionCount = 0;
+            int currentPartition = -1;
+
+            for (ConsumerRecord<Integer, String> record : records) {
+                validateEmptyPartition(record, emptyPartitionIndex);
+
+                // Check if we have moved to a new partition
+                if (currentPartition != record.partition()) {
+                    // Increment the partition count as we have encountered a 
new partition
+                    partitionCount++;
+                    // Update the current partition to the new partition
+                    currentPartition = record.partition();
+                }
+
+                validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+                recordCount++;
+            }
+
+            // Including empty partition
+            assertEquals(partitionSize, partitionCount + 1);
+            assertEquals(expectedTotalRecordSizeOfEachTopic, recordCount);
+        }
+    }
+
+    private ConsumerRecords<Integer, String> buildTopicTestRecords(int 
recordSize,
+                                                                   int 
partitionSize,
+                                                                   int 
emptyPartitionIndex,
+                                                                   
Collection<String> topics) {
+        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> 
partitionToRecords = new LinkedHashMap<>();
+        for (String topic : topics) {
+            for (int i = 0; i < partitionSize; i++) {
+                List<ConsumerRecord<Integer, String>> records = new 
ArrayList<>(recordSize);
+                if (i != emptyPartitionIndex) {
+                    for (int j = 0; j < recordSize; j++) {
+                        records.add(
+                            new ConsumerRecord<>(topic, i, j, 0L, 
TimestampType.CREATE_TIME,
+                                0, 0, j, String.valueOf(j), new 
RecordHeaders(), Optional.empty())
+                        );
+                    }
+                }
+                partitionToRecords.put(new TopicPartition(topic, i), records);
+            }
+        }
+
+        return new ConsumerRecords<>(partitionToRecords);
+    }
+
+    private void validateEmptyPartition(ConsumerRecord<Integer, String> 
record, int emptyPartitionIndex) {
+        if (record.partition() == emptyPartitionIndex) {

Review Comment:
   Yes, that way it would be more straightforward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to