hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624

##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
         return cache.cluster();
     }
+    /**
+     * Get the current metadata cache.
+     */
+    public synchronized MetadataCache fetchCache() {

Review Comment:
   Perhaps we could make `cache` volatile and avoid the synchronization?

##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -39,8 +39,9 @@
 import java.util.stream.Collectors;
 /**
- * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
+ * An internal immutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
+ * Prefer to extend MetadataCache's API for internal client usage Vs the public {@link Cluster}
  */
 public class MetadataCache {

Review Comment:
   We don't have to do it here, but "snapshot" might be a better name since it suggests immutability.

##########
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##########
@@ -113,14 +116,28 @@ Optional<Node> nodeById(int id) {
         return Optional.ofNullable(nodes.get(id));
     }
-    Cluster cluster() {
+    public Cluster cluster() {
         if (clusterInstance == null) {
             throw new IllegalStateException("Cached Cluster instance should not be null, but was.");
         } else {
             return clusterInstance;
         }
     }
+    /**
+     * Get leader-epoch for partition.
+     * @param tp partition
+     * @return leader-epoch if known, else return optional.empty()
+     */
+    public Optional<Integer> leaderEpochFor(TopicPartition tp) {

Review Comment:
   Would it make sense to use `OptionalInt`?

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##########
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon
      * @param latestLeaderEpoch latest leader's epoch.
      */
     void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
-        if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+        if (latestLeaderEpoch.isPresent()

Review Comment:
   Hmm, this looks like a change in behavior. Previously we would override a known current leader epoch if `latestLeaderEpoch` was not defined. I am not sure if that was intentional. I recall we had some logic which assumed that the metadata version might be downgraded and no longer provide a leader epoch. Not sure it matters here though, since we are not changing the leader, just updating the epoch.

##########
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##########
@@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final String topic, final int
         return clusterWith(nodes, Collections.singletonMap(topic, partitions));
     }
+    public static MetadataCache metadataCacheWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {

Review Comment:
   It would be useful to have a brief javadoc explaining the replica counts and assignment strategy.

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -879,17 +884,12 @@ private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node,
             // Only proceed if the partition has no in-flight batches.
             if (isMuted(tp))
                 continue;
-            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
-            // Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   I guess this race is no longer possible since we are using the snapshot, which is immutable?
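
To make the `volatile` and `OptionalInt` suggestions concrete, here is a minimal sketch, not the Kafka implementation. `SnapshotSketch` and `MetadataHolderSketch` are hypothetical stand-ins for `MetadataCache` and `Metadata`, and partitions are keyed by plain strings rather than `TopicPartition`. It assumes the snapshot is never mutated after construction; that assumption is what would make an unsynchronized read of the reference safe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical immutable snapshot; stands in for MetadataCache.
final class SnapshotSketch {
    private final Map<String, Integer> leaderEpochs;

    SnapshotSketch(Map<String, Integer> leaderEpochs) {
        // Defensive copy so later changes to the caller's map are not visible here.
        this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
    }

    // OptionalInt avoids boxing the epoch, as suggested in the review.
    OptionalInt leaderEpochFor(String partition) {
        Integer epoch = leaderEpochs.get(partition);
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }
}

// Hypothetical holder; stands in for Metadata.
final class MetadataHolderSketch {
    // volatile publishes each new snapshot reference to reader threads.
    private volatile SnapshotSketch cache = new SnapshotSketch(Collections.emptyMap());

    // No lock needed: the reference read is atomic and the snapshot is immutable.
    SnapshotSketch fetchCache() {
        return cache;
    }

    // Writers still serialize among themselves and swap in a whole new snapshot.
    synchronized void update(Map<String, Integer> leaderEpochs) {
        cache = new SnapshotSketch(leaderEpochs);
    }

    public static void main(String[] args) {
        MetadataHolderSketch holder = new MetadataHolderSketch();
        SnapshotSketch before = holder.fetchCache();
        holder.update(Collections.singletonMap("topic-0", 7));
        System.out.println(before.leaderEpochFor("topic-0").isPresent());
        System.out.println(holder.fetchCache().leaderEpochFor("topic-0").getAsInt());
    }
}
```

A caller that fetches the snapshot once and reads from it for the rest of a loop sees a consistent view, which is the property the `RecordAccumulator` comment relies on. Whether the real `Metadata` class can drop `synchronized` also depends on its other fields, which this sketch does not model.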
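
The behavior change flagged on `maybeUpdateLeaderEpoch` can be illustrated with a small standalone sketch. `LeaderEpochUpdateSketch` and its two static methods are hypothetical. `oldUpdate` mirrors the removed condition from the diff. `newUpdate` is an assumption, since only the `latestLeaderEpoch.isPresent()` guard is visible in the quoted hunk; the remainder of the new condition is guessed to be the old inequality check.

```java
import java.util.Optional;

// Hypothetical helper comparing the old and (assumed) new update conditions.
final class LeaderEpochUpdateSketch {

    // Old condition from the diff: update whenever the values differ,
    // so an empty latest epoch overwrites a known current epoch.
    static Optional<Integer> oldUpdate(Optional<Integer> current, Optional<Integer> latest) {
        if (!current.equals(latest)) {
            return latest;
        }
        return current;
    }

    // Assumed new condition: only update when the latest epoch is present.
    // The part after isPresent() is not shown in the diff and is a guess.
    static Optional<Integer> newUpdate(Optional<Integer> current, Optional<Integer> latest) {
        if (latest.isPresent() && !current.equals(latest)) {
            return latest;
        }
        return current;
    }

    public static void main(String[] args) {
        Optional<Integer> current = Optional.of(5);
        Optional<Integer> latest = Optional.empty();
        // Old logic drops the known epoch; the assumed new logic keeps it.
        System.out.println("old: " + oldUpdate(current, latest));
        System.out.println("new: " + newUpdate(current, latest));
    }
}
```

The two versions differ only when the latest epoch is empty and the current one is known, which is the metadata-downgrade case mentioned in the comment.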