mumrah commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441007131
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -675,36 +676,41 @@ private ListOffsetResult
fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp
completedFetch.partition);
} else {
FetchPosition position =
subscriptions.position(completedFetch.partition);
- if (completedFetch.nextFetchOffset == position.offset) {
- List<ConsumerRecord<K, V>> partRecords =
completedFetch.fetchRecords(maxRecords);
-
- log.trace("Returning {} fetched records at offset {} for
assigned partition {}",
- partRecords.size(), position,
completedFetch.partition);
-
- if (completedFetch.nextFetchOffset > position.offset) {
- FetchPosition nextPosition = new FetchPosition(
- completedFetch.nextFetchOffset,
- completedFetch.lastEpoch,
- position.currentLeader);
- log.trace("Update fetching position to {} for partition
{}", nextPosition, completedFetch.partition);
- subscriptions.position(completedFetch.partition,
nextPosition);
- }
+ if (position != null) {
+ if (completedFetch.nextFetchOffset == position.offset) {
+ List<ConsumerRecord<K, V>> partRecords =
completedFetch.fetchRecords(maxRecords);
+
+ log.trace("Returning {} fetched records at offset {} for
assigned partition {}",
+ partRecords.size(), position,
completedFetch.partition);
+
+ if (completedFetch.nextFetchOffset > position.offset) {
+ FetchPosition nextPosition = new FetchPosition(
+ completedFetch.nextFetchOffset,
+ completedFetch.lastEpoch,
+ position.currentLeader);
+ log.trace("Update fetching position to {} for
partition {}", nextPosition, completedFetch.partition);
+ subscriptions.position(completedFetch.partition,
nextPosition);
+ }
- Long partitionLag =
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
- if (partitionLag != null)
- this.sensors.recordPartitionLag(completedFetch.partition,
partitionLag);
+ Long partitionLag =
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
+ if (partitionLag != null)
+
this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
- Long lead =
subscriptions.partitionLead(completedFetch.partition);
- if (lead != null) {
- this.sensors.recordPartitionLead(completedFetch.partition,
lead);
- }
+ Long lead =
subscriptions.partitionLead(completedFetch.partition);
+ if (lead != null) {
+
this.sensors.recordPartitionLead(completedFetch.partition, lead);
+ }
- return partRecords;
+ return partRecords;
+ } else {
+ // these records aren't next in line based on the last
consumed position, ignore them
+ // they must be from an obsolete request
+ log.debug("Ignoring fetched records for {} at offset {}
since the current position is {}",
+ completedFetch.partition,
completedFetch.nextFetchOffset, position);
+ }
} else {
- // these records aren't next in line based on the last
consumed position, ignore them
- // they must be from an obsolete request
- log.debug("Ignoring fetched records for {} at offset {} since
the current position is {}",
- completedFetch.partition,
completedFetch.nextFetchOffset, position);
+ log.warn("Ignoring fetched records for {} at offset {} since
the current position is undefined",
Review comment:
Would throwing an IllegalStateException make sense here instead of logging a
warning? We just checked that the partition was fetchable, so a null position
at this point should not be possible.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]