This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
     new 6a3cc229895  KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15498)
6a3cc229895 is described below

commit 6a3cc229895b36373360a079d970e071d318e4fe
Author: Mayank Shekhar Narula <42991652+msn-t...@users.noreply.github.com>
AuthorDate: Thu Mar 14 14:09:04 2024 +0000

KAFKA-16226; Reduce synchronization between producer threads (#15323) (#15498)

As this [JIRA](https://issues.apache.org/jira/browse/KAFKA-16226) explains, synchronization between the application thread and the background thread increased after the background thread started calling the synchronized method Metadata.currentLeader(), introduced in the [original PR](https://github.com/apache/kafka/pull/14384). This PR therefore makes the following changes:

1. Changes the background thread, i.e. RecordAccumulator's partitionReady() and drainBatchesForOneNode(), to no longer use `Metadata.currentLeader()`. It instead relies on `MetadataCache`, which is immutable, so access to it is unsynchronized.
2. Repurposes `MetadataCache` as an immutable snapshot of Metadata. This is a wrapper around the public `Cluster`. `MetadataCache`'s API/functionality should be extended for internal client usage vs. the public `Cluster`; for example, this PR adds `MetadataCache.leaderEpochFor()`.
3. Renames `MetadataCache` to `MetadataSnapshot` to make it explicit that it is immutable.

**Note: neither `Cluster` nor `MetadataCache` is synchronized, which reduces synchronization on the hot path for high partition counts.**

Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/Metadata.java    |  45 ++-
 .../{MetadataCache.java => MetadataSnapshot.java}  |  77 ++--
 .../clients/producer/internals/ProducerBatch.java  |  13 +-
 .../producer/internals/RecordAccumulator.java      |  78 ++--
 .../kafka/clients/producer/internals/Sender.java   |   6 +-
 .../kafka/common/requests/MetadataResponse.java    |   6 +-
 ...ataCacheTest.java => MetadataSnapshotTest.java} |  49 ++-
 .../org/apache/kafka/clients/MetadataTest.java     |   4 +-
 .../producer/internals/ProducerBatchTest.java      |  36 +-
 .../producer/internals/RecordAccumulatorTest.java  | 405 +++++++++------------
 .../clients/producer/internals/SenderTest.java     |   9 +-
 .../producer/internals/TransactionManagerTest.java |  33 +-
 .../test/java/org/apache/kafka/test/TestUtils.java |  39 ++
 13 files changed, 424 insertions(+), 376 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index c42eb474a6c..8455e994f40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -69,7 +69,7 @@ public class Metadata implements Closeable { private KafkaException fatalException; private Set<String> invalidTopics; private Set<String> unauthorizedTopics; - private MetadataCache cache = MetadataCache.empty(); + private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty(); private boolean needFullUpdate; private boolean needPartialUpdate; private final ClusterResourceListeners clusterResourceListeners; @@ -108,8 +108,15 @@ public class Metadata implements Closeable { /** * Get the current cluster info without blocking */ - public synchronized Cluster fetch() { - return cache.cluster(); + public Cluster fetch() { + return metadataSnapshot.cluster(); + } + + /** + * Get the current metadata cache.
+ */ + public MetadataSnapshot fetchMetadataSnapshot() { + return metadataSnapshot; } /** @@ -207,7 +214,7 @@ public class Metadata implements Closeable { */ synchronized Optional<MetadataResponse.PartitionMetadata> partitionMetadataIfCurrent(TopicPartition topicPartition) { Integer epoch = lastSeenLeaderEpochs.get(topicPartition); - Optional<MetadataResponse.PartitionMetadata> partitionMetadata = cache.partitionMetadata(topicPartition); + Optional<MetadataResponse.PartitionMetadata> partitionMetadata = metadataSnapshot.partitionMetadata(topicPartition); if (epoch == null) { // old cluster format (no epochs) return partitionMetadata; @@ -220,8 +227,8 @@ public class Metadata implements Closeable { /** * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ - public synchronized Map<String, Uuid> topicIds() { - return cache.topicIds(); + public Map<String, Uuid> topicIds() { + return metadataSnapshot.topicIds(); } public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) { @@ -231,14 +238,14 @@ public class Metadata implements Closeable { MetadataResponse.PartitionMetadata partitionMetadata = maybeMetadata.get(); Optional<Integer> leaderEpochOpt = partitionMetadata.leaderEpoch; - Optional<Node> leaderNodeOpt = partitionMetadata.leaderId.flatMap(cache::nodeById); + Optional<Node> leaderNodeOpt = partitionMetadata.leaderId.flatMap(metadataSnapshot::nodeById); return new LeaderAndEpoch(leaderNodeOpt, leaderEpochOpt); } public synchronized void bootstrap(List<InetSocketAddress> addresses) { this.needFullUpdate = true; this.updateVersion += 1; - this.cache = MetadataCache.bootstrap(addresses); + this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses); } /** @@ -273,22 +280,22 @@ public class Metadata implements Closeable { this.lastSuccessfulRefreshMs = nowMs; } - String previousClusterId = cache.clusterResource().clusterId(); + String previousClusterId = metadataSnapshot.clusterResource().clusterId(); - this.cache = handleMetadataResponse(response, isPartialUpdate, nowMs); + this.metadataSnapshot = handleMetadataResponse(response, isPartialUpdate, nowMs); - Cluster cluster = cache.cluster(); + Cluster cluster = metadataSnapshot.cluster(); maybeSetMetadataError(cluster); this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, nowMs)); - String newClusterId = cache.clusterResource().clusterId(); + String newClusterId = metadataSnapshot.clusterResource().clusterId(); if (!Objects.equals(previousClusterId, newClusterId)) { log.info("Cluster ID: {}", newClusterId); } - clusterResourceListeners.onUpdate(cache.clusterResource()); + clusterResourceListeners.onUpdate(metadataSnapshot.clusterResource()); - log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache); + log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.metadataSnapshot); } private void maybeSetMetadataError(Cluster cluster) { @@ -314,7 +321,7 @@ public class Metadata implements Closeable { /** * Transform a MetadataResponse into a new MetadataCache instance. */ - private MetadataCache handleMetadataResponse(MetadataResponse metadataResponse, boolean isPartialUpdate, long nowMs) { + private MetadataSnapshot handleMetadataResponse(MetadataResponse metadataResponse, boolean isPartialUpdate, long nowMs) { // All encountered topics. 
Set<String> topics = new HashSet<>(); @@ -325,7 +332,7 @@ public class Metadata implements Closeable { List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>(); Map<String, Uuid> topicIds = new HashMap<>(); - Map<String, Uuid> oldTopicIds = cache.topicIds(); + Map<String, Uuid> oldTopicIds = metadataSnapshot.topicIds(); for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { String topicName = metadata.topic(); Uuid topicId = metadata.topicId(); @@ -373,11 +380,11 @@ public class Metadata implements Closeable { Map<Integer, Node> nodes = metadataResponse.brokersById(); if (isPartialUpdate) - return this.cache.mergeWith(metadataResponse.clusterId(), nodes, partitions, + return this.metadataSnapshot.mergeWith(metadataResponse.clusterId(), nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds, (topic, isInternal) -> !topics.contains(topic) && retainTopic(topic, isInternal, nowMs)); else - return new MetadataCache(metadataResponse.clusterId(), nodes, partitions, + return new MetadataSnapshot(metadataResponse.clusterId(), nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, metadataResponse.controller(), topicIds); } @@ -417,7 +424,7 @@ public class Metadata implements Closeable { } else { // Otherwise ignore the new metadata and use the previously cached info log.debug("Got metadata for an older epoch {} (current is {}) for partition {}, not updating", newEpoch, currentEpoch, tp); - return cache.partitionMetadata(tp); + return metadataSnapshot.partitionMetadata(tp); } } else { // Handle old cluster formats as well as error responses where leader and epoch are missing diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java similarity index 76% rename from clients/src/main/java/org/apache/kafka/clients/MetadataCache.java rename to clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java index d7b6bfd344e..fa2a76006bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataCache.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients; +import java.util.OptionalInt; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.Node; @@ -39,10 +40,11 @@ import java.util.function.Predicate; import java.util.stream.Collectors; /** - * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster + * An internal immutable snapshot of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster * instance which is optimized for read access. 
+ * Prefer to extend MetadataSnapshot's API for internal client usage Vs the public {@link Cluster} */ -public class MetadataCache { +public class MetadataSnapshot { private final String clusterId; private final Map<Integer, Node> nodes; private final Set<String> unauthorizedTopics; @@ -54,7 +56,7 @@ public class MetadataCache { private Cluster clusterInstance; - MetadataCache(String clusterId, + public MetadataSnapshot(String clusterId, Map<Integer, Node> nodes, Collection<PartitionMetadata> partitions, Set<String> unauthorizedTopics, @@ -65,27 +67,29 @@ public class MetadataCache { this(clusterId, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds, null); } - private MetadataCache(String clusterId, - Map<Integer, Node> nodes, - Collection<PartitionMetadata> partitions, - Set<String> unauthorizedTopics, - Set<String> invalidTopics, - Set<String> internalTopics, - Node controller, - Map<String, Uuid> topicIds, - Cluster clusterInstance) { + // Visible for testing + public MetadataSnapshot(String clusterId, + Map<Integer, Node> nodes, + Collection<PartitionMetadata> partitions, + Set<String> unauthorizedTopics, + Set<String> invalidTopics, + Set<String> internalTopics, + Node controller, + Map<String, Uuid> topicIds, + Cluster clusterInstance) { this.clusterId = clusterId; - this.nodes = nodes; - this.unauthorizedTopics = unauthorizedTopics; - this.invalidTopics = invalidTopics; - this.internalTopics = internalTopics; + this.nodes = Collections.unmodifiableMap(nodes); + this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics); + this.invalidTopics = Collections.unmodifiableSet(invalidTopics); + this.internalTopics = Collections.unmodifiableSet(internalTopics); this.controller = controller; this.topicIds = topicIds; - this.metadataByPartition = new HashMap<>(partitions.size()); + Map<TopicPartition, PartitionMetadata> tmpMetadataByPartition = new HashMap<>(partitions.size()); for (PartitionMetadata p : partitions) { - this.metadataByPartition.put(p.topicPartition, p); + tmpMetadataByPartition.put(p.topicPartition, p); } + this.metadataByPartition = Collections.unmodifiableMap(tmpMetadataByPartition); if (clusterInstance == null) { computeClusterView(); @@ -106,7 +110,7 @@ public class MetadataCache { return Optional.ofNullable(nodes.get(id)); } - Cluster cluster() { + public Cluster cluster() { if (clusterInstance == null) { throw new IllegalStateException("Cached Cluster instance should not be null, but was."); } else { @@ -114,13 +118,28 @@ public class MetadataCache { } } + /** + * Get leader-epoch for partition. + * + * @param tp partition + * @return leader-epoch if known, else return OptionalInt.empty() + */ + public OptionalInt leaderEpochFor(TopicPartition tp) { + PartitionMetadata partitionMetadata = metadataByPartition.get(tp); + if (partitionMetadata == null || !partitionMetadata.leaderEpoch.isPresent()) { + return OptionalInt.empty(); + } else { + return OptionalInt.of(partitionMetadata.leaderEpoch.get()); + } + } + ClusterResource clusterResource() { return new ClusterResource(clusterId); } /** - * Merges the metadata cache's contents with the provided metadata, returning a new metadata cache. The provided - * metadata is presumed to be more recent than the cache's metadata, and therefore all overlapping metadata will + * Merges the metadata snapshot's contents with the provided metadata, returning a new metadata snapshot. 
The provided + * metadata is presumed to be more recent than the snapshot's metadata, and therefore all overlapping metadata will * be overridden. * * @param newClusterId the new cluster Id @@ -131,9 +150,9 @@ public class MetadataCache { * @param newController the new controller node * @param topicIds the mapping from topic name to topic ID from the MetadataResponse * @param retainTopic returns whether a topic's metadata should be retained - * @return the merged metadata cache + * @return the merged metadata snapshot */ - MetadataCache mergeWith(String newClusterId, + MetadataSnapshot mergeWith(String newClusterId, Map<Integer, Node> newNodes, Collection<PartitionMetadata> addPartitions, Set<String> addUnauthorizedTopics, @@ -173,7 +192,7 @@ public class MetadataCache { Set<String> newInvalidTopics = fillSet(addInvalidTopics, invalidTopics, shouldRetainTopic); Set<String> newInternalTopics = fillSet(addInternalTopics, internalTopics, shouldRetainTopic); - return new MetadataCache(newClusterId, newNodes, newMetadataByPartition.values(), newUnauthorizedTopics, + return new MetadataSnapshot(newClusterId, newNodes, newMetadataByPartition.values(), newUnauthorizedTopics, newInvalidTopics, newInternalTopics, newController, newTopicIds); } @@ -205,26 +224,26 @@ public class MetadataCache { invalidTopics, internalTopics, controller, topicIds); } - static MetadataCache bootstrap(List<InetSocketAddress> addresses) { + static MetadataSnapshot bootstrap(List<InetSocketAddress> addresses) { Map<Integer, Node> nodes = new HashMap<>(); int nodeId = -1; for (InetSocketAddress address : addresses) { nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort())); nodeId--; } - return new MetadataCache(null, nodes, Collections.emptyList(), + return new MetadataSnapshot(null, nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap(), Cluster.bootstrap(addresses)); } - static MetadataCache empty() { - return new MetadataCache(null, Collections.emptyMap(), Collections.emptyList(), + static MetadataSnapshot empty() { + return new MetadataSnapshot(null, Collections.emptyMap(), Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap(), Cluster.empty()); } @Override public String toString() { - return "MetadataCache{" + + return "MetadataSnapshot{" + "clusterId='" + clusterId + '\'' + ", nodes=" + nodes + ", partitions=" + metadataByPartition.values() + diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 408b8316eb8..457d86fcc91 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.Optional; +import java.util.OptionalInt; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; @@ -81,7 +81,7 @@ public final class ProducerBatch { private boolean reopened; // Tracks the current-leader's epoch to which this batch would be sent, in the current to produce the batch. 
- private Optional<Integer> currentLeaderEpoch; + private OptionalInt currentLeaderEpoch; // Tracks the attempt in which leader was changed to currentLeaderEpoch for the 1st time. private int attemptsWhenLeaderLastChanged; @@ -100,7 +100,7 @@ public final class ProducerBatch { this.isSplitBatch = isSplitBatch; float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), recordsBuilder.compressionType()); - this.currentLeaderEpoch = Optional.empty(); + this.currentLeaderEpoch = OptionalInt.empty(); this.attemptsWhenLeaderLastChanged = 0; recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } @@ -109,8 +109,9 @@ public final class ProducerBatch { * It will update the leader to which this batch will be produced for the ongoing attempt, if a newer leader is known. * @param latestLeaderEpoch latest leader's epoch. */ - void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) { - if (!currentLeaderEpoch.equals(latestLeaderEpoch)) { + void maybeUpdateLeaderEpoch(OptionalInt latestLeaderEpoch) { + if (latestLeaderEpoch.isPresent() + && (!currentLeaderEpoch.isPresent() || currentLeaderEpoch.getAsInt() < latestLeaderEpoch.getAsInt())) { log.trace("For {}, leader will be updated, currentLeaderEpoch: {}, attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}", this, currentLeaderEpoch, attemptsWhenLeaderLastChanged, latestLeaderEpoch, attempts); attemptsWhenLeaderLastChanged = attempts(); @@ -558,7 +559,7 @@ public final class ProducerBatch { } // VisibleForTesting - Optional<Integer> currentLeaderEpoch() { + OptionalInt currentLeaderEpoch() { return currentLeaderEpoch; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 98d0bfea22a..7e38c3c8903 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -26,12 +26,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.utils.ProducerIdAndEpoch; @@ -637,27 +638,27 @@ public class RecordAccumulator { } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. 
* - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param metadataSnapshot The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ - private long partitionReady(Metadata metadata, long nowMs, String topic, + private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, String topic, TopicInfo topicInfo, long nextReadyCheckDelayMs, Set<Node> readyNodes, Set<String> unknownLeaderTopics) { ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches; // Collect the queue sizes for available partitions to be used in adaptive partitioning. int[] queueSizes = null; int[] partitionIds = null; - if (enableAdaptivePartitioning && batches.size() >= metadata.fetch().partitionsForTopic(topic).size()) { + if (enableAdaptivePartitioning && batches.size() >= metadataSnapshot.cluster().partitionsForTopic(topic).size()) { // We don't do adaptive partitioning until we scheduled at least a batch for all // partitions (i.e. we have the corresponding entries in the batches map), we just // do uniform. The reason is that we build queue sizes from the batches map, @@ -674,8 +675,7 @@ public class RecordAccumulator { // Advance queueSizesIndex so that we properly index available // partitions. Do it here so that it's done for all code paths. - Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part); - Node leader = leaderAndEpoch.leader.orElse(null); + Node leader = metadataSnapshot.cluster().leaderFor(part); if (leader != null && queueSizes != null) { ++queueSizesIndex; assert queueSizesIndex < queueSizes.length; @@ -689,9 +689,14 @@ public class RecordAccumulator { final int dequeSize; final boolean full; - // This loop is especially hot with large partition counts. + OptionalInt leaderEpoch = metadataSnapshot.leaderEpochFor(part); + + // This loop is especially hot with large partition counts. So - - // We are careful to only perform the minimum required inside the + // 1. We should avoid code that increases synchronization between application thread calling + // send(), and background thread running runOnce(), see https://issues.apache.org/jira/browse/KAFKA-16226 + + // 2. We are careful to only perform the minimum required inside the // synchronized block, as this lock is also used to synchronize producer threads // attempting to append() to a partition/batch. 
@@ -704,7 +709,7 @@ public class RecordAccumulator { } waitedTimeMs = batch.waitedTimeMs(nowMs); - batch.maybeUpdateLeaderEpoch(leaderAndEpoch.epoch); + batch.maybeUpdateLeaderEpoch(leaderEpoch); backingOff = shouldBackoff(batch.hasLeaderChangedForTheOngoingRetry(), batch, waitedTimeMs); dequeSize = deque.size(); full = dequeSize > 1 || batch.isFull(); @@ -764,7 +769,7 @@ public class RecordAccumulator { * </ul> * </ol> */ - public ReadyCheckResult ready(Metadata metadata, long nowMs) { + public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<String> unknownLeaderTopics = new HashSet<>(); @@ -772,7 +777,7 @@ public class RecordAccumulator { // cumulative frequency table (used in partitioner). for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) { final String topic = topicInfoEntry.getKey(); - nextReadyCheckDelayMs = partitionReady(metadata, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); + nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics); } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } @@ -843,9 +848,9 @@ public class RecordAccumulator { return false; } - private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node, int maxSize, long now) { + private List<ProducerBatch> drainBatchesForOneNode(MetadataSnapshot metadataSnapshot, Node node, int maxSize, long now) { int size = 0; - List<PartitionInfo> parts = metadata.fetch().partitionsForNode(node.id()); + List<PartitionInfo> parts = metadataSnapshot.cluster().partitionsForNode(node.id()); List<ProducerBatch> ready = new ArrayList<>(); if (parts.isEmpty()) return ready; @@ -861,17 +866,12 @@ public class RecordAccumulator { // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; - Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); - // Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. - // In this case, skip sending it to the old leader, as it would return aa NO_LEADER_OR_FOLLOWER error. - if (!leaderAndEpoch.leader.isPresent()) - continue; - if (!node.equals(leaderAndEpoch.leader.get())) - continue; Deque<ProducerBatch> deque = getDeque(tp); if (deque == null) continue; + OptionalInt leaderEpoch = metadataSnapshot.leaderEpochFor(tp); + final ProducerBatch batch; synchronized (deque) { // invariant: !isMuted(tp,now) && deque != null @@ -880,8 +880,8 @@ public class RecordAccumulator { continue; // first != null - first.maybeUpdateLeaderEpoch(leaderAndEpoch.epoch); // Only drain the batch if it is not during backoff period. + first.maybeUpdateLeaderEpoch(leaderEpoch); if (shouldBackoff(first.hasLeaderChangedForTheOngoingRetry(), first, first.waitedTimeMs(now))) continue; @@ -945,22 +945,24 @@ public class RecordAccumulator { } /** - * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified - * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. + * Drain all the data for the given nodes and collate them into a list of batches that will fit + * within the specified size on a per-node basis. 
This method attempts to avoid choosing the same + * topic-node over and over. * - * @param metadata The current cluster metadata - * @param nodes The list of node to drain - * @param maxSize The maximum number of bytes to drain - * @param now The current unix time in milliseconds - * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize. + * @param metadataSnapshot The current cluster metadata + * @param nodes The list of node to drain + * @param maxSize The maximum number of bytes to drain + * @param now The current unix time in milliseconds + * @return A list of {@link ProducerBatch} for each node specified with total size less than the + * requested maxSize. */ - public Map<Integer, List<ProducerBatch>> drain(Metadata metadata, Set<Node> nodes, int maxSize, long now) { + public Map<Integer, List<ProducerBatch>> drain(MetadataSnapshot metadataSnapshot, Set<Node> nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); Map<Integer, List<ProducerBatch>> batches = new HashMap<>(); for (Node node : nodes) { - List<ProducerBatch> ready = drainBatchesForOneNode(metadata, node, maxSize, now); + List<ProducerBatch> ready = drainBatchesForOneNode(metadataSnapshot, node, maxSize, now); batches.put(node.id(), ready); } return batches; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index ef2151e9819..273f05330b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.NetworkClientUtils; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.InvalidRecordException; @@ -358,8 +359,9 @@ public class Sender implements Runnable { } private long sendProducerData(long now) { + MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot(); // get the list of partitions with data ready to send - RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now); + RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now); // if there are any partitions whose leaders are not known yet, force metadata update if (!result.unknownLeaderTopics.isEmpty()) { @@ -394,7 +396,7 @@ public class Sender implements Runnable { } // create produce requests - Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now); + Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now); addToInflightBatches(batches); if (guaranteeMessageOrder) { // Mute all the partitions drained diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 47cdd3f0d7e..bc6c387c5e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -172,9 +172,9 @@ public class MetadataResponse extends 
AbstractResponse { return new PartitionInfo(metadata.topic(), metadata.partition(), metadata.leaderId.map(nodesById::get).orElse(null), - convertToNodeArray(metadata.replicaIds, nodesById), - convertToNodeArray(metadata.inSyncReplicaIds, nodesById), - convertToNodeArray(metadata.offlineReplicaIds, nodesById)); + (metadata.replicaIds == null) ? null : convertToNodeArray(metadata.replicaIds, nodesById), + (metadata.inSyncReplicaIds == null) ? null : convertToNodeArray(metadata.inSyncReplicaIds, nodesById), + (metadata.offlineReplicaIds == null) ? null : convertToNodeArray(metadata.offlineReplicaIds, nodesById)); } private static Node[] convertToNodeArray(List<Integer> replicaIds, Map<Integer, Node> nodesById) { diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataSnapshotTest.java similarity index 64% rename from clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java rename to clients/src/test/java/org/apache/kafka/clients/MetadataSnapshotTest.java index 387643c3c5f..b1c29ffe189 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataCacheTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataSnapshotTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients; +import java.util.OptionalInt; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -36,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -public class MetadataCacheTest { +public class MetadataSnapshotTest { @Test public void testMissingLeaderEndpoint() { @@ -61,7 +62,7 @@ public class MetadataCacheTest { nodesById.put(7, new Node(7, "localhost", 2078)); nodesById.put(8, new Node(8, "localhost", 2079)); - MetadataCache cache = new MetadataCache("clusterId", + MetadataSnapshot cache = new MetadataSnapshot("clusterId", nodesById, Collections.singleton(partitionMetadata), Collections.emptySet(), @@ -83,4 +84,48 @@ public class MetadataCacheTest { assertEquals(nodesById.get(7), replicas.get(7)); } + @Test + public void testLeaderEpochFor() { + // Setup partition 0 with a leader-epoch of 10. + TopicPartition topicPartition1 = new TopicPartition("topic", 0); + MetadataResponse.PartitionMetadata partitionMetadata1 = new MetadataResponse.PartitionMetadata( + Errors.NONE, + topicPartition1, + Optional.of(5), + Optional.of(10), + Arrays.asList(5, 6, 7), + Arrays.asList(5, 6, 7), + Collections.emptyList()); + + // Setup partition 1 with an unknown leader epoch. 
+ TopicPartition topicPartition2 = new TopicPartition("topic", 1); + MetadataResponse.PartitionMetadata partitionMetadata2 = new MetadataResponse.PartitionMetadata( + Errors.NONE, + topicPartition2, + Optional.of(5), + Optional.empty(), + Arrays.asList(5, 6, 7), + Arrays.asList(5, 6, 7), + Collections.emptyList()); + + Map<Integer, Node> nodesById = new HashMap<>(); + nodesById.put(5, new Node(5, "localhost", 2077)); + nodesById.put(6, new Node(6, "localhost", 2078)); + nodesById.put(7, new Node(7, "localhost", 2079)); + + MetadataSnapshot cache = new MetadataSnapshot("clusterId", + nodesById, + Arrays.asList(partitionMetadata1, partitionMetadata2), + Collections.emptySet(), + Collections.emptySet(), + Collections.emptySet(), + null, + Collections.emptyMap()); + + assertEquals(OptionalInt.of(10), cache.leaderEpochFor(topicPartition1)); + + assertEquals(OptionalInt.empty(), cache.leaderEpochFor(topicPartition2)); + + assertEquals(OptionalInt.empty(), cache.leaderEpochFor(new TopicPartition("topic_missing", 0))); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 30e7c3ab186..10802ea89f4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -587,11 +587,11 @@ public class MetadataTest { // Sentinel instances InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0); - Cluster fromMetadata = MetadataCache.bootstrap(Collections.singletonList(address)).cluster(); + Cluster fromMetadata = MetadataSnapshot.bootstrap(Collections.singletonList(address)).cluster(); Cluster fromCluster = Cluster.bootstrap(Collections.singletonList(address)); assertEquals(fromMetadata, fromCluster); - Cluster fromMetadataEmpty = MetadataCache.empty().cluster(); + Cluster fromMetadataEmpty = MetadataSnapshot.empty().cluster(); Cluster fromClusterEmpty = Cluster.empty(); assertEquals(fromMetadataEmpty, fromClusterEmpty); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 24629b612b2..7f98e08e534 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.Optional; +import java.util.OptionalInt; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; @@ -276,41 +276,55 @@ public class ProducerBatchTest { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); // Starting state for the batch, no attempt made to send it yet. - assertEquals(Optional.empty(), batch.currentLeaderEpoch()); + assertEquals(OptionalInt.empty(), batch.currentLeaderEpoch()); assertEquals(0, batch.attemptsWhenLeaderLastChanged()); // default value - batch.maybeUpdateLeaderEpoch(Optional.empty()); + batch.maybeUpdateLeaderEpoch(OptionalInt.empty()); assertFalse(batch.hasLeaderChangedForTheOngoingRetry()); // 1st attempt[Not a retry] to send the batch. // Check leader isn't flagged as a new leader. 
int batchLeaderEpoch = 100; - batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch)); + batch.maybeUpdateLeaderEpoch(OptionalInt.of(batchLeaderEpoch)); assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader is assigned for 1st time"); - assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get()); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); assertEquals(0, batch.attemptsWhenLeaderLastChanged()); // 2nd attempt[1st retry] to send the batch to a new leader. // Check leader change is detected. batchLeaderEpoch = 101; batch.reenqueued(0); - batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch)); + batch.maybeUpdateLeaderEpoch(OptionalInt.of(batchLeaderEpoch)); assertTrue(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed"); - assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get()); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); assertEquals(1, batch.attemptsWhenLeaderLastChanged()); // 2nd attempt[1st retry] still ongoing, yet to be made. // Check same leaderEpoch(101) is still considered as a leader-change. - batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch)); + batch.maybeUpdateLeaderEpoch(OptionalInt.of(batchLeaderEpoch)); assertTrue(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed"); - assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get()); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); assertEquals(1, batch.attemptsWhenLeaderLastChanged()); // 3rd attempt[2nd retry] to the same leader-epoch(101). // Check same leaderEpoch(101) as not detected as a leader-change. batch.reenqueued(0); - batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch)); + batch.maybeUpdateLeaderEpoch(OptionalInt.of(batchLeaderEpoch)); assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed"); - assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get()); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); + assertEquals(1, batch.attemptsWhenLeaderLastChanged()); + + // Attempt made to update batch leader-epoch to an older leader-epoch(100). + // Check batch leader-epoch remains unchanged as 101. + batch.maybeUpdateLeaderEpoch(OptionalInt.of(batchLeaderEpoch - 1)); + assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed"); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); + assertEquals(1, batch.attemptsWhenLeaderLastChanged()); + + // Attempt made to update batch leader-epoch to an unknown leader(optional.empty()) + // Check batch leader-epoch remains unchanged as 101. 
+ batch.maybeUpdateLeaderEpoch(OptionalInt.empty()); + assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed"); + assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().getAsInt()); assertEquals(1, batch.attemptsWhenLeaderLastChanged()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 9a0170aa7d5..864bd9e9b00 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -17,9 +17,10 @@ package org.apache.kafka.clients.producer.internals; import java.util.Optional; +import java.util.OptionalInt; import java.util.function.Function; import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Partitioner; @@ -32,6 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.DefaultRecord; @@ -41,6 +43,8 @@ import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.ProducerIdAndEpoch; @@ -88,29 +92,38 @@ public class RecordAccumulatorTest { private TopicPartition tp1 = new TopicPartition(topic, partition1); private TopicPartition tp2 = new TopicPartition(topic, partition2); private TopicPartition tp3 = new TopicPartition(topic, partition3); - private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null); - private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null); - private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null); + + private PartitionMetadata partMetadata1 = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null); + private PartitionMetadata partMetadata2 = new PartitionMetadata(Errors.NONE, tp2, Optional.of(node1.id()), Optional.empty(), null, null, null); + private PartitionMetadata partMetadata3 = new PartitionMetadata(Errors.NONE, tp3, Optional.of(node2.id()), Optional.empty(), null, null, null); + private List<PartitionMetadata> partMetadatas = new ArrayList<>(Arrays.asList(partMetadata1, partMetadata2, partMetadata3)); + + private Map<Integer, Node> nodes = Arrays.asList(node1, node2).stream().collect(Collectors.toMap(Node::id, Function.identity())); + private MetadataSnapshot metadataCache = new MetadataSnapshot(null, + nodes, + partMetadatas, + Collections.emptySet(), + Collections.emptySet(), + Collections.emptySet(), + 
null, + Collections.emptyMap()); + + private Cluster cluster = metadataCache.cluster(); + private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, Record.EMPTY_HEADERS); - Metadata metadataMock; - private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), - Collections.emptySet(), Collections.emptySet()); + private Metrics metrics = new Metrics(time); private final long maxBlockTimeMs = 1000; private final LogContext logContext = new LogContext(); - @BeforeEach - public void setup() { - metadataMock = setupMetadata(cluster); - } + @BeforeEach void setup() {} @AfterEach public void teardown() { this.metrics.close(); - Mockito.reset(metadataMock); } @Test @@ -119,13 +132,31 @@ public class RecordAccumulatorTest { // add tp-4 int partition4 = 3; TopicPartition tp4 = new TopicPartition(topic, partition4); - PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null); + PartitionMetadata partMetadata4 = new PartitionMetadata(Errors.NONE, tp4, Optional.of(node2.id()), Optional.empty(), null, null, null); + partMetadatas.add(partMetadata4); + + // This test requires that partitions to be drained in order for each node i.e. + // node1 -> tp1, tp3, and node2 -> tp2, tp4. + // So setup cluster with this order, and pass this cluster to MetadataCache to preserve this order. + PartitionInfo part1 = MetadataResponse.toPartitionInfo(partMetadata1, nodes); + PartitionInfo part2 = MetadataResponse.toPartitionInfo(partMetadata2, nodes); + PartitionInfo part3 = MetadataResponse.toPartitionInfo(partMetadata3, nodes); + PartitionInfo part4 = MetadataResponse.toPartitionInfo(partMetadata4, nodes); + Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4), + Collections.emptySet(), Collections.emptySet()); + metadataCache = new MetadataSnapshot(null, + nodes, + partMetadatas, + Collections.emptySet(), + Collections.emptySet(), + Collections.emptySet(), + null, + Collections.emptyMap(), + cluster); long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD; RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE, CompressionType.NONE, 10); - Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4), - Collections.emptySet(), Collections.emptySet()); - metadataMock = setupMetadata(cluster); + // initial data accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); @@ -134,7 +165,7 @@ public class RecordAccumulatorTest { accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); // drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained - Map<Integer, List<ProducerBatch>> batches1 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); + Map<Integer, List<ProducerBatch>> batches1 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches1, tp1, tp3); // add record for tp1, tp3 @@ -143,11 +174,11 @@ public class RecordAccumulatorTest { // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first 
batch drained // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4 - Map<Integer, List<ProducerBatch>> batches2 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); + Map<Integer, List<ProducerBatch>> batches2 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches2, tp2, tp4); // make sure in next run, the drain index will start from the beginning - Map<Integer, List<ProducerBatch>> batches3 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); + Map<Integer, List<ProducerBatch>> batches3 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches3, tp1, tp3); // add record for tp2, tp3, tp4 and mute the tp4 @@ -156,7 +187,7 @@ public class RecordAccumulatorTest { accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.mutePartition(tp4); // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted) - Map<Integer, List<ProducerBatch>> batches4 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); + Map<Integer, List<ProducerBatch>> batches4 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0); verifyTopicPartitionInBatches(batches4, tp2, tp3); // add record for tp1, tp2, tp3, and unmute tp4 @@ -165,7 +196,7 @@ public class RecordAccumulatorTest { accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); accum.unmutePartition(tp4); // set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4] - Map<Integer, List<ProducerBatch>> batches5 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0); + Map<Integer, List<ProducerBatch>> batches5 = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0); verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4); } @@ -196,25 +227,25 @@ public class RecordAccumulatorTest { int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { // append to the first batch - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1); assertEquals(1, partitionBatches.size()); ProducerBatch batch = partitionBatches.peekFirst(); assertTrue(batch.isWritable()); - assertEquals(0, accum.ready(metadataMock, now).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready."); } // this append doesn't fit in the first batch, so a new batch is created and the first batch is closed - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1); assertEquals(2, 
partitionBatches.size()); Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator(); assertTrue(partitionBatchesIterator.next().isWritable()); - assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); + assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); - List<ProducerBatch> batches = accum.drain(metadataMock, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + List<ProducerBatch> batches = accum.drain(metadataCache, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); ProducerBatch batch = batches.get(0); @@ -242,8 +273,8 @@ public class RecordAccumulatorTest { byte[] value = new byte[2 * batchSize]; RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); + assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); Deque<ProducerBatch> batches = accum.getDeque(tp1); assertEquals(1, batches.size()); @@ -280,8 +311,8 @@ public class RecordAccumulatorTest { RecordAccumulator accum = createTestRecordAccumulator( batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); + accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); + assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); Deque<ProducerBatch> batches = accum.getDeque(tp1); assertEquals(1, batches.size()); @@ -305,10 +336,10 @@ public class RecordAccumulatorTest { RecordAccumulator accum = createTestRecordAccumulator( 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, lingerMs); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready"); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready"); time.sleep(10); - assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); - List<ProducerBatch> batches = accum.drain(metadataMock, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); + assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Our partition's leader should be ready"); + 
List<ProducerBatch> batches = accum.drain(metadataCache, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); ProducerBatch batch = batches.get(0); @@ -329,9 +360,9 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp.topic(), tp.partition(), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); } - assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Partition's leader should be ready"); + assertEquals(Collections.singleton(node1), accum.ready(metadataCache, time.milliseconds()).readyNodes, "Partition's leader should be ready"); - List<ProducerBatch> batches = accum.drain(metadataMock, Collections.singleton(node1), 1024, 0).get(node1.id()); + List<ProducerBatch> batches = accum.drain(metadataCache, Collections.singleton(node1), 1024, 0).get(node1.id()); assertEquals(1, batches.size(), "But due to size bound only one partition should have been retrieved"); } @@ -360,8 +391,8 @@ public class RecordAccumulatorTest { int read = 0; long now = time.milliseconds(); while (read < numThreads * msgs) { - Set<Node> nodes = accum.ready(metadataMock, now).readyNodes; - List<ProducerBatch> batches = accum.drain(metadataMock, nodes, 5 * 1024, 0).get(node1.id()); + Set<Node> nodes = accum.ready(metadataCache, now).readyNodes; + List<ProducerBatch> batches = accum.drain(metadataCache, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (ProducerBatch batch : batches) { for (Record record : batch.records().records()) @@ -392,7 +423,7 @@ public class RecordAccumulatorTest { // Partition on node1 only for (int i = 0; i < appends; i++) accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time should be the linger time"); @@ -401,14 +432,14 @@ public class RecordAccumulatorTest { // Add partition on node2 only for (int i = 0; i < appends; i++) accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - result = accum.ready(metadataMock, time.milliseconds()); + result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check time should be defined by node1, half remaining linger time"); // Add data for another partition on node1, enough to make data sendable immediately for (int i = 0; i < appends + 1; i++) accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - result = accum.ready(metadataMock, time.milliseconds()); + result = accum.ready(metadataCache, time.milliseconds()); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); // Note this can actually be < linger time because it may use delays from partitions that aren't sendable // but have leaders with other sendable data. 
@@ -430,9 +461,9 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now + lingerMs + 1); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); assertEquals(1, batches.size(), "Node1 should be the only ready node."); assertEquals(1, batches.get(0).size(), "Partition 0 should only have one batch drained."); @@ -442,23 +473,24 @@ public class RecordAccumulatorTest { // Put message for partition 1 into accumulator accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - result = accum.ready(metadataMock, now + lingerMs + 1); + result = accum.ready(metadataCache, now + lingerMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); // tp1 should backoff while tp2 should not - batches = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); + batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1); assertEquals(1, batches.size(), "Node1 should be the only ready node."); assertEquals(1, batches.get(0).size(), "Node1 should only have one batch drained."); assertEquals(tp2, batches.get(0).get(0).topicPartition, "Node1 should only have one batch for partition 1."); // Partition 0 can be drained after retry backoff - result = accum.ready(metadataMock, now + retryBackoffMs + 1); + result = accum.ready(metadataCache, now + retryBackoffMs + 1); assertEquals(Collections.singleton(node1), result.readyNodes, "Node1 should be ready"); - batches = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1); + batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1); assertEquals(1, batches.size(), "Node1 should be the only ready node."); assertEquals(1, batches.get(0).size(), "Node1 should only have one batch drained."); assertEquals(tp1, batches.get(0).get(0).topicPartition, "Node1 should only have one batch for partition 0."); } + @Test public void testFlush() throws Exception { int lingerMs = Integer.MAX_VALUE; @@ -469,14 +501,14 @@ public class RecordAccumulatorTest { accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); assertTrue(accum.hasIncomplete()); } - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No nodes should be ready."); accum.beginFlush(); - result = accum.ready(metadataMock, time.milliseconds()); + result = accum.ready(metadataCache, time.milliseconds()); // drain and deallocate all batches - Map<Integer, List<ProducerBatch>> results = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> results = 
accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(accum.hasIncomplete()); for (List<ProducerBatch> batches: results.values()) @@ -536,9 +568,9 @@ public class RecordAccumulatorTest { } for (int i = 0; i < numRecords; i++) accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(accum.hasUndrained()); assertTrue(accum.hasIncomplete()); @@ -581,9 +613,9 @@ public class RecordAccumulatorTest { } for (int i = 0; i < numRecords; i++) accum.append(topic, i % 3, 0L, key, value, null, new TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(accum.hasUndrained()); assertTrue(accum.hasIncomplete()); @@ -620,10 +652,10 @@ public class RecordAccumulatorTest { if (time.milliseconds() < System.currentTimeMillis()) time.setCurrentTimeMs(System.currentTimeMillis()); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partition should be ready."); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partition should be ready."); time.sleep(lingerMs); - readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); expiredBatches = accum.expiredBatches(time.milliseconds()); @@ -638,7 +670,7 @@ public class RecordAccumulatorTest { time.sleep(deliveryTimeoutMs - lingerMs); expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals(1, expiredBatches.size(), "The batch may expire when the partition is muted"); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); } } @@ -669,11 +701,11 @@ public class RecordAccumulatorTest { // Test batches not in retry for (int i = 0; i < appends; i++) { accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should 
be ready."); } // Make the batches ready due to batch full accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); - Set<Node> readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); // Advance the clock to expire the batch. time.sleep(deliveryTimeoutMs + 1); @@ -684,7 +716,7 @@ public class RecordAccumulatorTest { accum.unmutePartition(tp1); expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals(0, expiredBatches.size(), "All batches should have been expired earlier"); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); // Advance the clock to make the next batch ready due to linger.ms time.sleep(lingerMs); @@ -698,15 +730,15 @@ public class RecordAccumulatorTest { accum.unmutePartition(tp1); expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals(0, expiredBatches.size(), "All batches should have been expired"); - assertEquals(0, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); // Test batches in retry. // Create a retried batch accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); - readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(drained.get(node1.id()).size(), 1, "There should be only one batch."); time.sleep(1000L); accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); @@ -728,7 +760,7 @@ public class RecordAccumulatorTest { // Test that when being throttled muted batches are expired before the throttle time is over. accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); - readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); // Advance the clock to expire the batch. 
time.sleep(requestTimeout + 1); @@ -746,7 +778,7 @@ public class RecordAccumulatorTest { time.sleep(throttleTimeMs); expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals(0, expiredBatches.size(), "All batches should have been expired earlier"); - assertEquals(1, accum.ready(metadataMock, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); + assertEquals(1, accum.ready(metadataCache, time.milliseconds()).readyNodes.size(), "No partitions should be ready."); } @Test @@ -760,28 +792,28 @@ public class RecordAccumulatorTest { int appends = expectedNumAppends(batchSize); for (int i = 0; i < appends; i++) { accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - assertEquals(0, accum.ready(metadataMock, now).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready."); } time.sleep(2000); // Test ready with muted partition accum.mutePartition(tp1); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertEquals(0, result.readyNodes.size(), "No node should be ready"); // Test ready without muted partition accum.unmutePartition(tp1); - result = accum.ready(metadataMock, time.milliseconds()); + result = accum.ready(metadataCache, time.milliseconds()); assertTrue(result.readyNodes.size() > 0, "The batch should be ready"); // Test drain with muted partition accum.mutePartition(tp1); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(0, drained.get(node1.id()).size(), "No batch should have been drained"); // Test drain without muted partition. accum.unmutePartition(tp1); - drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(drained.get(node1.id()).size() > 0, "The batch should have been drained."); } @@ -831,20 +863,20 @@ public class RecordAccumulatorTest { false, time.milliseconds(), cluster); assertTrue(accumulator.hasUndrained()); - RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(metadataCache, time.milliseconds()); assertEquals(0, firstResult.readyNodes.size()); - Map<Integer, List<ProducerBatch>> firstDrained = accumulator.drain(metadataMock, firstResult.readyNodes, + Map<Integer, List<ProducerBatch>> firstDrained = accumulator.drain(metadataCache, firstResult.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(0, firstDrained.size()); // Once the transaction begins completion, then the batch should be drained immediately. 
Mockito.when(transactionManager.isCompleting()).thenReturn(true); - RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(metadataCache, time.milliseconds()); assertEquals(1, secondResult.readyNodes.size()); Node readyNode = secondResult.readyNodes.iterator().next(); - Map<Integer, List<ProducerBatch>> secondDrained = accumulator.drain(metadataMock, secondResult.readyNodes, + Map<Integer, List<ProducerBatch>> secondDrained = accumulator.drain(metadataCache, secondResult.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(Collections.singleton(readyNode.id()), secondDrained.keySet()); List<ProducerBatch> batches = secondDrained.get(readyNode.id()); @@ -874,16 +906,16 @@ public class RecordAccumulatorTest { accum.reenqueue(batch, now); time.sleep(101L); // Drain the batch. - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertTrue(result.readyNodes.size() > 0, "The batch should be ready"); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(1, drained.size(), "Only node1 should be drained"); assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained"); // Split and reenqueue the batch. accum.splitAndReenqueue(drained.get(node1.id()).get(0)); time.sleep(101L); - drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertFalse(drained.isEmpty()); assertFalse(drained.get(node1.id()).isEmpty()); drained.get(node1.id()).get(0).complete(acked.get(), 100L); @@ -891,7 +923,7 @@ public class RecordAccumulatorTest { assertTrue(future1.isDone()); assertEquals(0, future1.get().offset()); - drained = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertFalse(drained.isEmpty()); assertFalse(drained.get(node1.id()).isEmpty()); drained.get(node1.id()).get(0).complete(acked.get(), 100L); @@ -912,14 +944,14 @@ public class RecordAccumulatorTest { int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20); assertTrue(numSplitBatches > 0, "There should be some split batches"); // Drain all the split batches. 
- RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); for (int i = 0; i < numSplitBatches; i++) { Map<Integer, List<ProducerBatch>> drained = - accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertFalse(drained.isEmpty()); assertFalse(drained.get(node1.id()).isEmpty()); } - assertTrue(accum.ready(metadataMock, time.milliseconds()).readyNodes.isEmpty(), "All the batches should have been drained."); + assertTrue(accum.ready(metadataCache, time.milliseconds()).readyNodes.isEmpty(), "All the batches should have been drained."); assertEquals(bufferCapacity, accum.bufferPoolAvailableMemory(), "The split batches should be allocated off the accumulator"); } @@ -966,16 +998,16 @@ public class RecordAccumulatorTest { batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize, CompressionType.NONE, lingerMs); accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); - Set<Node> readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Set<Node> readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue(drained.isEmpty()); //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); // advanced clock and send one batch out but it should not be included in soon to expire inflight // batches because batch's expiry is quite far. time.sleep(lingerMs + 1); - readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; - drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; + drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(1, drained.size(), "A batch did not drain after linger"); //assertTrue(accum.soonToExpireInFlightBatches().isEmpty()); @@ -984,8 +1016,8 @@ public class RecordAccumulatorTest { time.sleep(lingerMs * 4); // Now drain and check that accumulator picked up the drained batch because its expiry is soon. 
- readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; - drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; + drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(1, drained.size(), "A batch did not drain after linger"); } @@ -1007,9 +1039,9 @@ public class RecordAccumulatorTest { for (Boolean mute : muteStates) { accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); time.sleep(lingerMs); - readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + readyNodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(Collections.singleton(node1), readyNodes, "Our partition's leader should be ready"); - Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(1, drained.get(node1.id()).size(), "There should be only one batch."); time.sleep(rtt); accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); @@ -1021,7 +1053,7 @@ public class RecordAccumulatorTest { // test expiration time.sleep(deliveryTimeoutMs - rtt); - accum.drain(metadataMock, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()); + accum.drain(metadataCache, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()); expiredBatches = accum.expiredBatches(time.milliseconds()); assertEquals(mute ? 1 : 0, expiredBatches.size(), "RecordAccumulator has expired batches if the partition is not muted"); } @@ -1062,12 +1094,12 @@ public class RecordAccumulatorTest { // We only appended if we do not retry. if (!switchPartition) { appends++; - assertEquals(0, accum.ready(metadataMock, now).readyNodes.size(), "No partitions should be ready."); + assertEquals(0, accum.ready(metadataCache, now).readyNodes.size(), "No partitions should be ready."); } } // Batch should be full. - assertEquals(1, accum.ready(metadataMock, time.milliseconds()).readyNodes.size()); + assertEquals(1, accum.ready(metadataCache, time.milliseconds()).readyNodes.size()); assertEquals(appends, expectedAppends); switchPartition = false; @@ -1126,6 +1158,12 @@ public class RecordAccumulatorTest { } }; + PartitionInfo part1 = MetadataResponse.toPartitionInfo(partMetadata1, nodes); + PartitionInfo part2 = MetadataResponse.toPartitionInfo(partMetadata2, nodes); + PartitionInfo part3 = MetadataResponse.toPartitionInfo(partMetadata3, nodes); + Cluster cluster = new Cluster(null, asList(node1, node2), asList(part1, part2, part3), + Collections.emptySet(), Collections.emptySet()); + // Produce small record, we should switch to first partition. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, value, Record.EMPTY_HEADERS, callbacks, maxBlockTimeMs, false, time.milliseconds(), cluster); @@ -1195,7 +1233,7 @@ public class RecordAccumulatorTest { } // Let the accumulator generate the probability tables. - accum.ready(metadataMock, time.milliseconds()); + accum.ready(metadataCache, time.milliseconds()); // Set up callbacks so that we know what partition is chosen. 
final AtomicInteger partition = new AtomicInteger(RecordMetadata.UNKNOWN_PARTITION); @@ -1239,7 +1277,7 @@ public class RecordAccumulatorTest { // Test that partitions residing on high-latency nodes don't get switched to. accum.updateNodeLatencyStats(0, time.milliseconds() - 200, true); accum.updateNodeLatencyStats(0, time.milliseconds(), false); - accum.ready(metadataMock, time.milliseconds()); + accum.ready(metadataCache, time.milliseconds()); // Do one append, because partition gets switched after append. accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, largeValue, Record.EMPTY_HEADERS, @@ -1276,9 +1314,9 @@ public class RecordAccumulatorTest { time.sleep(10); // We should have one batch ready. - Set<Node> nodes = accum.ready(metadataMock, time.milliseconds()).readyNodes; + Set<Node> nodes = accum.ready(metadataCache, time.milliseconds()).readyNodes; assertEquals(1, nodes.size(), "Should have 1 leader ready"); - List<ProducerBatch> batches = accum.drain(metadataMock, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue(); + List<ProducerBatch> batches = accum.drain(metadataCache, nodes, Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue(); assertEquals(1, batches.size(), "Should have 1 batch ready"); int actualBatchSize = batches.get(0).records().sizeInBytes(); assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater than half batch.size"); @@ -1294,12 +1332,9 @@ public class RecordAccumulatorTest { @Test public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException { int part1LeaderEpoch = 100; - // Create cluster metadata, partition1 being hosted by node1. - part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), - Collections.emptySet(), Collections.emptySet()); - final int finalEpoch = part1LeaderEpoch; - metadataMock = setupMetadata(cluster, tp -> finalEpoch); + // Create cluster metadata, partition1 being hosted by node1 + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.of(part1LeaderEpoch), null, null, null); + MetadataSnapshot metadataCache = new MetadataSnapshot(null, nodes, Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); int batchSize = 10; int lingerMs = 10; @@ -1319,14 +1354,14 @@ public class RecordAccumulatorTest { // 1st attempt(not a retry) to produce batchA, it should be ready & drained to be produced. 
{ now += lingerMs + 1; - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now); assertTrue(result.readyNodes.contains(node1), "Node1 is ready"); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, 999999 /* maxSize */, now); assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained"); ProducerBatch batch = batches.get(node1.id()).get(0); - assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch()); + assertEquals(OptionalInt.of(part1LeaderEpoch), batch.currentLeaderEpoch()); assertEquals(0, batch.attemptsWhenLeaderLastChanged()); // Re-enqueue batch for subsequent retries & test-cases accum.reenqueue(batch, now); @@ -1335,11 +1370,11 @@ public class RecordAccumulatorTest { // In this retry of batchA, wait-time between retries is less than configured and no leader change, so should backoff. { now += 1; - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now); assertFalse(result.readyNodes.contains(node1), "Node1 is not ready"); // Try to drain from node1, it should return no batches. - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */, now); assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).isEmpty(), "No batches ready to be drained on Node1"); @@ -1350,19 +1385,16 @@ public class RecordAccumulatorTest { now += 1; part1LeaderEpoch++; // Create cluster metadata, with new leader epoch. - part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), - Collections.emptySet(), Collections.emptySet()); - final int finalPart1LeaderEpoch = part1LeaderEpoch; - metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now); + part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.of(part1LeaderEpoch), null, null, null); + metadataCache = new MetadataSnapshot(null, nodes, Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now); assertTrue(result.readyNodes.contains(node1), "Node1 is ready"); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, 999999 /* maxSize */, now); assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained"); ProducerBatch batch = batches.get(node1.id()).get(0); - assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch()); + assertEquals(OptionalInt.of(part1LeaderEpoch), batch.currentLeaderEpoch()); assertEquals(1, batch.attemptsWhenLeaderLastChanged()); // Re-enqueue batch for subsequent retries/test-cases. @@ -1373,19 +1405,16 @@ public class RecordAccumulatorTest { { now += 2 * retryBackoffMs; // Create cluster metadata, with new leader epoch. 
- part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), - Collections.emptySet(), Collections.emptySet()); - final int finalPart1LeaderEpoch = part1LeaderEpoch; - metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now); + part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.of(part1LeaderEpoch), null, null, null); + metadataCache = new MetadataSnapshot(null, nodes, Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now); assertTrue(result.readyNodes.contains(node1), "Node1 is ready"); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, 999999 /* maxSize */, now); assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained"); ProducerBatch batch = batches.get(node1.id()).get(0); - assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch()); + assertEquals(OptionalInt.of(part1LeaderEpoch), batch.currentLeaderEpoch()); assertEquals(1, batch.attemptsWhenLeaderLastChanged()); // Re-enqueue batch for subsequent retries/test-cases. @@ -1397,19 +1426,16 @@ public class RecordAccumulatorTest { now += 2 * retryBackoffMs; part1LeaderEpoch++; // Create cluster metadata, with new leader epoch. - part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), - Collections.emptySet(), Collections.emptySet()); - final int finalPart1LeaderEpoch = part1LeaderEpoch; - metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch); - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now); + part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.of(part1LeaderEpoch), null, null, null); + metadataCache = new MetadataSnapshot(null, nodes, Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, now); assertTrue(result.readyNodes.contains(node1), "Node1 is ready"); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, 999999 /* maxSize */, now); assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained"); ProducerBatch batch = batches.get(node1.id()).get(0); - assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch()); + assertEquals(OptionalInt.of(part1LeaderEpoch), batch.currentLeaderEpoch()); assertEquals(3, batch.attemptsWhenLeaderLastChanged()); // Re-enqueue batch for subsequent retries/test-cases. @@ -1426,97 +1452,15 @@ public class RecordAccumulatorTest { CompressionType.NONE, lingerMs); // Create cluster metadata, node2 doesn't host any partitions. 
- part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), - Collections.emptySet(), Collections.emptySet()); - metadataMock = Mockito.mock(Metadata.class); - Mockito.when(metadataMock.fetch()).thenReturn(cluster); - Mockito.when(metadataMock.currentLeader(tp1)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.of(node1), - Optional.of(999 /* dummy value */))); + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null); + MetadataSnapshot metadataCache = new MetadataSnapshot(null, nodes, Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); // Drain for node2, it should return 0 batches, - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, new HashSet<>(Arrays.asList(node2)), 999999 /* maxSize */, time.milliseconds()); assertTrue(batches.get(node2.id()).isEmpty()); } - @Test - public void testDrainOnANodeWhenItCeasesToBeALeader() throws InterruptedException { - int batchSize = 10; - int lingerMs = 10; - long totalSize = 10 * 1024; - RecordAccumulator accum = createTestRecordAccumulator(batchSize, totalSize, - CompressionType.NONE, lingerMs); - - // While node1 is being drained, leader changes from node1 -> node2 for a partition. - { - // Create cluster metadata, partition1&2 being hosted by node1&2 resp. - part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - part2 = new PartitionInfo(topic, partition2, node2, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), - Collections.emptySet(), Collections.emptySet()); - metadataMock = Mockito.mock(Metadata.class); - Mockito.when(metadataMock.fetch()).thenReturn(cluster); - // But metadata has a newer leader for partition1 i.e node2. - Mockito.when(metadataMock.currentLeader(tp1)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.of(node2), - Optional.of(999 /* dummy value */))); - Mockito.when(metadataMock.currentLeader(tp2)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.of(node2), - Optional.of(999 /* dummy value */))); - - // Create 1 batch each for partition1 & partition2. - long now = time.milliseconds(); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, - maxBlockTimeMs, false, now, cluster); - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, - maxBlockTimeMs, false, now, cluster); - - // Drain for node1, it should return 0 batches, as partition1's leader in metadata changed. - // Drain for node2, it should return 1 batch, for partition2. - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, - new HashSet<>(Arrays.asList(node1, node2)), 999999 /* maxSize */, now); - assertTrue(batches.get(node1.id()).isEmpty()); - assertEquals(1, batches.get(node2.id()).size()); - } - - // Cleanup un-drained batches to have an empty accum before next test. - accum.abortUndrainedBatches(new RuntimeException()); - - // While node1 is being drained, leader changes from node1 -> "no-leader" for partition. - { - // Create cluster metadata, partition1&2 being hosted by node1&2 resp. 
- part1 = new PartitionInfo(topic, partition1, node1, null, null, null); - part2 = new PartitionInfo(topic, partition2, node2, null, null, null); - cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), - Collections.emptySet(), Collections.emptySet()); - metadataMock = Mockito.mock(Metadata.class); - Mockito.when(metadataMock.fetch()).thenReturn(cluster); - // But metadata no longer has a leader for partition1. - Mockito.when(metadataMock.currentLeader(tp1)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.empty(), - Optional.of(999 /* dummy value */))); - Mockito.when(metadataMock.currentLeader(tp2)).thenReturn( - new Metadata.LeaderAndEpoch(Optional.of(node2), - Optional.of(999 /* dummy value */))); - - // Create 1 batch each for partition1 & partition2. - long now = time.milliseconds(); - accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, - maxBlockTimeMs, false, now, cluster); - accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS, null, - maxBlockTimeMs, false, now, cluster); - - // Drain for node1, it should return 0 batches, as partition1's leader in metadata changed. - // Drain for node2, it should return 1 batch, for partition2. - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, - new HashSet<>(Arrays.asList(node1, node2)), 999999 /* maxSize */, now); - assertTrue(batches.get(node1.id()).isEmpty()); - assertEquals(1, batches.get(node2.id()).size()); - } - } - private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords) throws InterruptedException { Random random = new Random(); @@ -1529,9 +1473,9 @@ public class RecordAccumulatorTest { accum.append(topic, partition1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster); } - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); assertFalse(result.readyNodes.isEmpty()); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals(1, batches.size()); assertEquals(1, batches.values().iterator().next().size()); ProducerBatch batch = batches.values().iterator().next().get(0); @@ -1547,8 +1491,8 @@ public class RecordAccumulatorTest { boolean batchDrained; do { batchDrained = false; - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, time.milliseconds()); - Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); + Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) { batchDrained = true; @@ -1665,27 +1609,4 @@ public class RecordAccumulatorTest { txnManager, new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)); } - - /** - * Setup a mocked metadata object. - */ - private Metadata setupMetadata(Cluster cluster) { - return setupMetadata(cluster, tp -> 999 /* dummy epoch */); - } - - /** - * Setup a mocked metadata object. 
- */ - private Metadata setupMetadata(Cluster cluster, final Function<TopicPartition, Integer> epochSupplier) { - Metadata metadataMock = Mockito.mock(Metadata.class); - Mockito.when(metadataMock.fetch()).thenReturn(cluster); - for (String topic: cluster.topics()) { - for (PartitionInfo partInfo: cluster.partitionsForTopic(topic)) { - TopicPartition tp = new TopicPartition(partInfo.topic(), partInfo.partition()); - Integer partLeaderEpoch = epochSupplier.apply(tp); - Mockito.when(metadataMock.currentLeader(tp)).thenReturn(new Metadata.LeaderAndEpoch(Optional.of(partInfo.leader()), Optional.of(partLeaderEpoch))); - } - } - return metadataMock; - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 06c9448a336..3a2dddf966e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; @@ -467,7 +468,7 @@ public class SenderTest { final byte[] key = "key".getBytes(); final byte[] value = "value".getBytes(); final long maxBlockTimeMs = 1000; - Cluster cluster = TestUtils.singletonCluster(); + MetadataSnapshot metadataCache = TestUtils.metadataSnapshotWith(1); RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { @Override public void setPartition(int partition) { @@ -479,7 +480,7 @@ public class SenderTest { expiryCallbackCount.incrementAndGet(); try { accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, - Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); + Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), metadataCache.cluster()); } catch (InterruptedException e) { throw new RuntimeException("Unexpected interruption", e); } @@ -490,14 +491,14 @@ public class SenderTest { final long nowMs = time.milliseconds(); for (int i = 0; i < messagesPerBatch; i++) - accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, false, nowMs, cluster); + accumulator.append(tp1.topic(), tp1.partition(), 0L, key, value, null, callbacks, maxBlockTimeMs, false, nowMs, metadataCache.cluster()); // Advance the clock to expire the first batch. time.sleep(10000); Node clusterNode = metadata.fetch().nodes().get(0); Map<Integer, List<ProducerBatch>> drainedBatches = - accumulator.drain(metadata, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds()); + accumulator.drain(metadataCache, Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds()); sender.addToInflightBatches(drainedBatches); // Disconnect the target node for the pending produce request. 
This will ensure that sender will try to diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index bb6e4af7dde..f130c6a198e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.CommitFailedException; @@ -69,6 +70,7 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; import org.apache.kafka.common.requests.RequestTestUtils; @@ -2471,16 +2473,16 @@ public class TransactionManagerTest { Node node1 = new Node(0, "localhost", 1111); Node node2 = new Node(1, "localhost", 1112); - PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null); - PartitionInfo part2 = new PartitionInfo(topic, 1, node2, null, null); - - Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2), - Collections.emptySet(), Collections.emptySet()); - Metadata metadataMock = setupMetadata(cluster); + Map<Integer, Node> nodesById = new HashMap<>(); + nodesById.put(node1.id(), node1); + nodesById.put(node2.id(), node2); + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp0, Optional.of(node1.id()), Optional.empty(), null, null, null); + PartitionMetadata part2Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node2.id()), Optional.empty(), null, null, null); + MetadataSnapshot metadataCache = new MetadataSnapshot(null, nodesById, Arrays.asList(part1Metadata, part2Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); Set<Node> nodes = new HashSet<>(); nodes.add(node1); nodes.add(node2); - Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataMock, nodes, Integer.MAX_VALUE, + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataCache, nodes, Integer.MAX_VALUE, time.milliseconds()); // We shouldn't drain batches which haven't been added to the transaction yet. @@ -2506,12 +2508,10 @@ public class TransactionManagerTest { // Try to drain a message destined for tp1, it should get drained. 
Node node1 = new Node(1, "localhost", 1112); - PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null); - Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1), - Collections.emptySet(), Collections.emptySet()); - Metadata metadataMock = setupMetadata(cluster); + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null); + MetadataSnapshot metadataCache = new MetadataSnapshot(null, Collections.singletonMap(node1.id(), node1), Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); appendToAccumulator(tp1); - Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataMock, Collections.singleton(node1), + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataCache, Collections.singleton(node1), Integer.MAX_VALUE, time.milliseconds()); @@ -2529,15 +2529,12 @@ public class TransactionManagerTest { // Don't execute transactionManager.maybeAddPartitionToTransaction(tp0). This should result in an error on drain. appendToAccumulator(tp0); Node node1 = new Node(0, "localhost", 1111); - PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null); - - Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1), - Collections.emptySet(), Collections.emptySet()); - Metadata metadataMock = setupMetadata(cluster); + PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp0, Optional.of(node1.id()), Optional.empty(), null, null, null); + MetadataSnapshot metadataCache = new MetadataSnapshot(null, Collections.singletonMap(node1.id(), node1), Arrays.asList(part1Metadata), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); Set<Node> nodes = new HashSet<>(); nodes.add(node1); - Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataMock, nodes, Integer.MAX_VALUE, + Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(metadataCache, nodes, Integer.MAX_VALUE, time.milliseconds()); // We shouldn't drain batches which haven't been added to the transaction yet. 
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 3fc51f23a2b..a74c869d293 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.clients.MetadataSnapshot; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; @@ -29,10 +30,12 @@ import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordVersion; import org.apache.kafka.common.record.UnalignedRecords; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ByteBufferChannel; +import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -123,6 +126,42 @@ public class TestUtils { return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } + /** + * Test utility function to get MetadataSnapshot with configured nodes and partitions. + * @param nodes number of nodes in the cluster + * @param topicPartitionCounts map of topic -> # of partitions + * @return a MetadataSnapshot with number of nodes, partitions as per the input. + */ + + public static MetadataSnapshot metadataSnapshotWith(final int nodes, final Map<String, Integer> topicPartitionCounts) { + final Node[] ns = new Node[nodes]; + Map<Integer, Node> nodesById = new HashMap<>(); + for (int i = 0; i < nodes; i++) { + ns[i] = new Node(i, "localhost", 1969); + nodesById.put(ns[i].id(), ns[i]); + } + final List<PartitionMetadata> partsMetadatas = new ArrayList<>(); + for (final Map.Entry<String, Integer> topicPartition : topicPartitionCounts.entrySet()) { + final String topic = topicPartition.getKey(); + final int partitions = topicPartition.getValue(); + for (int i = 0; i < partitions; i++) { + TopicPartition tp = new TopicPartition(topic, partitions); + Node node = ns[i % ns.length]; + partsMetadatas.add(new PartitionMetadata(Errors.NONE, tp, Optional.of(node.id()), Optional.empty(), null, null, null)); + } + } + return new MetadataSnapshot("kafka-cluster", nodesById, partsMetadatas, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap()); + } + + /** + * Test utility function to get MetadataSnapshot of cluster with configured, and 0 partitions. + * @param nodes number of nodes in the cluster. + * @return a MetadataSnapshot of cluster with number of nodes in the input. + */ + public static MetadataSnapshot metadataSnapshotWith(int nodes) { + return metadataSnapshotWith(nodes, new HashMap<>()); + } + /** * Generate an array of random bytes *