Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-13 Thread via GitHub


chia7712 merged PR #16227:
URL: https://github.com/apache/kafka/pull/16227


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2161999580

   Hi @chia7712, I have do some small changes based on your feedback, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635735059


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {

Review Comment:
   Oops, I will remove it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635734783


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List> records = 
consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord record = 
records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecordsByTopic() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1635380850


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {

Review Comment:
   `Exception` is unused.



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,153 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List> records = 
consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord record = 
records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-11 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2159863343

   Hi @chia7712, I have do a small change based on your feedback, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-10 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1634144659


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,155 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List> records = 
consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord record = 
records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecordsByTopic() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-10 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1634004534


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,155 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
-public void iterator() throws Exception {
+public void testIterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+validateEmptyPartition(record, emptyPartitionIndex);
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+validateRecordPayload(topic, record, currentPartition, 
recordCount, recordSize);
+recordCount++;
+}
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsByPartition() {
+List topics = Arrays.asList("topic1", "topic2");
+int recordSize = 3;
+int partitionSize = 5;
+int emptyPartitionIndex = 2;
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+for (int partition = 0; partition < partitionSize; partition++) {
+TopicPartition topicPartition = new TopicPartition(topic, 
partition);
+List> records = 
consumerRecords.records(topicPartition);
+
+if (partition == emptyPartitionIndex) {
+assertTrue(records.isEmpty());
+} else {
+assertEquals(recordSize, records.size());
+for (int i = 0; i < records.size(); i++) {
+ConsumerRecord record = 
records.get(i);
+validateRecordPayload(topic, record, partition, i, 
recordSize);
+}
+}
+}
 }
-assertEquals(2, c);
+}
+
+@Test
+public void testRecordsByNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecordsByTopic() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2157060477

   Hi @chia7712 , I have do some changes and add a test case, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484396


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
+
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+
+if (record.partition() == emptyPartitionIndex) {
+fail("Partition " + emptyPartitionIndex + " is not empty");
+}
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
 assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+assertEquals(currentPartition, record.partition());
+assertEquals(recordCount % recordSize, record.offset());
+assertEquals(recordCount % recordSize, record.key());
+assertEquals(String.valueOf(recordCount % recordSize), 
record.value());
+
+recordCount++;
 }
-assertEquals(2, c);
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecords() {

Review Comment:
   I will do it  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484236


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
+
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+
+if (record.partition() == emptyPartitionIndex) {
+fail("Partition " + emptyPartitionIndex + " is not empty");
+}
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
 assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+assertEquals(currentPartition, record.partition());
+assertEquals(recordCount % recordSize, record.offset());
+assertEquals(recordCount % recordSize, record.key());
+assertEquals(String.valueOf(recordCount % recordSize), 
record.value());
+
+recordCount++;
 }
-assertEquals(2, c);
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecords() {
+List topics = Arrays.asList("topic1", "topic2", "topic3", 
"topic4");
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionIndex = 6;
+int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 
1);
+
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
+
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+Iterator> iterator = 
records.iterator();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
+
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+
+if (record.partition() == emptyPartitionIndex) {
+fail("Partition " + emptyPartitionIndex + " is not empty");
+}
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a 
new partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
+
+assertEquals(topic, 

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632481576


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {

Review Comment:
   Sure, don't even notice this case doesn't have `test` prefix 藍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632355536


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {

Review Comment:
   could you please rename it to `testIterator`?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 
Collections.singleton(topic));
+Iterator> iterator = 
records.iterator();
 
-Map>> records = 
new LinkedHashMap<>();
+int recordCount = 0;
+int partitionCount = 0;
+int currentPartition = -1;
+
+while (iterator.hasNext()) {
+ConsumerRecord record = iterator.next();
+
+if (record.partition() == emptyPartitionIndex) {
+fail("Partition " + emptyPartitionIndex + " is not empty");
+}
+
+// Check if we have moved to a new partition
+if (currentPartition != record.partition()) {
+// Increment the partition count as we have encountered a new 
partition
+partitionCount++;
+// Update the current partition to the new partition
+currentPartition = record.partition();
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
-
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
 assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+assertEquals(currentPartition, record.partition());
+assertEquals(recordCount % recordSize, record.offset());
+assertEquals(recordCount % recordSize, record.key());
+assertEquals(String.valueOf(recordCount % recordSize), 
record.value());
+
+recordCount++;
 }
-assertEquals(2, c);
+
+// Including empty partition
+assertEquals(partitionSize, partitionCount + 1);
+}
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+ConsumerRecords consumerRecords = 
ConsumerRecords.empty();
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+
+@Test
+public void testRecords() {

Review Comment:
   Could you add test for `records(TopicPartition)`?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -31,32 +31,129 @@
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionIndex = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, 

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-09 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156349665

   Hi @chia7712 , I have do some refactors, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632067288


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +38,109 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 15;
+int emptyPartitionInterval = 3;

Review Comment:
   Could we define specific partition to be empty? that will get simplified I 
think



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156063782

   Hi @chia7712 ,  I do some changes based on comments, PTAL 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632055803


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+for (; offset < recordSize; offset++) {
+ConsumerRecord record = iterator.next();
+assertEquals(count, record.partition());
+assertEquals(topic, record.topic());
+assertEquals(offset, record.offset());

Review Comment:
   Oops, I miss both of them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632043299


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+for (; offset < recordSize; offset++) {
+ConsumerRecord record = iterator.next();
+assertEquals(count, record.partition());
+assertEquals(topic, record.topic());
+assertEquals(offset, record.offset());
+}
+assertEquals(recordSize, offset);
+}
+}
+}
 
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
 Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, 
topics);
 
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {

Review Comment:
   Sure, I will add the verification of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632040486


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+for (; offset < recordSize; offset++) {
+ConsumerRecord record = iterator.next();
+assertEquals(count, record.partition());
+assertEquals(topic, record.topic());
+assertEquals(offset, record.offset());
+}
+assertEquals(recordSize, offset);
+}
+}
+}
 
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
 Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);

