satishd commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1314498306
##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##########
@@ -98,35 +98,28 @@ public void waitTillConsumptionCatchesUp(RecordMetadata
recordMetadata) throws T
*/
public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
long timeoutMs) throws
TimeoutException {
- final int partition = recordMetadata.partition();
- final long consumeCheckIntervalMs =
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-
- log.info("Waiting until consumer is caught up with the target
partition: [{}]", partition);
-
+ int partition = recordMetadata.partition();
// If the current assignment does not have the subscription for this
partition then return immediately.
if (!consumerTask.isMetadataPartitionAssigned(partition)) {
- throw new KafkaException("This consumer is not assigned to the
target partition " + partition + ". " +
- "Partitions currently assigned: " +
consumerTask.metadataPartitionsAssigned());
+ throw new KafkaException("This consumer is not assigned to the
target partition " + partition +
+ ". Currently assigned partitions: " +
consumerTask.metadataPartitionsAssigned());
}
-
- final long offset = recordMetadata.offset();
+ long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
+ long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS,
timeoutMs);
+ log.info("Wait until the consumer is caught up with the target
partition {} up-to offset {}", partition, offset);
while (true) {
- log.debug("Checking if partition [{}] is up to date with offset
[{}]", partition, offset);
long readOffset =
consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
if (readOffset >= offset) {
return;
}
-
- log.debug("Expected offset [{}] for partition [{}], but the
committed offset: [{}], Sleeping for [{}] to retry again",
- offset, partition, readOffset, consumeCheckIntervalMs);
-
+ log.debug("Expected offset for partition {} is {}, but the read
offset is {}. " +
+ "Sleeping for {} ms to retry again", partition, offset,
readOffset, consumeCheckIntervalMs);
if (time.milliseconds() - startTimeMs > timeoutMs) {
- log.warn("Expected offset for partition:[{}] is : [{}], but
the committed offset: [{}] ",
- partition, readOffset, offset);
+ log.warn("Expected offset for partition {} is {}, but the read
offset is {}",
+ partition, offset, readOffset);
Review Comment:
Good catch on the ordering of the offsets in the log.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org