PatrickRen commented on a change in pull request #18266:
URL: https://github.com/apache/flink/pull/18266#discussion_r779337802



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -248,6 +253,100 @@ public void testAssignEmptySplit() throws Exception {
         assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
     }
 
+    @Test
+    public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");

Review comment:
      Maybe we can add a comment here explaining that this test uses a newly created group ID without any committed offsets, so an exception is expected.
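
   For example, something along these lines (the exact wording is just a suggestion):

       // This group ID is newly created and has no committed offsets, so reading
       // from COMMITTED_OFFSET with the "none" offset reset strategy is expected
       // to fail with a KafkaException.
       props.setProperty(
               ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");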

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -248,6 +253,100 @@ public void testAssignEmptySplit() throws Exception {
         assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
     }
 
+    @Test
+    public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add a committed offset split and catch kafka exception
+        final KafkaException undefinedOffsetException =
+                Assertions.assertThrows(
+                        KafkaException.class,
+                        () ->
+                                reader.handleSplitsChanges(
+                                        new SplitsAddition<>(
+                                                Collections.singletonList(
+                                                        new KafkaPartitionSplit(
+                                                                new TopicPartition(TOPIC1, 0),
+                                                                KafkaPartitionSplit
+                                                                        .COMMITTED_OFFSET)))));
+        MatcherAssert.assertThat(
+                undefinedOffsetException.getMessage(),
+                CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
+    }
+
+    @Test
+    public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
+        MetricListener metricListener = new MetricListener();
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.name().toLowerCase());
+        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "using-committed-offset-with-earliest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        props,
+                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+        // Add a committed offset split
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Collections.singletonList(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(TOPIC1, 0),
+                                        KafkaPartitionSplit.COMMITTED_OFFSET))));
+        // pendingRecords should not have been registered yet because it is registered lazily
+        assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+        // Trigger first fetch
+        reader.fetch();
+        final Optional<Gauge<Long>> pendingRecords =
+                metricListener.getGauge(MetricNames.PENDING_RECORDS);
+        assertTrue(pendingRecords.isPresent());
+        // Validate pendingRecords
+        assertNotNull(pendingRecords);
+        assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
+            reader.fetch();
+            assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());

Review comment:
      This reused code snippet is really testing the `pendingRecords` metric. A more straightforward check for this case would be to verify that the first record the reader fetches is at the earliest offset, or that the consuming position of the partition before any `fetch()` is the earliest offset.
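
   A rough sketch of the first variant, assuming the records written to TOPIC1 in this test setup start at offset 0 and ignoring the possibility of an empty first poll for brevity:

       // Instead of draining the partition via pendingRecords, assert directly
       // that the first fetched record is the earliest one in the partition.
       RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> records = reader.fetch();
       assertNotNull(records.nextSplit());
       ConsumerRecord<byte[], byte[]> firstRecord = records.nextRecordFromSplit();
       assertNotNull(firstRecord);
       // Assumes the test data for TOPIC1 starts at offset 0.
       assertEquals(0L, firstRecord.offset());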

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
##########
@@ -248,6 +253,100 @@ public void testAssignEmptySplit() throws Exception {
         assertTrue(recordsWithSplitIds.finishedSplits().isEmpty());
     }
 
+    @Test
+    public void testUsingCommittedOffsetsWithNoneOffsetResetStrategy() {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-none-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add a committed offset split and catch kafka exception
+        final KafkaException undefinedOffsetException =
+                Assertions.assertThrows(
+                        KafkaException.class,
+                        () ->
+                                reader.handleSplitsChanges(
+                                        new SplitsAddition<>(
+                                                Collections.singletonList(
+                                                        new KafkaPartitionSplit(
+                                                                new TopicPartition(TOPIC1, 0),
+                                                                KafkaPartitionSplit
+                                                                        .COMMITTED_OFFSET)))));
+        MatcherAssert.assertThat(
+                undefinedOffsetException.getMessage(),
+                CoreMatchers.containsString("Undefined offset with no reset policy for partition"));
+    }
+
+    @Test
+    public void testUsingCommittedOffsetsWithEarliestOffsetResetStrategy() throws Throwable {
+        MetricListener metricListener = new MetricListener();
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.name().toLowerCase());
+        props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG,
+                "using-committed-offset-with-earliest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(
+                        props,
+                        InternalSourceReaderMetricGroup.mock(metricListener.getMetricGroup()));
+        // Add a committed offset split
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Collections.singletonList(
+                                new KafkaPartitionSplit(
+                                        new TopicPartition(TOPIC1, 0),
+                                        KafkaPartitionSplit.COMMITTED_OFFSET))));
+        // pendingRecords should not have been registered yet because it is registered lazily
+        assertFalse(metricListener.getGauge(MetricNames.PENDING_RECORDS).isPresent());
+        // Trigger first fetch
+        reader.fetch();
+        final Optional<Gauge<Long>> pendingRecords =
+                metricListener.getGauge(MetricNames.PENDING_RECORDS);
+        assertTrue(pendingRecords.isPresent());
+        // Validate pendingRecords
+        assertNotNull(pendingRecords);
+        assertEquals(NUM_RECORDS_PER_PARTITION - 1, (long) pendingRecords.get().getValue());
+        for (int i = 1; i < NUM_RECORDS_PER_PARTITION; i++) {
+            reader.fetch();
+            assertEquals(NUM_RECORDS_PER_PARTITION - i - 1, (long) pendingRecords.get().getValue());
+        }
+    }
+
+    @Test
+    public void testUsingCommittedOffsetsWithLatestOffsetResetStrategy() throws Throwable {
+        final Properties props = new Properties();
+        props.setProperty(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.LATEST.name().toLowerCase());
+        props.setProperty(
+                ConsumerConfig.GROUP_ID_CONFIG, "using-committed-offset-with-latest-offset-reset");
+        KafkaPartitionSplitReader reader =
+                createReader(props, UnregisteredMetricsGroup.createSourceReaderMetricGroup());
+        // Add empty latest offset reset split
+        final KafkaPartitionSplit latestOffsetResetEmptySplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC1, 0),
+                        KafkaPartitionSplit.COMMITTED_OFFSET,
+                        KafkaPartitionSplit.LATEST_OFFSET);
+        final KafkaPartitionSplit latestOffsetResetNormalSplit =
+                new KafkaPartitionSplit(
+                        new TopicPartition(TOPIC2, 0), KafkaPartitionSplit.COMMITTED_OFFSET);
+
+        reader.handleSplitsChanges(
+                new SplitsAddition<>(
+                        Arrays.asList(latestOffsetResetEmptySplit, latestOffsetResetNormalSplit)));
+
+        // Fetch and check latest offset reset split is added to finished splits
+        RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> recordsWithSplitIds = reader.fetch();
+        assertTrue(
+                recordsWithSplitIds
+                        .finishedSplits()
+                        .contains(latestOffsetResetEmptySplit.splitId()));

Review comment:
      I doubt that this case is testing the expected behavior. If the reader doesn't act as expected, for example if the fetcher ignores the `auto.offset.reset` config and starts from the earliest offset instead, the first split (latestOffsetResetEmptySplit) could still reach the end offset during `reader.fetch()`, be added to the finished splits, and pass this case. Also, I can't see the purpose of the second split (latestOffsetResetNormalSplit) here.
   
   Similar to the case above, I think the correct check is to verify that the consuming position of the partition before any `fetch()` is at the latest offset, which is the actual expected behavior of the reader.
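
   A rough sketch of that check, assuming the test can reach the reader's internal KafkaConsumer (the consumer() accessor below is hypothetical, e.g. a @VisibleForTesting getter) and that NUM_RECORDS_PER_PARTITION records were written to the partition, so the log end offset equals that count:

       TopicPartition partition = new TopicPartition(TOPIC1, 0);
       reader.handleSplitsChanges(
               new SplitsAddition<>(
                       Collections.singletonList(
                               new KafkaPartitionSplit(
                                       partition, KafkaPartitionSplit.COMMITTED_OFFSET))));
       // With auto.offset.reset=latest and no committed offset for this group, the
       // consumer should already be positioned at the log end offset before any fetch().
       // consumer() is a hypothetical accessor exposing the reader's KafkaConsumer.
       assertEquals((long) NUM_RECORDS_PER_PARTITION, reader.consumer().position(partition));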