Review Comment:
   Cool, I don't know we have this utility function, I will replace with it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-08 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1632032964


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+for (; offset < recordSize; offset++) {
+ConsumerRecord record = iterator.next();
+assertEquals(count, record.partition());
+assertEquals(topic, record.topic());
+assertEquals(offset, record.offset());

Review Comment:
   please verify the "key" and "value"



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+for (; offset < recordSize; offset++) {
+ConsumerRecord record = iterator.next();
+assertEquals(count, record.partition());
+assertEquals(topic, record.topic());
+assertEquals(offset, record.offset());
+}
+assertEquals(recordSize, offset);
+}
+}
+}
 
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
 Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
 
-String topic = "topic";
-records.put(new TopicPartition(topic, 0), new ArrayList<>());
-ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
-0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
-ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
-0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
-records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
-records.put(new TopicPartition(topic, 2), new ArrayList<>());
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords consumerRecords = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, 
topics);
 
-ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
-Iterator> iter = 
consumerRecords.iterator();
-
-int c = 0;
-for (; iter.hasNext(); c++) {
-ConsumerRecord record = iter.next();
-assertEquals(1, record.partition());
-assertEquals(topic, record.topic());
-assertEquals(c, record.offset());
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {

Review Comment:
   Could you verify the number of records from the iterable/iterator?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -35,28 +35,83 @@ public class ConsumerRecordsTest {
 
 @Test
 public void iterator() throws Exception {
+String topic = "topic";
+int recordSize = 10;
+int partitionSize = 10;
+int emptyPartitionInterval = 3;
+ConsumerRecords records = 
buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic);
+Iterator> iterator = 
records.iterator();
+
+for (int count = 0; iterator.hasNext(); count++) {
+if (count % emptyPartitionInterval != 0) {
+int offset = 0;
+ 

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2155814048

   Hi @chia7712, I do few changes based on comments, PTAL 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1631833896


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +49,72 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+Map>> 
partitionToRecords = buildTopicTestRecords(recordSize, topics);
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(partitionToRecords);
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+
+int count  = 0;
+Iterator> iterator = 
records.iterator();
+for (; iterator.hasNext() && count < recordSize; count++) {
+ConsumerRecord record = iterator.next();
+assertEquals(topic, record.topic());
+assertEquals(count, record.partition());
+assertEquals(count, record.offset());
+assertEquals(count, record.key());
+assertEquals(String.valueOf(count), record.value());
+}
+}
+}
+
+private Map>> 
buildSingleTopicTestRecords(String topic) {

Review Comment:
   I keep this because the test case `iterator` has it' own test logic but I 
will try to rewrite it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1631832885


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +49,72 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+Map>> 
partitionToRecords = buildTopicTestRecords(recordSize, topics);
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(partitionToRecords);
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+
+int count  = 0;
+Iterator> iterator = 
records.iterator();
+for (; iterator.hasNext() && count < recordSize; count++) {
+ConsumerRecord record = iterator.next();
+assertEquals(topic, record.topic());
+assertEquals(count, record.partition());
+assertEquals(count, record.offset());
+assertEquals(count, record.key());
+assertEquals(String.valueOf(count), record.value());
+}
+}
+}
+
+private Map>> 
buildSingleTopicTestRecords(String topic) {
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
+0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
+
+new ArrayList<>();

Review Comment:
   Oops, I accidentally leave this, I will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1631832337


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +49,72 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+Map>> 
partitionToRecords = buildTopicTestRecords(recordSize, topics);
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(partitionToRecords);
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+
+int count  = 0;
+Iterator> iterator = 
records.iterator();
+for (; iterator.hasNext() && count < recordSize; count++) {
+ConsumerRecord record = iterator.next();
+assertEquals(topic, record.topic());
+assertEquals(count, record.partition());
+assertEquals(count, record.offset());
+assertEquals(count, record.key());
+assertEquals(String.valueOf(count), record.value());
+}
+}
+}
+
+private Map>> 
buildSingleTopicTestRecords(String topic) {
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
+0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
+
+new ArrayList<>();
+for (int i = 0; i < 3; i++) {
+new ConsumerRecord<>(topic, i, i, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, String.valueOf(i), new RecordHeaders(), 
Optional.empty());
+}
+
+records.put(new TopicPartition(topic, 0), new ArrayList<>());
+records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
+records.put(new TopicPartition(topic, 2), new ArrayList<>());
+return records;
+}
+
+private Map>> 
buildTopicTestRecords(int recordSize, String... topics) {

Review Comment:
   Yes, it is more clear that we return `ConsumerRecords` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1631754807


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +49,72 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+Map>> 
partitionToRecords = buildTopicTestRecords(recordSize, topics);
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(partitionToRecords);
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+
+int count  = 0;
+Iterator> iterator = 
records.iterator();
+for (; iterator.hasNext() && count < recordSize; count++) {
+ConsumerRecord record = iterator.next();
+assertEquals(topic, record.topic());
+assertEquals(count, record.partition());
+assertEquals(count, record.offset());
+assertEquals(count, record.key());
+assertEquals(String.valueOf(count), record.value());
+}
+}
+}
+
+private Map>> 
buildSingleTopicTestRecords(String topic) {
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
+0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
+
+new ArrayList<>();
+for (int i = 0; i < 3; i++) {
+new ConsumerRecord<>(topic, i, i, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, String.valueOf(i), new RecordHeaders(), 
Optional.empty());
+}
+
+records.put(new TopicPartition(topic, 0), new ArrayList<>());
+records.put(new TopicPartition(topic, 1), Arrays.asList(record1, 
record2));
+records.put(new TopicPartition(topic, 2), new ArrayList<>());
+return records;
+}
+
+private Map>> 
buildTopicTestRecords(int recordSize, String... topics) {

Review Comment:
   Maybe this method can return `ConsumerRecords` directly?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +49,72 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String[] topics = {"topic1", "topic2", "topic3", "topic4"};
+int recordSize = 3;
+Map>> 
partitionToRecords = buildTopicTestRecords(recordSize, topics);
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(partitionToRecords);
+for (String topic : topics) {
+Iterable> records = 
consumerRecords.records(topic);
+
+int count  = 0;
+Iterator> iterator = 
records.iterator();
+for (; iterator.hasNext() && count < recordSize; count++) {
+ConsumerRecord record = iterator.next();
+assertEquals(topic, record.topic());
+assertEquals(count, record.partition());
+assertEquals(count, record.offset());
+assertEquals(count, record.key());
+assertEquals(String.valueOf(count), record.value());
+}
+}
+}
+
+private Map>> 
buildSingleTopicTestRecords(String topic) {
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecord record1 = new ConsumerRecord<>(topic, 
1, 0, 0L, TimestampType.CREATE_TIME,
+0, 0, 1, "value1", new RecordHeaders(), Optional.empty());
+ConsumerRecord record2 = new ConsumerRecord<>(topic, 
1, 1, 0L, TimestampType.CREATE_TIME,
+0, 0, 2, "value2", new RecordHeaders(), Optional.empty());
+
+new ArrayList<>();

Review Comment:
   exception?



##

Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-07 Thread via GitHub


frankvicky commented on PR #16227:
URL: https://github.com/apache/kafka/pull/16227#issuecomment-2154878391

   Hi @chia7712, I have modified the test case to cover the scenario of 
multiple topics and records, PLTA 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-06 Thread via GitHub


frankvicky commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1630620777


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +50,47 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String targetTopic = "topic";
+Map>> records = 
buildTestRecordsWithDummyRecords(targetTopic);

Review Comment:
   Sure, will do it  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]

2024-06-06 Thread via GitHub


chia7712 commented on code in PR #16227:
URL: https://github.com/apache/kafka/pull/16227#discussion_r1629889551


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##
@@ -59,4 +50,47 @@ public void iterator() throws Exception {
 }
 assertEquals(2, c);
 }
+
+@Test
+public void testRecordsWithNullTopic() {
+String nullTopic = null;
+Map>> records = 
new LinkedHashMap<>();
+ConsumerRecords consumerRecords = new 
ConsumerRecords<>(records);
+IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class, () -> 
consumerRecords.records(nullTopic));
+assertEquals("Topic must be non-null.", exception.getMessage());
+}
+
+@Test
+public void testRecords() {
+String targetTopic = "topic";
+Map>> records = 
buildTestRecordsWithDummyRecords(targetTopic);

Review Comment:
   Could you build the `ConsumerRecords` with multiple topics? Also, we should 
verify the record for each topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org