Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1949077518 @ijuma thanks for flagging https://github.com/apache/kafka/pull/15376. @hachikuji It looks like #15376 was going to add a test for concurrently updating `Metadata` while fetching `MetadataSnapshot`/`Cluster`. That is useful, so I have created a follow-up PR: https://github.com/apache/kafka/pull/15385 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
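The follow-up test exercises concurrent `Metadata` updates against snapshot fetches. The real test lives in PR #15385 and is not reproduced here. Below is a minimal sketch of the kind of invariant such a test can check, assuming a synchronized writer that publishes each new immutable snapshot through a `volatile` field; `SnapshotHolder`, `ConsistentSnapshot` and `ConcurrentFetchCheck` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical immutable snapshot: both fields are always set to the same value.
final class ConsistentSnapshot {
    final int version;
    final int leaderEpoch;

    ConsistentSnapshot(int version, int leaderEpoch) {
        this.version = version;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical stand-in for Metadata: synchronized writer, lock-free reader.
final class SnapshotHolder {
    private volatile ConsistentSnapshot snapshot = new ConsistentSnapshot(0, 0);

    synchronized void update(int n) {
        snapshot = new ConsistentSnapshot(n, n);
    }

    ConsistentSnapshot fetch() {
        return snapshot;
    }
}

final class ConcurrentFetchCheck {
    // Runs one writer against several readers and counts snapshots that are
    // internally inconsistent or older than one the same reader already saw.
    static int countViolations(int updates, int readers) {
        SnapshotHolder holder = new SnapshotHolder();
        AtomicInteger violations = new AtomicInteger();
        AtomicBoolean writerDone = new AtomicBoolean(false);
        Thread[] readerThreads = new Thread[readers];
        for (int i = 0; i < readers; i++) {
            readerThreads[i] = new Thread(() -> {
                int lastSeen = -1;
                boolean finished;
                do {
                    finished = writerDone.get(); // checked before the final fetch
                    ConsistentSnapshot s = holder.fetch();
                    if (s.version != s.leaderEpoch || s.version < lastSeen) {
                        violations.incrementAndGet();
                    }
                    lastSeen = s.version;
                } while (!finished);
            });
            readerThreads[i].start();
        }
        for (int n = 1; n <= updates; n++) {
            holder.update(n);
        }
        writerDone.set(true);
        for (Thread t : readerThreads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return violations.get();
    }
}
```

Under this design `ConcurrentFetchCheck.countViolations(10_000, 4)` should return 0. A test of this shape can pass by luck for racy code, so it is a smoke check rather than a proof of correctness.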
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1948142511 @hachikuji thanks for merging.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji merged PR #15323: URL: https://github.com/apache/kafka/pull/15323
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1945066461 @hachikuji @msn-tldr #15376 looks related.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489860692 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: done.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489832782 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: That's a good callout; I will check the other methods that read from this snapshot.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: Why is this synchronized? We should avoid synchronized for methods that are simply reading a snapshot.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: Why is this synchronized? We should avoid synchronized for methods that are simply reading a snapshot. Also, are there other methods like this one that don't need to be synchronized?
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1943675980 I went through the test failures across all JDK/Scala combos; they are unrelated and were failing before this PR as well: https://ge.apache.org/s/wftzjb3q6slyc/tests/overview?outcome=FAILED https://ge.apache.org/s/lyqs6eqs4mtny/tests/overview?outcome=FAILED https://ge.apache.org/s/x6x27oapk6qsa/tests/overview?outcome=FAILED https://ge.apache.org/s/sleegbh5pfyfo/tests/overview?outcome=FAILED The failures all belong to `kafka.server.LogDirFailureTest`, which is being fixed in https://issues.apache.org/jira/browse/KAFKA-16225 @hachikuji I believe this is good to be merged, what do you think?
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1488286776 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -124,14 +124,14 @@ public Metadata(long refreshBackoffMs, * Get the current cluster info without blocking */ public synchronized Cluster fetch() { -return cache.cluster(); +return metadataSnapshot.cluster(); Review Comment: We could drop synchronization here as well.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1487739946 ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final String topic, final int return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } +public static MetadataCache metadataCacheWith(final int nodes, final Map topicPartitionCounts) { Review Comment: done
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1487728160 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -39,8 +39,9 @@ import java.util.stream.Collectors; /** - * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster + * An internal immutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster * instance which is optimized for read access. + * Prefer to extend MetadataCache's API for internal client usage Vs the public {@link Cluster} */ public class MetadataCache { Review Comment: done
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486800347 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: It makes sense to require exclusive access when building the cache, but here we're just accessing the built value. So I don't think the synchronization is necessary.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486639832 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -113,14 +116,28 @@ Optional nodeById(int id) { return Optional.ofNullable(nodes.get(id)); } -Cluster cluster() { +public Cluster cluster() { if (clusterInstance == null) { throw new IllegalStateException("Cached Cluster instance should not be null, but was."); } else { return clusterInstance; } } +/** + * Get leader-epoch for partition. + * @param tp partition + * @return leader-epoch if known, else return optional.empty() + */ +public Optional leaderEpochFor(TopicPartition tp) { Review Comment: done
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: Correct: since the snapshot is immutable, the race condition is no longer possible, so I removed the code and the comment.
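The reasoning in this exchange is that a drain loop which reads one immutable snapshot up front cannot observe a leader change halfway through, so the per-partition re-check becomes redundant. A minimal sketch of that idea, assuming hypothetical classes (`LeaderView`, `LeaderViewHolder`, `DrainSketch`) in place of the snapshot, `Metadata` and `RecordAccumulator`, with partitions reduced to plain strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical immutable view: partition name -> leader node id.
final class LeaderView {
    private final Map<String, Integer> leaders;

    LeaderView(Map<String, Integer> leaders) {
        this.leaders = Collections.unmodifiableMap(new HashMap<>(leaders));
    }

    Integer leaderFor(String partition) {
        return leaders.get(partition);
    }
}

// Hypothetical holder that replaces the whole view on every update.
final class LeaderViewHolder {
    private volatile LeaderView view = new LeaderView(Collections.emptyMap());

    synchronized void update(Map<String, Integer> leaders) {
        view = new LeaderView(leaders);
    }

    LeaderView fetch() {
        return view;
    }
}

final class DrainSketch {
    // The caller passes one view for the whole loop, so every leader lookup in
    // this drain sees the same assignment even if the holder is updated meanwhile.
    static List<String> drainForNode(LeaderView view, int nodeId, List<String> partitions) {
        List<String> drained = new ArrayList<>();
        for (String p : partitions) {
            Integer leader = view.leaderFor(p);
            if (leader != null && leader == nodeId) {
                drained.add(p);
            }
        }
        return drained;
    }
}
```

If leadership of `t-0` moves from node 1 to node 2 after the caller captured `view`, draining node 1 with that `view` still yields `t-0`, while a drain over a fresh `holder.fetch()` yields nothing for node 1. Either result is self-consistent, which is the property the removed check was guarding.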
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486590491 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon * @param latestLeaderEpoch latest leader's epoch. */ void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) { -if (!currentLeaderEpoch.equals(latestLeaderEpoch)) { +if (latestLeaderEpoch.isPresent() Review Comment: The change is intentional. The optimisation that KIP-951 proposed is that a producer batch should skip the backoff period if the "new leader" is known. This change corrects the previous logic so that the batch's leader epoch is updated only if the latest epoch is newer. I added a test in `ProducerBatchTest.java` to reflect that.
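The rule described here (adopt the latest leader epoch only when it is known and newer) can be sketched as below. `EpochTrackingBatch` is a hypothetical stand-in, not Kafka's `ProducerBatch`, and the diff only shows the start of the new condition, so the exact comparison is an assumption.

```java
import java.util.Optional;

// Hypothetical stand-in for a batch that remembers the leader epoch it last saw.
final class EpochTrackingBatch {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Adopts latestLeaderEpoch only when it is present and strictly newer than
    // the current epoch; an unknown or stale epoch leaves the state unchanged.
    void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        if (latestLeaderEpoch.isPresent()
                && (!currentLeaderEpoch.isPresent()
                    || currentLeaderEpoch.get() < latestLeaderEpoch.get())) {
            currentLeaderEpoch = latestLeaderEpoch;
        }
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }
}
```

Under the previous `!currentLeaderEpoch.equals(latestLeaderEpoch)` check, passing `Optional.empty()` would have replaced a known epoch; in this sketch it is ignored, as is an epoch lower than the current one.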
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: Correct, since it's immutable, this is not possible.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1486572035 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: @hachikuji Interesting. `Metadata.update()` requires mutual exclusion while updating `cache` and the other internal data structures of `Metadata`, so it makes sense to keep the synchronization. What do you think? Moreover, `fetchCache` is called only once in `Sender::sendProducerData`, so it's not a bottleneck in the hot path.
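One way to reconcile the two positions in this exchange (writers need mutual exclusion; readers only access the built value) is to keep the update path synchronized while publishing each new immutable snapshot through a `volatile` field, so the read path takes no lock. This is roughly what making `cache` volatile, as suggested in the review, would amount to. A minimal sketch, with `MetadataSketch` and `MetadataSnapshotSketch` as hypothetical names:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot; never modified after construction.
final class MetadataSnapshotSketch {
    private final Map<String, Integer> partitionCounts;

    MetadataSnapshotSketch(Map<String, Integer> partitionCounts) {
        this.partitionCounts = Collections.unmodifiableMap(new HashMap<>(partitionCounts));
    }

    Map<String, Integer> partitionCounts() {
        return partitionCounts;
    }
}

// Hypothetical stand-in for Metadata.
final class MetadataSketch {
    // volatile: readers see the most recently published reference, and the
    // snapshot's final field guarantees they see it fully constructed.
    private volatile MetadataSnapshotSketch snapshot =
            new MetadataSnapshotSketch(Collections.emptyMap());
    private int updateCount; // other mutable state, guarded by "this"

    // Writers keep mutual exclusion because they touch several fields together.
    synchronized void update(Map<String, Integer> partitionCounts) {
        updateCount++;
        snapshot = new MetadataSnapshotSketch(partitionCounts);
    }

    synchronized int updateCount() {
        return updateCount;
    }

    // Readers only dereference the published snapshot, so no lock is needed.
    MetadataSnapshotSketch fetchSnapshot() {
        return snapshot;
    }
}
```

The design choice is that contention moves entirely to the (rare) writers: a reader such as the sender thread never blocks behind an in-progress `update()`, it just sees the previous snapshot until the new one is published.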
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -127,6 +127,13 @@ public synchronized Cluster fetch() { return cache.cluster(); } +/** + * Get the current metadata cache. + */ +public synchronized MetadataCache fetchCache() { Review Comment: Perhaps we could make `cache` volatile and avoid the synchronization? ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -39,8 +39,9 @@ import java.util.stream.Collectors; /** - * An internal mutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster + * An internal immutable cache of nodes, topics, and partitions in the Kafka cluster. This keeps an up-to-date Cluster * instance which is optimized for read access. + * Prefer to extend MetadataCache's API for internal client usage Vs the public {@link Cluster} */ public class MetadataCache { Review Comment: We don't have to do it here, but "snapshot" might be a better name since it suggests immutability. ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -113,14 +116,28 @@ Optional nodeById(int id) { return Optional.ofNullable(nodes.get(id)); } -Cluster cluster() { +public Cluster cluster() { if (clusterInstance == null) { throw new IllegalStateException("Cached Cluster instance should not be null, but was."); } else { return clusterInstance; } } +/** + * Get leader-epoch for partition. + * @param tp partition + * @return leader-epoch if known, else return optional.empty() + */ +public Optional leaderEpochFor(TopicPartition tp) { Review Comment: Would it make sense to use `OptionalInt`? ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, lon * @param latestLeaderEpoch latest leader's epoch. 
*/ void maybeUpdateLeaderEpoch(Optional latestLeaderEpoch) { -if (!currentLeaderEpoch.equals(latestLeaderEpoch)) { +if (latestLeaderEpoch.isPresent() Review Comment: Hmm, this looks like a change in behavior. Previously we would override a known current leader epoch if `latestLeaderEpoch` is not defined. I am not sure if that was intentional. I recall we had some logic which assumed that the metadata version might be downgraded and no longer provide a leader epoch. Not sure it matters here though, since we are not changing the leader, just updating the epoch. ## clients/src/test/java/org/apache/kafka/test/TestUtils.java: ## @@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final String topic, final int return clusterWith(nodes, Collections.singletonMap(topic, partitions)); } +public static MetadataCache metadataCacheWith(final int nodes, final Map topicPartitionCounts) { Review Comment: It would be useful to have a brief javadoc explaining the replica counts and the assignment strategy. ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -879,17 +884,12 @@ private List drainBatchesForOneNode(Metadata metadata, Node node, // Only proceed if the partition has no in-flight batches. if (isMuted(tp)) continue; -Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); -// Although a small chance, but skip this partition if leader has changed since the partition -> node assignment obtained from outside the loop. Review Comment: I guess this race is no longer possible since we are using the snapshot, which is immutable?
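On the `OptionalInt` question above, a hypothetical `leaderEpochFor` returning `OptionalInt` could look like this. `EpochSnapshot` and its string partition keys are illustrative assumptions, not the PR's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical snapshot mapping partition names to leader epochs.
final class EpochSnapshot {
    private final Map<String, Integer> leaderEpochs;

    EpochSnapshot(Map<String, Integer> leaderEpochs) {
        this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
    }

    // Returns the leader epoch if known, else OptionalInt.empty().
    OptionalInt leaderEpochFor(String partition) {
        Integer epoch = leaderEpochs.get(partition);
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }
}
```

`OptionalInt` avoids boxing in the returned wrapper, but the backing map here still stores `Integer`, and call sites that already pass epochs around as `Optional` values (such as the `maybeUpdateLeaderEpoch` parameter) would need a conversion, so the gain is mostly in signalling intent.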
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1484638827 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: > We should confirm though. I don't see it being used mutably in the code. Historically it was made mutable to support deletions/updates within the cache, but that deletion/update code has since been removed. As far as I can see, it has read-only semantics. So I have treated `MetadataCache` as an immutable cache, made its internal data structures unmodifiable and updated the javadoc. All client tests pass locally; hopefully the Jenkins signal is green too 🤞
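When making internal collections unmodifiable, one subtlety is that `Collections.unmodifiableMap` only wraps its argument: the wrapper rejects writes, but later changes to the wrapped map still show through. Copying first gives a map that is immutable in practice. A small illustration; the class name `UnmodifiableDemo` is hypothetical and this does not claim to show what the PR itself does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class UnmodifiableDemo {
    // A read-only view: callers cannot write through it, but changes made
    // to `source` by its owner are still visible.
    static Map<String, Integer> viewOf(Map<String, Integer> source) {
        return Collections.unmodifiableMap(source);
    }

    // A private copy wrapped as read-only: neither callers nor the original
    // owner of `source` can change what this map contains.
    static Map<String, Integer> immutableCopyOf(Map<String, Integer> source) {
        return Collections.unmodifiableMap(new HashMap<>(source));
    }
}
```

For a snapshot that other threads read without locking, the copy is the safer choice, since a view would still be exposed to whoever holds the original map.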
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483493757 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: Discussed offline. It does not look like `PartitionMetadata` should be treated as mutable. It comes directly from the Metadata response and I can't think of a reason the client would update any of the replica sets directly. We should confirm though.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483459996 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: @hachikuji I had thought about `MetadataCache`. It has one accessor, `partitionMetadata()`, that returns a mutable `PartitionMetadata` without making defensive copies. All the other accessors return immutable objects or make defensive copies. Is it OK for `partitionMetadata()` to make defensive copies? That could increase memory usage.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483459996 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: @hachikuji I had thought about `MetadataCache`. It has one accessor, `partitionMetadata()`, that returns a mutable `PartitionMetadata` without making defensive copies. All the other accessors return immutable objects or make defensive copies. Is it OK for `partitionMetadata()` to make defensive copies? That would increase memory usage.
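An alternative to per-call defensive copies is to make the value type itself immutable, paying for a single copy at construction so one instance can be shared by every reader without extra allocation. `PartitionInfoValue` below is a hypothetical simplification of a partition-metadata record, not Kafka's `PartitionMetadata`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical immutable partition record: final fields, unmodifiable replica list.
final class PartitionInfoValue {
    private final String topic;
    private final int partition;
    private final Optional<Integer> leaderId;
    private final List<Integer> replicaIds;

    PartitionInfoValue(String topic, int partition, Optional<Integer> leaderId, List<Integer> replicaIds) {
        this.topic = topic;
        this.partition = partition;
        this.leaderId = leaderId;
        // Copy once here; afterwards the instance can be handed out freely.
        this.replicaIds = Collections.unmodifiableList(new ArrayList<>(replicaIds));
    }

    String topic() { return topic; }
    int partition() { return partition; }
    Optional<Integer> leaderId() { return leaderId; }

    // Returns the same unmodifiable list on every call; no per-call copy.
    List<Integer> replicaIds() { return replicaIds; }
}
```

With this shape, accessor cost is a field read, and memory grows only with the number of distinct records, not with the number of reads.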
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483386345 ## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java: ## @@ -52,7 +52,7 @@ public class MetadataCache { private final Map metadataByPartition; private final Map topicIds; private final Map topicNames; -private Cluster clusterInstance; +private InternalCluster clusterInstance; Review Comment: The javadoc for `MetadataCache` describes it as mutable, but as far as I can tell, we do not actually modify any of the collections. We always build new instances instead of updating an existing one. That makes me wonder if we can change the javadoc and use `MetadataCache` as the immutable snapshot of metadata state. Then we could drop `InternalCluster` in favor of `MetadataCache`. Would that work?
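Treating the cache as an immutable snapshot means an "update" builds a new instance and leaves the old one untouched, so readers still holding the old reference keep a consistent view. A minimal copy-on-write sketch; `TopicIdSnapshot` and `mergeWith` are hypothetical names, and topic IDs are plain strings here rather than Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot that is "updated" by building a new instance.
final class TopicIdSnapshot {
    private final Map<String, String> topicIds;

    TopicIdSnapshot(Map<String, String> topicIds) {
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
    }

    Map<String, String> topicIds() {
        return topicIds;
    }

    // Returns a new snapshot with the given entries merged in; `this` is unchanged.
    TopicIdSnapshot mergeWith(Map<String, String> newTopicIds) {
        Map<String, String> merged = new HashMap<>(topicIds);
        merged.putAll(newTopicIds);
        return new TopicIdSnapshot(merged);
    }
}
```

The cost of this style is one map copy per update, which is usually acceptable when metadata updates are far rarer than reads.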
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1934424289 The associated Jenkins failure is due to compilation errors with Scala 2.12 introduced here https://github.com/apache/kafka/pull/15327#issuecomment-1933811374
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483170888 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodesThe set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: @hachikuji Thanks for the draft PR. I have introduced `InternalCluster` as a wrapper around the public `Cluster`, and extended `InternalCluster` with `leaderEpochFor`, which is only for the client's internal usage.
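The wrapper approach described here can be sketched as follows: the internal type delegates to the public view and adds lookups, such as `leaderEpochFor`, that the public class never exposes. `PublicClusterView` and `InternalClusterView` are hypothetical stand-ins for `Cluster` and `InternalCluster`, and their fields are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the public, API-stable cluster view.
final class PublicClusterView {
    private final List<String> partitions;

    PublicClusterView(List<String> partitions) {
        this.partitions = Collections.unmodifiableList(new ArrayList<>(partitions));
    }

    List<String> partitions() {
        return partitions;
    }
}

// Hypothetical internal wrapper: exposes the public view plus client-internal lookups.
final class InternalClusterView {
    private final PublicClusterView cluster;
    private final Map<String, Integer> leaderEpochs;

    InternalClusterView(PublicClusterView cluster, Map<String, Integer> leaderEpochs) {
        this.cluster = cluster;
        this.leaderEpochs = Collections.unmodifiableMap(new HashMap<>(leaderEpochs));
    }

    PublicClusterView cluster() {
        return cluster;
    }

    // Internal-only lookup; it never has to be added to the public class.
    Optional<Integer> leaderEpochFor(String partition) {
        return Optional.ofNullable(leaderEpochs.get(partition));
    }
}
```

The benefit is that internal-only methods can evolve without widening a public API. Elsewhere in this thread the reviewers move toward using `MetadataCache` itself as the immutable snapshot instead of a separate wrapper.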
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1483161801 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -1564,97 +1597,18 @@ public void testDrainWithANodeThatDoesntHostAnyPartitions() { CompressionType.NONE, lingerMs); // Create cluster metadata, node2 doesn't host any partitions. -part1 = new PartitionInfo(topic, partition1, node1, null, null, null); -cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), +PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp1, Optional.of(node1.id()), Optional.empty(), null, null, null); +PartitionInfo part1 = MetadataResponse.toPartitionInfo(part1Metadata, nodesById); +Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1), Collections.emptySet(), Collections.emptySet()); -metadataMock = Mockito.mock(Metadata.class); -Mockito.when(metadataMock.fetch()).thenReturn(cluster); -Mockito.when(metadataMock.currentLeader(tp1)).thenReturn( -new Metadata.LeaderAndEpoch(Optional.of(node1), -Optional.of(999 /* dummy value */))); +InternalCluster internalCluster = new InternalCluster(cluster, Collections.singletonMap(tp1, part1Metadata)); // Drain for node2, it should return 0 batches, -Map> batches = accum.drain(metadataMock, +Map> batches = accum.drain(internalCluster, new HashSet<>(Arrays.asList(node2)), 99 /* maxSize */, time.milliseconds()); assertTrue(batches.get(node2.id()).isEmpty()); } -@Test -public void testDrainOnANodeWhenItCeasesToBeALeader() throws InterruptedException { Review Comment: This test is no longer needed, since `drainBatchesForOneNode` now uses `InternalCluster` instead of `Metadata`. Because `Metadata` is mutable, a node could be a partition's leader and then leadership could move to another node. That is not possible with `InternalCluster`, which is immutable.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481936087 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodesThe set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: I was looking into your idea a little bit. There might be a simple enough variation that wouldn't require significant changes. What do you think about this? 
https://github.com/apache/kafka/compare/trunk...hachikuji:kafka:internal-cluster-view?expand=1
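The diff quoted above changes `partitionReady` to take a `Cluster` instead of `Metadata`, in line with the PR's goal of reducing synchronization between producer threads. The following is a minimal sketch of the general pattern, not Kafka's code: it assumes a mutable holder whose per-lookup reads would otherwise contend on a lock, and an immutable snapshot that a reader fetches once and then queries without locking. `MetadataHolder`, `LeaderSnapshot`, and `ReadyCheck` are hypothetical names standing in for `Metadata` and `Cluster`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable view of partition leadership (stand-in for Cluster).
final class LeaderSnapshot {
    private final Map<String, Integer> leaderByPartition;

    LeaderSnapshot(Map<String, Integer> leaderByPartition) {
        // Defensive copy: the snapshot never changes after it is published.
        this.leaderByPartition = Collections.unmodifiableMap(new HashMap<>(leaderByPartition));
    }

    Integer leaderFor(String partition) {
        return leaderByPartition.get(partition);
    }
}

// Hypothetical mutable holder (stand-in for Metadata).
final class MetadataHolder {
    // volatile publication: readers always see a fully constructed snapshot.
    private volatile LeaderSnapshot snapshot = new LeaderSnapshot(Map.of());

    // Writers are serialized and swap in a brand-new snapshot.
    synchronized void update(Map<String, Integer> leaders) {
        snapshot = new LeaderSnapshot(leaders);
    }

    // Readers take one lock-free reference and use it for a whole pass.
    LeaderSnapshot fetch() {
        return snapshot;
    }
}

final class ReadyCheck {
    // Counts partitions with a known leader against a single snapshot,
    // instead of hitting a synchronized method once per partition.
    static int partitionsWithLeader(LeaderSnapshot snapshot, Iterable<String> partitions) {
        int count = 0;
        for (String p : partitions) {
            if (snapshot.leaderFor(p) != null) {
                count++;
            }
        }
        return count;
    }
}
```

A reader that holds an older snapshot keeps a consistent view even if the holder is updated mid-pass; the trade-off is that it may act on slightly stale leadership until its next `fetch()`.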
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481854476 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: Makes sense. We'd probably have to do it the other way around though, I guess? The client's dependence on `Cluster` cannot be easily changed, but we can move the internal implementation anywhere we want.
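One possible reading of "the other way around", sketched under assumptions: the internal class owns the data and any internal-only accessors, while a public-facing class keeps its existing methods by forwarding to it, so public callers are unaffected. `InternalClusterView`, `PublicCluster`, and `leaderEpochFor` are hypothetical names; this is not the code in the linked branch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical internal class: owns the data plus internal-only accessors.
final class InternalClusterView {
    private final Map<String, List<Integer>> partitionsByTopic;
    private final Map<String, Integer> leaderEpochByPartition;

    InternalClusterView(Map<String, List<Integer>> partitionsByTopic,
                        Map<String, Integer> leaderEpochByPartition) {
        this.partitionsByTopic = Map.copyOf(partitionsByTopic);
        this.leaderEpochByPartition = Map.copyOf(leaderEpochByPartition);
    }

    List<Integer> partitionsForTopic(String topic) {
        return partitionsByTopic.getOrDefault(topic, Collections.emptyList());
    }

    // Internal-only accessor: can change freely without touching the public API.
    Integer leaderEpochFor(String partition) {
        return leaderEpochByPartition.get(partition);
    }
}

// Hypothetical stand-in for the public class: keeps its existing API
// and forwards every call to the internal view it wraps.
final class PublicCluster {
    private final InternalClusterView view;

    PublicCluster(InternalClusterView view) {
        this.view = view;
    }

    public List<Integer> partitionsForTopic(String topic) {
        return view.partitionsForTopic(topic);
    }

    // Package-private: internal client code can reach the richer view.
    InternalClusterView internalView() {
        return view;
    }
}
```

Under this arrangement the public type stays stable for user-facing code, while new internal needs land only in the internal class.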
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1932366816 @hachikuji There are unrelated test failures in the Jenkins run. Looking further at the history of the failed tests, they were already failing before this PR. https://ge.apache.org/s/fr7yermmdioac/tests/overview?outcome=FAILED
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: @hachikuji Thanks for pointing it out. As it turns out, I don't need to extend the public API of `Cluster` to get the epoch, so internal usage doesn't change `Cluster`'s API anymore. > We have been trying to reduce the reliance on Cluster internally because it is public. 
This could be achieved by creating a forwarding "internal" class `ClusterView` that uses `Cluster` by composition, offering the same API. Then `client` code can be refactored to use `ClusterView`. That way, future extensions of `Cluster`'s public API for internal use cases could be avoided by making them in `ClusterView` instead. But this is going to be a sizeable refactor; how about keeping it separate from this PR? The intention of this PR is to fix the perf bug and cherry-pick it to other branches.
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: @hachikuji Thanks for pointing it out. As it turns out, I don't need to extend the public interface of `Cluster` to get the epoch, so internal usage doesn't change `Cluster`'s interface anymore. > We have been trying to reduce the reliance on Cluster internally because it is public. 
This could be achieved by creating a forwarding class `ClusterInternal` that uses `Cluster` by composition, offering the same interface. Then `client` code can be refactored to use `ClusterInternal`. That way, future extensions of `Cluster`'s public interface for internal use cases could be avoided by making them in `ClusterInternal` instead. But this is going to be a sizeable refactor; how about keeping it separate from this PR? The intention of this PR is to fix the perf bug and cherry-pick it to other branches.
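A minimal sketch of the forwarding class described above: `ClusterInternal` wraps the public class by composition, exposes the same methods by delegation, and is where internal-only extensions would live. To keep the sketch self-contained, `ClusterStandIn` is a hypothetical, heavily simplified stand-in for `org.apache.kafka.common.Cluster`, and `leaderEpochFor` is a hypothetical internal extension used only for illustration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the public Cluster class.
final class ClusterStandIn {
    private final Map<String, List<Integer>> partitionsByTopic;

    ClusterStandIn(Map<String, List<Integer>> partitionsByTopic) {
        this.partitionsByTopic = Map.copyOf(partitionsByTopic);
    }

    public List<Integer> partitionsForTopic(String topic) {
        return partitionsByTopic.getOrDefault(topic, List.of());
    }
}

// Forwarding wrapper in the spirit of the proposed ClusterInternal.
final class ClusterInternal {
    private final ClusterStandIn cluster;
    private final Map<String, Integer> leaderEpochs; // internal-only data

    ClusterInternal(ClusterStandIn cluster, Map<String, Integer> leaderEpochs) {
        this.cluster = cluster;
        this.leaderEpochs = Map.copyOf(leaderEpochs);
    }

    // Forwarded: same behaviour as the wrapped public object.
    List<Integer> partitionsForTopic(String topic) {
        return cluster.partitionsForTopic(topic);
    }

    // Internal extension: added here instead of widening the public API.
    Integer leaderEpochFor(String topicPartition) {
        return leaderEpochs.get(topicPartition);
    }

    // User-facing code paths can still be handed the original public object.
    ClusterStandIn asPublic() {
        return cluster;
    }
}
```

Composition keeps the public class untouched; the cost is that every forwarded method has to be written (and kept in sync) by hand, which is part of why the refactor was expected to be sizeable.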
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: @hachikuji Thanks for pointing it out. As it turns out, I don't need to extend the public interface of `Cluster` to get the epoch, so internal usage doesn't change `Cluster`'s interface anymore. > We have been trying to reduce the reliance on Cluster internally because it is public. 
This could be achieved by creating a forwarding class `ClusterInternal` that uses `Cluster` by composition, offering the same interface. Then `client` code can be refactored to use `ClusterInternal`. But this is going to be a sizeable PR; how about keeping it separate from this one, since the intention of this PR is to fix the perf bug?
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
hachikuji commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1480620848 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java: ## @@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader, } /** - * Iterate over partitions to see which one have batches ready and collect leaders of those partitions - * into the set of ready nodes. If partition has no leader, add the topic to the set of topics with - * no leader. This function also calculates stats for adaptive partitioning. + * Iterate over partitions to see which one have batches ready and collect leaders of those + * partitions into the set of ready nodes. If partition has no leader, add the topic to the set + * of topics with no leader. This function also calculates stats for adaptive partitioning. * - * @param metadata The cluster metadata - * @param nowMs The current time - * @param topic The topic - * @param topicInfo The topic info + * @param cluster The cluster metadata + * @param nowMs The current time + * @param topic The topic + * @param topicInfo The topic info * @param nextReadyCheckDelayMs The delay for next check - * @param readyNodes The set of ready nodes (to be filled in) - * @param unknownLeaderTopics The set of topics with no leader (to be filled in) + * @param readyNodes The set of ready nodes (to be filled in) + * @param unknownLeaderTopics The set of topics with no leader (to be filled in) * @return The delay for next check */ -private long partitionReady(Metadata metadata, long nowMs, String topic, +private long partitionReady(Cluster cluster, long nowMs, String topic, Review Comment: In some ways, this is a step backwards. We have been trying to reduce the reliance on `Cluster` internally because it is public. With a lot of internal usage, we end up making changes to the API which are only needed for the internal implementation (as we are doing in this PR). 
Have you considered alternatives? Perhaps we could expose something like `Cluster`, but with a reduced scope?
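A "reduced scope" view could be as small as an interface exposing only the lookups a ready check needs. The sketch below assumes, for illustration only, that the check needs just a topic's partitions and each partition's leader; `PartitionLeaderView`, `MapLeaderView`, and `ReadyNodes` are hypothetical names, and unlike the real `partitionReady` the loop treats every partition as having a ready batch.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical reduced-scope view: only what the ready check reads.
interface PartitionLeaderView {
    List<Integer> partitionsForTopic(String topic);

    Optional<String> leaderFor(String topic, int partition);
}

// Hypothetical immutable implementation backed by plain maps.
final class MapLeaderView implements PartitionLeaderView {
    private final Map<String, List<Integer>> partitions;
    private final Map<String, String> leaders; // key "topic-partition", value node id

    MapLeaderView(Map<String, List<Integer>> partitions, Map<String, String> leaders) {
        this.partitions = Map.copyOf(partitions);
        this.leaders = Map.copyOf(leaders);
    }

    @Override
    public List<Integer> partitionsForTopic(String topic) {
        return partitions.getOrDefault(topic, List.of());
    }

    @Override
    public Optional<String> leaderFor(String topic, int partition) {
        return Optional.ofNullable(leaders.get(topic + "-" + partition));
    }
}

final class ReadyNodes {
    // Mirrors the javadoc's shape: add leaders to readyNodes, and record the
    // topic in unknownLeaderTopics when one of its partitions has no leader.
    static void collect(PartitionLeaderView view, String topic,
                        Set<String> readyNodes, Set<String> unknownLeaderTopics) {
        for (int p : view.partitionsForTopic(topic)) {
            Optional<String> leader = view.leaderFor(topic, p);
            if (leader.isPresent()) {
                readyNodes.add(leader.get());
            } else {
                unknownLeaderTopics.add(topic);
            }
        }
    }
}
```

Because callers depend only on the interface, an internal addition such as an epoch lookup would touch the interface and its implementation without widening any public class.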