hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
         return cache.cluster();
     }
 
+    /**
+     * Get the current metadata cache.
+     */
+    public synchronized MetadataCache fetchCache() {

Review Comment:
   Perhaps we could make `cache` volatile and avoid the synchronization?
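
To illustrate the suggestion, here is a minimal sketch of a lock-free read path, assuming the cached object is never mutated after construction. `Snapshot` and `SnapshotHolder` are hypothetical stand-ins, not the actual `MetadataCache` and `Metadata` classes:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an immutable metadata snapshot (not Kafka's MetadataCache).
final class Snapshot {
    private final Map<String, Integer> partitionCounts;

    Snapshot(Map<String, Integer> partitionCounts) {
        // Defensive copy so later changes to the caller's map cannot leak in.
        this.partitionCounts = Collections.unmodifiableMap(new HashMap<>(partitionCounts));
    }

    Integer partitionCount(String topic) {
        return partitionCounts.get(topic);
    }
}

// Hypothetical holder illustrating the volatile-reference pattern.
final class SnapshotHolder {
    // volatile guarantees that readers see the most recently published reference.
    private volatile Snapshot snapshot = new Snapshot(Collections.emptyMap());

    // Writers still serialize among themselves; only the read path is lock-free.
    synchronized void update(Map<String, Integer> partitionCounts) {
        this.snapshot = new Snapshot(partitionCounts);
    }

    // No lock needed: a single volatile read of an immutable object.
    Snapshot fetchSnapshot() {
        return snapshot;
    }
}
```

This only works if every field reachable from the snapshot is effectively immutable; a reader that needs two fields to be mutually consistent should read the reference once and use that same object for both.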



##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -39,8 +39,9 @@
 import java.util.stream.Collectors;
 
 /**
- * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
+ * An internal immutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
+ * Prefer to extend MetadataCache's API for internal client usage Vs the public {@link Cluster}
  */
 public class MetadataCache {

Review Comment:
   We don't have to do it here, but "snapshot" might be a better name since it 
suggests immutability.



##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -113,14 +116,28 @@ Optional<Node> nodeById(int id) {
         return Optional.ofNullable(nodes.get(id));
     }
 
-    Cluster cluster() {
+    public Cluster cluster() {
         if (clusterInstance == null) {
             throw new IllegalStateException("Cached Cluster instance should not be null, but was.");
         } else {
             return clusterInstance;
         }
     }
 
+    /**
+     * Get leader-epoch for partition.
+     * @param tp partition
+     * @return leader-epoch if known, else return optional.empty()
+     */
+    public Optional<Integer> leaderEpochFor(TopicPartition tp) {

Review Comment:
   Would it make sense to use `OptionalInt`?
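
For reference, a minimal sketch of what an `OptionalInt`-returning lookup could look like. `LeaderEpochs` and its string keys are hypothetical, chosen only to keep the example self-contained; this is not the `MetadataCache` implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical lookup table keyed by a "topic-partition" string; not Kafka's MetadataCache.
final class LeaderEpochs {
    private final Map<String, Integer> epochs = new HashMap<>();

    void put(String partition, int epoch) {
        epochs.put(partition, epoch);
    }

    // OptionalInt carries a primitive int, so the result is not boxed
    // the way it would be in Optional<Integer>.
    OptionalInt leaderEpochFor(String partition) {
        Integer epoch = epochs.get(partition);
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }
}
```

One trade-off to weigh: `OptionalInt` has no `map` or `filter`, and callers such as `maybeUpdateLeaderEpoch(Optional<Integer>)` would need to change or convert.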



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
      * @param latestLeaderEpoch latest leader's epoch.
      */
     void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
-        if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+        if (latestLeaderEpoch.isPresent()

Review Comment:
   Hmm, this looks like a change in behavior. Previously we would override a 
known current leader epoch if `latestLeaderEpoch` was not defined. I am not sure 
if that was intentional. I recall that we had some logic which assumed that the 
metadata version might be downgraded and no longer provide a leader epoch. I am 
not sure it matters here, though, since we are not changing the leader, just 
updating the epoch.
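
The difference can be made concrete with a small sketch. `oldShouldUpdate` mirrors the removed condition. `newShouldUpdate` is an assumption: the hunk shows only the leading `latestLeaderEpoch.isPresent()` check, so the inequality test after `&&` is a guess added for illustration, and `EpochUpdateCheck` itself is a hypothetical helper:

```java
import java.util.Optional;

// Hypothetical helper contrasting the two guard conditions; not code from the PR.
final class EpochUpdateCheck {
    // Condition removed by the diff: update whenever the two values differ,
    // including when the latest epoch is empty.
    static boolean oldShouldUpdate(Optional<Integer> current, Optional<Integer> latest) {
        return !current.equals(latest);
    }

    // Assumed shape of the new condition. Only the isPresent() check is visible
    // in the hunk; the second clause is an illustrative guess.
    static boolean newShouldUpdate(Optional<Integer> current, Optional<Integer> latest) {
        return latest.isPresent() && !current.equals(latest);
    }
}
```

With a current epoch of `Optional.of(5)` and a latest epoch of `Optional.empty()`, the old condition triggers an update (clearing the known epoch), while any condition that starts with `latestLeaderEpoch.isPresent() &&` does not.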



##########
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##########
@@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final String topic, final int
         return clusterWith(nodes, Collections.singletonMap(topic, partitions));
     }
 
+    public static MetadataCache metadataCacheWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {

Review Comment:
   Useful to have a brief javadoc to explain replica counts and assignment 
strategy.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -879,17 +884,12 @@ private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node,
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;
-            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
-            // Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   I guess this race is no longer possible since we are using the snapshot, 
which is immutable?
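
A minimal sketch of why that would hold, assuming the drain loop does all of its leader lookups against one snapshot reference. `LeaderSnapshot` and `DrainSketch` are hypothetical and use string keys and integer node ids for brevity; they are not the `RecordAccumulator` code:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable view of partition -> leader node id; not Kafka's MetadataCache.
final class LeaderSnapshot {
    private final Map<String, Integer> leaders;

    LeaderSnapshot(Map<String, Integer> leaders) {
        // Copy on construction so the view is detached from the live map.
        this.leaders = Collections.unmodifiableMap(new HashMap<>(leaders));
    }

    Integer leaderFor(String partition) {
        return leaders.get(partition);
    }
}

final class DrainSketch {
    // Both lookups go through the same snapshot, so they cannot disagree,
    // whatever metadata updates happen elsewhere in the meantime.
    static boolean leaderStable(LeaderSnapshot snapshot, String partition, Runnable concurrentUpdate) {
        Integer first = snapshot.leaderFor(partition);
        concurrentUpdate.run(); // stands in for a metadata update on another thread
        Integer second = snapshot.leaderFor(partition);
        return first != null && first.equals(second);
    }
}
```

The snapshot can still be stale relative to the live metadata; what it rules out is seeing two different leaders for the same partition within a single drain pass.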



