mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441007131



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult 
fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
                     completedFetch.partition);
         } else {
             FetchPosition position = 
subscriptions.position(completedFetch.partition);
-            if (completedFetch.nextFetchOffset == position.offset) {
-                List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
-
-                log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
-                        partRecords.size(), position, 
completedFetch.partition);
-
-                if (completedFetch.nextFetchOffset > position.offset) {
-                    FetchPosition nextPosition = new FetchPosition(
-                            completedFetch.nextFetchOffset,
-                            completedFetch.lastEpoch,
-                            position.currentLeader);
-                    log.trace("Update fetching position to {} for partition 
{}", nextPosition, completedFetch.partition);
-                    subscriptions.position(completedFetch.partition, 
nextPosition);
-                }
+            if (position != null) {
+                if (completedFetch.nextFetchOffset == position.offset) {
+                    List<ConsumerRecord<K, V>> partRecords = 
completedFetch.fetchRecords(maxRecords);
+
+                    log.trace("Returning {} fetched records at offset {} for 
assigned partition {}",
+                            partRecords.size(), position, 
completedFetch.partition);
+
+                    if (completedFetch.nextFetchOffset > position.offset) {
+                        FetchPosition nextPosition = new FetchPosition(
+                                completedFetch.nextFetchOffset,
+                                completedFetch.lastEpoch,
+                                position.currentLeader);
+                        log.trace("Update fetching position to {} for 
partition {}", nextPosition, completedFetch.partition);
+                        subscriptions.position(completedFetch.partition, 
nextPosition);
+                    }
 
-                Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
-                if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, 
partitionLag);
+                    Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+                    if (partitionLag != null)
+                        
this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
 
-                Long lead = 
subscriptions.partitionLead(completedFetch.partition);
-                if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, 
lead);
-                }
+                    Long lead = 
subscriptions.partitionLead(completedFetch.partition);
+                    if (lead != null) {
+                        
this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    }
 
-                return partRecords;
+                    return partRecords;
+                } else {
+                    // these records aren't next in line based on the last 
consumed position, ignore them
+                    // they must be from an obsolete request
+                    log.debug("Ignoring fetched records for {} at offset {} 
since the current position is {}",
+                            completedFetch.partition, 
completedFetch.nextFetchOffset, position);
+                }
             } else {
-                // these records aren't next in line based on the last 
consumed position, ignore them
-                // they must be from an obsolete request
-                log.debug("Ignoring fetched records for {} at offset {} since 
the current position is {}",
-                        completedFetch.partition, 
completedFetch.nextFetchOffset, position);
+                log.warn("Ignoring fetched records for {} at offset {} since 
the current position is undefined",

Review comment:
       Would it make sense to throw an IllegalStateException here (since we just 
checked that the partition was fetchable)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to