showuon commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1268858531


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##########
@@ -87,36 +96,41 @@ public void waitTillConsumptionCatchesUp(RecordMetadata 
recordMetadata) throws T
     }
 
     /**
-     * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
+     * Waits if necessary for the consumption to reach the {@code offset} of 
the given record
+     * at a certain {@code partition} of the metadata topic.

Review Comment:
   This change should be reverted since the parameter is not updated.



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -182,13 +182,12 @@ private CompletableFuture<Void> 
storeRemoteLogMetadata(TopicIdPartition topicIdP
             CompletableFuture<RecordMetadata> produceFuture = 
producerManager.publishMessage(remoteLogMetadata);
 
             // Create and return a `CompletableFuture` instance which 
completes when the consumer is caught up with the produced record's offset.
-            return produceFuture.thenApplyAsync(recordMetadata -> {
+            return produceFuture.thenAcceptAsync(recordMetadata -> {
                 try {
-                    
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+                    
consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), 
recordMetadata.offset());

Review Comment:
   We should pass `recordMetadata` only, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to