satishd commented on code in PR #14329: URL: https://github.com/apache/kafka/pull/14329#discussion_r1314498306
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java: ########## @@ -98,35 +98,28 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T */ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata, long timeoutMs) throws TimeoutException { - final int partition = recordMetadata.partition(); - final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs); - - log.info("Waiting until consumer is caught up with the target partition: [{}]", partition); - + int partition = recordMetadata.partition(); // If the current assignment does not have the subscription for this partition then return immediately. if (!consumerTask.isMetadataPartitionAssigned(partition)) { - throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " + - "Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned()); + throw new KafkaException("This consumer is not assigned to the target partition " + partition + + ". Currently assigned partitions: " + consumerTask.metadataPartitionsAssigned()); } - - final long offset = recordMetadata.offset(); + long offset = recordMetadata.offset(); long startTimeMs = time.milliseconds(); + long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs); + log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset); while (true) { - log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset); long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L); if (readOffset >= offset) { return; } - - log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again", - offset, partition, readOffset, consumeCheckIntervalMs); - + log.debug("Expected offset for partition {} is {}, but the read offset is {}. " + + "Sleeping for {} ms to retry again", partition, offset, readOffset, consumeCheckIntervalMs); if (time.milliseconds() - startTimeMs > timeoutMs) { - log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ", - partition, readOffset, offset); + log.warn("Expected offset for partition {} is {}, but the read offset is {}", + partition, offset, readOffset); Review Comment: Good catch on the ordering of the offsets in the log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org