showuon commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1268858531

##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##########
@@ -87,36 +96,41 @@ public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) throws T
     }
 
     /**
-     * Waits if necessary for the consumption to reach the offset of the given {@code recordMetadata}.
+     * Waits if necessary for the consumption to reach the {@code offset} of the given record
+     * at a certain {@code partition} of the metadata topic.

Review Comment:
   This change should be reverted since the parameter is not updated.

##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -182,13 +182,12 @@ private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdP
         CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);
 
         // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
-        return produceFuture.thenApplyAsync(recordMetadata -> {
+        return produceFuture.thenAcceptAsync(recordMetadata -> {
             try {
-                consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+                consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), recordMetadata.offset());

Review Comment:
   We should pass `recordMetadata` only, right?
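
To illustrate the second comment, here is a minimal, self-contained sketch of what the call site could look like if `thenAcceptAsync` is kept while the single-argument `waitTillConsumptionCatchesUp(recordMetadata)` stays unchanged. `FakeRecordMetadata`, `FakeConsumerManager`, and `store` are hypothetical stand-ins, not the real Kafka classes, and the exception handling of the actual method is left out on purpose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class StoreMetadataSketch {

    // Hypothetical stand-in for Kafka's RecordMetadata; only the two accessors used in the diff.
    static final class FakeRecordMetadata {
        private final int partition;
        private final long offset;

        FakeRecordMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        int partition() { return partition; }
        long offset() { return offset; }
    }

    // Hypothetical stand-in for ConsumerManager: it only records the offset it was asked to wait for.
    static final class FakeConsumerManager {
        final AtomicLong lastWaitedOffset = new AtomicLong(-1L);

        // Assumption: the parameter stays a single record-metadata object, as the review suggests.
        void waitTillConsumptionCatchesUp(FakeRecordMetadata recordMetadata) {
            lastWaitedOffset.set(recordMetadata.offset());
        }
    }

    static CompletableFuture<Void> store(CompletableFuture<FakeRecordMetadata> produceFuture,
                                         FakeConsumerManager consumerManager) {
        // thenAcceptAsync takes a Consumer, so the lambda needs no dummy `return null`
        // to produce a CompletableFuture<Void>, unlike thenApplyAsync.
        return produceFuture.thenAcceptAsync(
                recordMetadata -> consumerManager.waitTillConsumptionCatchesUp(recordMetadata));
    }

    public static void main(String[] args) {
        FakeConsumerManager consumerManager = new FakeConsumerManager();
        CompletableFuture<FakeRecordMetadata> produced =
                CompletableFuture.completedFuture(new FakeRecordMetadata(0, 42L));

        // join() blocks until the asynchronous stage has run.
        store(produced, consumerManager).join();
        System.out.println(consumerManager.lastWaitedOffset.get()); // prints 42
    }
}
```

With this shape, the Javadoc in `ConsumerManager` can keep referring to `{@code recordMetadata}`, which is what the first comment asks for.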