satishd commented on code in PR #14329:
URL: https://github.com/apache/kafka/pull/14329#discussion_r1314498306


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##########
@@ -98,35 +98,28 @@ public void waitTillConsumptionCatchesUp(RecordMetadata 
recordMetadata) throws T
      */
     public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
                                              long timeoutMs) throws 
TimeoutException {
-        final int partition = recordMetadata.partition();
-        final long consumeCheckIntervalMs = 
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-
-        log.info("Waiting until consumer is caught up with the target 
partition: [{}]", partition);
-
+        int partition = recordMetadata.partition();
         // If the current assignment does not have the subscription for this 
partition then return immediately.
         if (!consumerTask.isMetadataPartitionAssigned(partition)) {
-            throw new KafkaException("This consumer is not assigned to the 
target partition " + partition + ". " +
-                    "Partitions currently assigned: " + 
consumerTask.metadataPartitionsAssigned());
+            throw new KafkaException("This consumer is not assigned to the 
target partition " + partition +
+                    ". Currently assigned partitions: " + 
consumerTask.metadataPartitionsAssigned());
         }
-
-        final long offset = recordMetadata.offset();
+        long offset = recordMetadata.offset();
         long startTimeMs = time.milliseconds();
+        long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, 
timeoutMs);
+        log.info("Wait until the consumer is caught up with the target 
partition {} up-to offset {}", partition, offset);
         while (true) {
-            log.debug("Checking if partition [{}] is up to date with offset 
[{}]", partition, offset);
             long readOffset = 
consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
             if (readOffset >= offset) {
                 return;
             }
-
-            log.debug("Expected offset [{}] for partition [{}], but the 
committed offset: [{}],  Sleeping for [{}] to retry again",
-                    offset, partition, readOffset, consumeCheckIntervalMs);
-
+            log.debug("Expected offset for partition {} is {}, but the read 
offset is {}. " +
+                    "Sleeping for {} ms to retry again", partition, offset, 
readOffset, consumeCheckIntervalMs);
             if (time.milliseconds() - startTimeMs > timeoutMs) {
-                log.warn("Expected offset for partition:[{}] is : [{}], but 
the committed offset: [{}] ",
-                        partition, readOffset, offset);
+                log.warn("Expected offset for partition {} is {}, but the read 
offset is {}",
+                        partition, offset, readOffset);

Review Comment:
   Good catch on the ordering of the offsets in the log. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to