AndrewJSchofield commented on code in PR #20852:
URL: https://github.com/apache/kafka/pull/20852#discussion_r2519854389


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -624,23 +624,19 @@ class BrokerServer(
     }
   }
 
-  private def createPartitionMetadataClient(): PartitionMetadataClient = {
-    // This is a no-op implementation of PartitionMetadataClient. It always 
returns -1 as the latest offset for any
-    // requested topic partition.
-    // TODO: KAFKA-19800: Implement a real PartitionMetadataClient that can 
fetch latest offsets via InterBrokerSendThread.
-    new PartitionMetadataClient {
-      override def listLatestOffsets(topicPartitions: util.Set[TopicPartition]
-                                    ): util.Map[TopicPartition, 
util.concurrent.CompletableFuture[java.lang.Long]] = {
-        topicPartitions.asScala
-          .map { tp =>
-            tp -> 
CompletableFuture.completedFuture(java.lang.Long.valueOf(-1L))
-          }
-          .toMap
-          .asJava
-      }
-
-      override def close(): Unit = {}
-    }
+  private def createNetworkPartitionMetadataClient(metadataCache: 
MetadataCache): PartitionMetadataClient = {
+    new NetworkPartitionMetadataClient(
+      metadataCache,
+      NetworkUtils.buildNetworkClient(
+        "NetworkPartitionMetadataClient",
+        config,
+        metrics,
+        Time.SYSTEM,
+        new LogContext(s"[NetworkPartitionMetadataClient 
broker=${config.brokerId}]")
+      ),

Review Comment:
   I think this is a good model since it's not going to be used in every 
cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to