Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 merged PR #16227: URL: https://github.com/apache/kafka/pull/16227 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2161999580 Hi @chia7712, I have do some small changes based on your feedback, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1635735059 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,153 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { Review Comment: Oops, I will remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1635734783 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,153 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); +validateEmptyPartition(record, emptyPartitionIndex); + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); +recordCount++; +} + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsByPartition() { +List topics = Arrays.asList("topic1", "topic2"); +int recordSize = 3; +int partitionSize = 5; +int emptyPartitionIndex = 2; + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +for (int partition = 0; partition < partitionSize; partition++) { +TopicPartition topicPartition = new TopicPartition(topic, partition); +List> records = consumerRecords.records(topicPartition); + +if (partition == emptyPartitionIndex) { +assertTrue(records.isEmpty()); +} else { +assertEquals(recordSize, records.size()); +for (int i = 0; i < records.size(); i++) { +ConsumerRecord record = records.get(i); +validateRecordPayload(topic, record, partition, i, recordSize); +} +} +} } -assertEquals(2, c); +} + +@Test +public void testRecordsByNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecordsByTopic() { +List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionIndex = 6; +int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1635380850 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,153 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { Review Comment: `Exception` is unused. ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,153 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); +validateEmptyPartition(record, emptyPartitionIndex); + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); +recordCount++; +} + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsByPartition() { +List topics = Arrays.asList("topic1", "topic2"); +int recordSize = 3; +int partitionSize = 5; +int emptyPartitionIndex = 2; + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +for (int partition = 0; partition < partitionSize; partition++) { +TopicPartition topicPartition = new TopicPartition(topic, partition); +List> records = consumerRecords.records(topicPartition); + +if (partition == emptyPartitionIndex) { +assertTrue(records.isEmpty()); +} else { +assertEquals(recordSize, records.size()); +for (int i = 0; i < records.size(); i++) { +ConsumerRecord record = records.get(i); +validateRecordPayload(topic, record, partition, i, recordSize); +} +} +} } -assertEquals(2, c); +} + +@Test +public void testRecordsByNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception =
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2159863343 Hi @chia7712, I have do a small change based on your feedback, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1634144659 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,155 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); +validateEmptyPartition(record, emptyPartitionIndex); + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); +recordCount++; +} + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsByPartition() { +List topics = Arrays.asList("topic1", "topic2"); +int recordSize = 3; +int partitionSize = 5; +int emptyPartitionIndex = 2; + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +for (int partition = 0; partition < partitionSize; partition++) { +TopicPartition topicPartition = new TopicPartition(topic, partition); +List> records = consumerRecords.records(topicPartition); + +if (partition == emptyPartitionIndex) { +assertTrue(records.isEmpty()); +} else { +assertEquals(recordSize, records.size()); +for (int i = 0; i < records.size(); i++) { +ConsumerRecord record = records.get(i); +validateRecordPayload(topic, record, partition, i, recordSize); +} +} +} } -assertEquals(2, c); +} + +@Test +public void testRecordsByNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecordsByTopic() { +List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionIndex = 6; +int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1634004534 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,155 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test -public void iterator() throws Exception { +public void testIterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); +validateEmptyPartition(record, emptyPartitionIndex); + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +validateRecordPayload(topic, record, currentPartition, recordCount, recordSize); +recordCount++; +} + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsByPartition() { +List topics = Arrays.asList("topic1", "topic2"); +int recordSize = 3; +int partitionSize = 5; +int emptyPartitionIndex = 2; + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +for (int partition = 0; partition < partitionSize; partition++) { +TopicPartition topicPartition = new TopicPartition(topic, partition); +List> records = consumerRecords.records(topicPartition); + +if (partition == emptyPartitionIndex) { +assertTrue(records.isEmpty()); +} else { +assertEquals(recordSize, records.size()); +for (int i = 0; i < records.size(); i++) { +ConsumerRecord record = records.get(i); +validateRecordPayload(topic, record, partition, i, recordSize); +} +} +} } -assertEquals(2, c); +} + +@Test +public void testRecordsByNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecordsByTopic() { +List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionIndex = 6; +int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2157060477 Hi @chia7712 , I have do some changes and add a test case, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484396 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { Review Comment: I will do it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632484236 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { +List topics = Arrays.asList("topic1", "topic2", "topic3", "topic4"); +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionIndex = 6; +int expectedTotalRecordSizeOfEachTopic = recordSize * (partitionSize - 1); + +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics); + +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); +Iterator> iterator = records.iterator(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} + +assertEquals(topic,
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632481576 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { Review Comment: Sure, don't even notice this case doesn't have `test` prefix 藍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632355536 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { Review Comment: could you please rename it to `testIterator`? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic)); +Iterator> iterator = records.iterator(); -Map>> records = new LinkedHashMap<>(); +int recordCount = 0; +int partitionCount = 0; +int currentPartition = -1; + +while (iterator.hasNext()) { +ConsumerRecord record = iterator.next(); + +if (record.partition() == emptyPartitionIndex) { +fail("Partition " + emptyPartitionIndex + " is not empty"); +} + +// Check if we have moved to a new partition +if (currentPartition != record.partition()) { +// Increment the partition count as we have encountered a new partition +partitionCount++; +// Update the current partition to the new partition +currentPartition = record.partition(); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); - -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +assertEquals(currentPartition, record.partition()); +assertEquals(recordCount % recordSize, record.offset()); +assertEquals(recordCount % recordSize, record.key()); +assertEquals(String.valueOf(recordCount % recordSize), record.value()); + +recordCount++; } -assertEquals(2, c); + +// Including empty partition +assertEquals(partitionSize, partitionCount + 1); +} + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +ConsumerRecords consumerRecords = ConsumerRecords.empty(); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + + +@Test +public void testRecords() { Review Comment: Could you add test for `records(TopicPartition)`? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -31,32 +31,129 @@ import org.apache.kafka.common.record.TimestampType; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionIndex = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex,
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156349665 Hi @chia7712 , I have do some refactors, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632067288 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +38,109 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 15; +int emptyPartitionInterval = 3; Review Comment: Could we define specific partition to be empty? that will get simplified I think -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2156063782 Hi @chia7712 , I do some changes based on comments, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632055803 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +for (; offset < recordSize; offset++) { +ConsumerRecord record = iterator.next(); +assertEquals(count, record.partition()); +assertEquals(topic, record.topic()); +assertEquals(offset, record.offset()); Review Comment: Oops, I miss both of them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632043299 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +for (; offset < recordSize; offset++) { +ConsumerRecord record = iterator.next(); +assertEquals(count, record.partition()); +assertEquals(topic, record.topic()); +assertEquals(offset, record.offset()); +} +assertEquals(recordSize, offset); +} +} +} +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topics); -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { Review Comment: Sure, I will add the verification of it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632040486 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +for (; offset < recordSize; offset++) { +ConsumerRecord record = iterator.next(); +assertEquals(count, record.partition()); +assertEquals(topic, record.topic()); +assertEquals(offset, record.offset()); +} +assertEquals(recordSize, offset); +} +} +} +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); Review Comment: Cool, I don't know we have this utility function, I will replace with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1632032964 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +for (; offset < recordSize; offset++) { +ConsumerRecord record = iterator.next(); +assertEquals(count, record.partition()); +assertEquals(topic, record.topic()); +assertEquals(offset, record.offset()); Review Comment: please verify the "key" and "value" ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +for (; offset < recordSize; offset++) { +ConsumerRecord record = iterator.next(); +assertEquals(count, record.partition()); +assertEquals(topic, record.topic()); +assertEquals(offset, record.offset()); +} +assertEquals(recordSize, offset); +} +} +} +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} -String topic = "topic"; -records.put(new TopicPartition(topic, 0), new ArrayList<>()); -ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, -0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); -ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, -0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); -records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); -records.put(new TopicPartition(topic, 2), new ArrayList<>()); +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topics); -ConsumerRecords consumerRecords = new ConsumerRecords<>(records); -Iterator> iter = consumerRecords.iterator(); - -int c = 0; -for (; iter.hasNext(); c++) { -ConsumerRecord record = iter.next(); -assertEquals(1, record.partition()); -assertEquals(topic, record.topic()); -assertEquals(c, record.offset()); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { Review Comment: Could you verify the number of records from the iterable/iterator? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -35,28 +35,83 @@ public class ConsumerRecordsTest { @Test public void iterator() throws Exception { +String topic = "topic"; +int recordSize = 10; +int partitionSize = 10; +int emptyPartitionInterval = 3; +ConsumerRecords records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionInterval, topic); +Iterator> iterator = records.iterator(); + +for (int count = 0; iterator.hasNext(); count++) { +if (count % emptyPartitionInterval != 0) { +int offset = 0; +
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2155814048 Hi @chia7712, I do few changes based on comments, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1631833896 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +49,72 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +Map>> partitionToRecords = buildTopicTestRecords(recordSize, topics); +ConsumerRecords consumerRecords = new ConsumerRecords<>(partitionToRecords); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); + +int count = 0; +Iterator> iterator = records.iterator(); +for (; iterator.hasNext() && count < recordSize; count++) { +ConsumerRecord record = iterator.next(); +assertEquals(topic, record.topic()); +assertEquals(count, record.partition()); +assertEquals(count, record.offset()); +assertEquals(count, record.key()); +assertEquals(String.valueOf(count), record.value()); +} +} +} + +private Map>> buildSingleTopicTestRecords(String topic) { Review Comment: I keep this because the test case `iterator` has it' own test logic but I will try to rewrite it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1631832885 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +49,72 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +Map>> partitionToRecords = buildTopicTestRecords(recordSize, topics); +ConsumerRecords consumerRecords = new ConsumerRecords<>(partitionToRecords); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); + +int count = 0; +Iterator> iterator = records.iterator(); +for (; iterator.hasNext() && count < recordSize; count++) { +ConsumerRecord record = iterator.next(); +assertEquals(topic, record.topic()); +assertEquals(count, record.partition()); +assertEquals(count, record.offset()); +assertEquals(count, record.key()); +assertEquals(String.valueOf(count), record.value()); +} +} +} + +private Map>> buildSingleTopicTestRecords(String topic) { +Map>> records = new LinkedHashMap<>(); +ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, +0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); +ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, +0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); + +new ArrayList<>(); Review Comment: Oops, I accidentally leave this, I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1631832337 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +49,72 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +Map>> partitionToRecords = buildTopicTestRecords(recordSize, topics); +ConsumerRecords consumerRecords = new ConsumerRecords<>(partitionToRecords); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); + +int count = 0; +Iterator> iterator = records.iterator(); +for (; iterator.hasNext() && count < recordSize; count++) { +ConsumerRecord record = iterator.next(); +assertEquals(topic, record.topic()); +assertEquals(count, record.partition()); +assertEquals(count, record.offset()); +assertEquals(count, record.key()); +assertEquals(String.valueOf(count), record.value()); +} +} +} + +private Map>> buildSingleTopicTestRecords(String topic) { +Map>> records = new LinkedHashMap<>(); +ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, +0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); +ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, +0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); + +new ArrayList<>(); +for (int i = 0; i < 3; i++) { +new ConsumerRecord<>(topic, i, i, 0L, TimestampType.CREATE_TIME, +0, 0, 2, String.valueOf(i), new RecordHeaders(), Optional.empty()); +} + +records.put(new TopicPartition(topic, 0), new ArrayList<>()); +records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); +records.put(new TopicPartition(topic, 2), new ArrayList<>()); +return records; +} + +private Map>> buildTopicTestRecords(int recordSize, String... topics) { Review Comment: Yes, it is more clear that we return `ConsumerRecords` directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1631754807 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +49,72 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +Map>> partitionToRecords = buildTopicTestRecords(recordSize, topics); +ConsumerRecords consumerRecords = new ConsumerRecords<>(partitionToRecords); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); + +int count = 0; +Iterator> iterator = records.iterator(); +for (; iterator.hasNext() && count < recordSize; count++) { +ConsumerRecord record = iterator.next(); +assertEquals(topic, record.topic()); +assertEquals(count, record.partition()); +assertEquals(count, record.offset()); +assertEquals(count, record.key()); +assertEquals(String.valueOf(count), record.value()); +} +} +} + +private Map>> buildSingleTopicTestRecords(String topic) { +Map>> records = new LinkedHashMap<>(); +ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, +0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); +ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, +0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); + +new ArrayList<>(); +for (int i = 0; i < 3; i++) { +new ConsumerRecord<>(topic, i, i, 0L, TimestampType.CREATE_TIME, +0, 0, 2, String.valueOf(i), new RecordHeaders(), Optional.empty()); +} + +records.put(new TopicPartition(topic, 0), new ArrayList<>()); +records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2)); +records.put(new TopicPartition(topic, 2), new ArrayList<>()); +return records; +} + +private Map>> buildTopicTestRecords(int recordSize, String... topics) { Review Comment: Maybe this method can return `ConsumerRecords` directly? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +49,72 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String[] topics = {"topic1", "topic2", "topic3", "topic4"}; +int recordSize = 3; +Map>> partitionToRecords = buildTopicTestRecords(recordSize, topics); +ConsumerRecords consumerRecords = new ConsumerRecords<>(partitionToRecords); +for (String topic : topics) { +Iterable> records = consumerRecords.records(topic); + +int count = 0; +Iterator> iterator = records.iterator(); +for (; iterator.hasNext() && count < recordSize; count++) { +ConsumerRecord record = iterator.next(); +assertEquals(topic, record.topic()); +assertEquals(count, record.partition()); +assertEquals(count, record.offset()); +assertEquals(count, record.key()); +assertEquals(String.valueOf(count), record.value()); +} +} +} + +private Map>> buildSingleTopicTestRecords(String topic) { +Map>> records = new LinkedHashMap<>(); +ConsumerRecord record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, +0, 0, 1, "value1", new RecordHeaders(), Optional.empty()); +ConsumerRecord record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, +0, 0, 2, "value2", new RecordHeaders(), Optional.empty()); + +new ArrayList<>(); Review Comment: exception? ##
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on PR #16227: URL: https://github.com/apache/kafka/pull/16227#issuecomment-2154878391 Hi @chia7712, I have modified the test case to cover the scenario of multiple topics and records, PLTA -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
frankvicky commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1630620777 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +50,47 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String targetTopic = "topic"; +Map>> records = buildTestRecordsWithDummyRecords(targetTopic); Review Comment: Sure, will do it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16901: Add unit tests for ConsumerRecords#records(String) [kafka]
chia7712 commented on code in PR #16227: URL: https://github.com/apache/kafka/pull/16227#discussion_r1629889551 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java: ## @@ -59,4 +50,47 @@ public void iterator() throws Exception { } assertEquals(2, c); } + +@Test +public void testRecordsWithNullTopic() { +String nullTopic = null; +Map>> records = new LinkedHashMap<>(); +ConsumerRecords consumerRecords = new ConsumerRecords<>(records); +IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> consumerRecords.records(nullTopic)); +assertEquals("Topic must be non-null.", exception.getMessage()); +} + +@Test +public void testRecords() { +String targetTopic = "topic"; +Map>> records = buildTestRecordsWithDummyRecords(targetTopic); Review Comment: Could you build the `ConsumerRecords` with multiple topics? Also, we should verify the record for each topic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org