mumrah commented on code in PR #6731:
URL: https://github.com/apache/kafka/pull/6731#discussion_r1964210916
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1136,24 +1163,42 @@ private PartitionRecords
parseCompletedFetch(CompletedFetch completedFetch) {
log.trace("Updating last stable offset for partition {} to
{}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp,
partition.lastStableOffset);
}
+
+ if (partition.preferredReadReplica.isPresent()) {
+
subscriptions.updatePreferredReadReplica(partitionRecords.partition,
partition.preferredReadReplica.get(), () -> {
+ long expireTimeMs = time.milliseconds() +
metadata.metadataExpireMs();
+ log.debug("Updating preferred read replica for
partition {} to {}, set to expire at {}",
+ tp, partition.preferredReadReplica.get(),
expireTimeMs);
+ return expireTimeMs;
+ });
+ }
+
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
- error == Errors.FENCED_LEADER_EPOCH) {
+ error == Errors.FENCED_LEADER_EPOCH ||
+ error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp,
error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch
for partition {}", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
- if (fetchOffset != subscriptions.position(tp).offset) {
- log.debug("Discarding stale fetch response for partition
{} since the fetched offset {} " +
- "does not match the current offset {}", tp,
fetchOffset, subscriptions.position(tp));
- } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
- log.info("Fetch offset {} is out of range for partition
{}, resetting offset", fetchOffset, tp);
- subscriptions.requestOffsetReset(tp);
+ Optional<Integer> clearedReplicaId =
subscriptions.clearPreferredReadReplica(tp);
Review Comment:
@nvartolomei, can you file a Jira ticket or ask this question on the mailing
list?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]