Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-16 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1949077518

   @ijuma thanks for flagging https://github.com/apache/kafka/pull/15376. 
   
   @hachikuji It looks like that PR was going to add a test for concurrent 
updates of `Metadata` while fetching `MetadataSnapshot`/`Cluster`. That is 
useful, so I have created a follow-up PR: 
https://github.com/apache/kafka/pull/15385


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-16 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1948142511

   @hachikuji thanks for merging.





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-15 Thread via GitHub


hachikuji merged PR #15323:
URL: https://github.com/apache/kafka/pull/15323





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


ijuma commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1945066461

   @hachikuji @msn-tldr #15376 looks related.





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489860692


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map<String, Uuid> topicIds() {

Review Comment:
   done.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489832782


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map<String, Uuid> topicIds() {

Review Comment:
   That's a good callout. I will check the other methods that read from this 
snapshot.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


ijuma commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -279,7 +286,7 @@ synchronized Optional 
partitionMetadataIfCur
  * @return a mapping from topic names to topic IDs for all topics with 
valid IDs in the cache
  */
 public synchronized Map<String, Uuid> topicIds() {

Review Comment:
   Why is this synchronized? We should avoid synchronized for methods that are 
simply reading a snapshot. Also, are there other methods like this one that 
don't need to be synchronized?
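
   A minimal sketch of the point above, not the actual `Metadata` class: when 
the snapshot is an immutable object published through a `volatile` field, a 
method that only reads it needs no lock. `SnapshotHolder`, `TopicSnapshot` and 
the use of `String` topic ids are hypothetical names chosen for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot; stands in for the real metadata snapshot.
final class TopicSnapshot {
    private final Map<String, String> topicIds;

    TopicSnapshot(Map<String, String> topicIds) {
        // Copy, then wrap, so later changes to the caller's map cannot leak in.
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
    }

    Map<String, String> topicIds() {
        return topicIds;
    }
}

// Hypothetical holder; plays the role of a class like Metadata.
final class SnapshotHolder {
    // volatile makes the most recently published snapshot visible to readers.
    private volatile TopicSnapshot snapshot = new TopicSnapshot(Collections.emptyMap());

    // Writers still serialize among themselves.
    synchronized void update(Map<String, String> newTopicIds) {
        snapshot = new TopicSnapshot(newTopicIds);
    }

    // Plain read: no lock, because a snapshot never changes after construction.
    Map<String, String> topicIds() {
        return snapshot.topicIds();
    }
}
```

   Readers of `topicIds()` never block behind `update()`; they simply see 
either the previous snapshot or the new one.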






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-14 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1943675980

   I went through the test failures across all JDK/Scala combinations. They 
are unrelated to this PR and were already failing before it:
   https://ge.apache.org/s/wftzjb3q6slyc/tests/overview?outcome=FAILED
   https://ge.apache.org/s/lyqs6eqs4mtny/tests/overview?outcome=FAILED
   https://ge.apache.org/s/x6x27oapk6qsa/tests/overview?outcome=FAILED
   https://ge.apache.org/s/sleegbh5pfyfo/tests/overview?outcome=FAILED 
   
   The failures belong to `kafka.server.LogDirFailureTest` and are being 
fixed in https://issues.apache.org/jira/browse/KAFKA-16225
   
   @hachikuji I believe this is good to be merged, what do you think?





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-13 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1488286776


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -124,14 +124,14 @@ public Metadata(long refreshBackoffMs,
  * Get the current cluster info without blocking
  */
 public synchronized Cluster fetch() {
-return cache.cluster();
+return metadataSnapshot.cluster();

Review Comment:
   We could drop synchronization here as well.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-13 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1487739946


##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final 
String topic, final int
 return clusterWith(nodes, Collections.singletonMap(topic, partitions));
 }
 
+public static MetadataCache metadataCacheWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {

Review Comment:
   done






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-13 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1487728160


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -39,8 +39,9 @@
 import java.util.stream.Collectors;
 
 /**
- * An internal mutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
+ * An internal immutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
+ * Prefer to extend MetadataCache's API for internal client usage Vs the 
public {@link Cluster}
  */
 public class MetadataCache {

Review Comment:
   done






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486800347


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   It makes sense to require exclusive access when building the cache, but here 
we're just accessing the built value. So I don't think the synchronization is 
necessary. 
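
   The distinction between building and accessing can be sketched as follows. 
This is an illustration under assumptions, not the code in the PR: 
`CacheHolder` and `CacheSnapshot` are hypothetical, and the sketch assumes the 
field is `volatile` so the unsynchronized read still sees the latest value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable value with two fields that must stay consistent.
final class CacheSnapshot {
    final int version;
    final Map<String, Integer> partitionCounts;

    CacheSnapshot(int version, Map<String, Integer> partitionCounts) {
        this.version = version;
        this.partitionCounts = Collections.unmodifiableMap(new HashMap<>(partitionCounts));
    }
}

final class CacheHolder {
    private volatile CacheSnapshot cache = new CacheSnapshot(0, Collections.emptyMap());

    // Building needs exclusive access: it reads the old value and derives the new one.
    synchronized void update(String topic, int partitions) {
        Map<String, Integer> merged = new HashMap<>(cache.partitionCounts);
        merged.put(topic, partitions);
        cache = new CacheSnapshot(cache.version + 1, merged);
    }

    // Accessing only returns the already-built reference, so no lock is taken.
    CacheSnapshot fetchCache() {
        return cache;
    }
}
```

   A caller that keeps the returned reference sees a `version` and a map that 
always belong together, even if `update` runs concurrently.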






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486639832


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -113,14 +116,28 @@ Optional<Node> nodeById(int id) {
 return Optional.ofNullable(nodes.get(id));
 }
 
-Cluster cluster() {
+public Cluster cluster() {
 if (clusterInstance == null) {
 throw new IllegalStateException("Cached Cluster instance should 
not be null, but was.");
 } else {
 return clusterInstance;
 }
 }
 
+/**
+ * Get leader-epoch for partition.
+ * @param tp partition
+ * @return leader-epoch if known, else return optional.empty()
+ */
+public Optional<Integer> leaderEpochFor(TopicPartition tp) {

Review Comment:
   done






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486583557


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -879,17 +884,12 @@ private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node,
 // Only proceed if the partition has no in-flight batches.
 if (isMuted(tp))
 continue;
-Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-// Although a small chance, but skip this partition if leader has 
changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   Correct. Since the snapshot is immutable, the race condition is no longer 
possible, so I removed the code and the comment.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486590491


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
  * @param latestLeaderEpoch latest leader's epoch.
  */
 void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
-if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+if (latestLeaderEpoch.isPresent()

Review Comment:
   The change is intentional. The optimisation that KIP-951 proposed is that a 
producer batch should skip the backoff period if a "new leader" is known. This 
change corrects the previous logic so that the batch's leader epoch is updated 
only if the latest epoch is newer. I added a test in `ProducerBatchTest.java` 
to reflect that.
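
   The "only if newer" rule can be sketched as below. The quoted diff shows 
only the first line of the new condition, so the rest of the condition here is 
an assumption, and `BatchEpochSketch` is a hypothetical stand-in for the 
epoch-tracking part of `ProducerBatch`.

```java
import java.util.Optional;

// Hypothetical stand-in for the epoch-tracking part of a producer batch.
final class BatchEpochSketch {
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Returns true only when the stored epoch was advanced.
    boolean maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
        // Assumed rule: an absent epoch never overrides a known one,
        // and a present epoch wins only if it is strictly greater.
        boolean newer = latestLeaderEpoch.isPresent()
            && (!currentLeaderEpoch.isPresent()
                || currentLeaderEpoch.get() < latestLeaderEpoch.get());
        if (newer) {
            currentLeaderEpoch = latestLeaderEpoch;
        }
        return newer;
    }

    Optional<Integer> currentLeaderEpoch() {
        return currentLeaderEpoch;
    }
}
```

   Under the old `!currentLeaderEpoch.equals(latestLeaderEpoch)` check, an 
empty or older `latestLeaderEpoch` would also have counted as a change.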






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-12 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1486572035


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   @hachikuji Interesting. 
   `Metadata.update()` requires mutual exclusion while updating `cache` and the 
other internal data structures of `Metadata`, so it makes sense to keep the 
synchronization. What do you think?
   Moreover, `fetchCache` is called once in `Sender::sendProducerData`, so it's 
not a bottleneck in the hot path.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-09 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484643624


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -127,6 +127,13 @@ public synchronized Cluster fetch() {
 return cache.cluster();
 }
 
+/**
+ * Get the current metadata cache.
+ */
+public synchronized MetadataCache fetchCache() {

Review Comment:
   Perhaps we could make `cache` volatile and avoid the synchronization?



##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -39,8 +39,9 @@
 import java.util.stream.Collectors;
 
 /**
- * An internal mutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
+ * An internal immutable cache of nodes, topics, and partitions in the Kafka 
cluster. This keeps an up-to-date Cluster
  * instance which is optimized for read access.
+ * Prefer to extend MetadataCache's API for internal client usage Vs the 
public {@link Cluster}
  */
 public class MetadataCache {

Review Comment:
   We don't have to do it here, but "snapshot" might be a better name since it 
suggests immutability.



##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -113,14 +116,28 @@ Optional<Node> nodeById(int id) {
 return Optional.ofNullable(nodes.get(id));
 }
 
-Cluster cluster() {
+public Cluster cluster() {
 if (clusterInstance == null) {
 throw new IllegalStateException("Cached Cluster instance should 
not be null, but was.");
 } else {
 return clusterInstance;
 }
 }
 
+/**
+ * Get leader-epoch for partition.
+ * @param tp partition
+ * @return leader-epoch if known, else return optional.empty()
+ */
+public Optional<Integer> leaderEpochFor(TopicPartition tp) {

Review Comment:
   Would it make sense to use `OptionalInt`?
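
   For reference, a small sketch of what an `OptionalInt`-returning lookup 
looks like. `EpochLookup` is hypothetical and keys partitions by `String` for 
brevity; only `java.util.OptionalInt` itself is a real API here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical lookup table from partition name to leader epoch.
final class EpochLookup {
    private final Map<String, Integer> epochs = new HashMap<>();

    void put(String partition, int epoch) {
        epochs.put(partition, epoch);
    }

    // OptionalInt hands the caller a primitive int and has no boxed payload.
    OptionalInt leaderEpochFor(String partition) {
        Integer epoch = epochs.get(partition);
        return epoch == null ? OptionalInt.empty() : OptionalInt.of(epoch);
    }
}
```

   Callers then use `getAsInt()` rather than `get()`, and comparisons between 
epochs work on `int` values directly.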



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -110,7 +110,8 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
  * @param latestLeaderEpoch latest leader's epoch.
  */
 void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
-if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+if (latestLeaderEpoch.isPresent()

Review Comment:
   Hmm, this looks like a change in behavior. Previously we would override a 
known current leader epoch if `latestLeaderEpoch` is not defined. I am not sure 
if that was intentional. I recall we had some logic which assumed that the 
metadata version might be downgraded and no longer provide a leader epoch. Not 
sure it matters here though, since we are not changing the leader, just 
updating the epoch.



##
clients/src/test/java/org/apache/kafka/test/TestUtils.java:
##
@@ -123,6 +126,34 @@ public static Cluster clusterWith(final int nodes, final 
String topic, final int
 return clusterWith(nodes, Collections.singletonMap(topic, partitions));
 }
 
+public static MetadataCache metadataCacheWith(final int nodes, final Map<String, Integer> topicPartitionCounts) {

Review Comment:
   Useful to have a brief javadoc to explain replica counts and assignment 
strategy.



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -879,17 +884,12 @@ private List 
drainBatchesForOneNode(Metadata metadata, Node node,
 // Only proceed if the partition has no in-flight batches.
 if (isMuted(tp))
 continue;
-Metadata.LeaderAndEpoch leaderAndEpoch = 
metadata.currentLeader(tp);
-// Although a small chance, but skip this partition if leader has 
changed since the partition -> node assignment obtained from outside the loop.

Review Comment:
   I guess this race is no longer possible since we are using the snapshot, 
which is immutable?






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-09 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1484638827


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -52,7 +52,7 @@ public class MetadataCache {
 private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
 private final Map<String, Uuid> topicIds;
 private final Map<Uuid, String> topicNames;
-private Cluster clusterInstance;
+private InternalCluster clusterInstance;

Review Comment:
   > We should confirm though.
   
   I don't see it being mutated anywhere in the code. Historically it was made 
mutable to support deletions/updates within the cache, but that deletion/update 
code has since been removed. As far as I can see, it now has read-only 
semantics. So I have treated `MetadataCache` as an immutable cache, made its 
internal data structures unmodifiable, and updated the javadoc.
   
   All clients tests pass locally; hopefully the Jenkins signal is green too 🤞 






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483493757


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -52,7 +52,7 @@ public class MetadataCache {
 private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
 private final Map<String, Uuid> topicIds;
 private final Map<Uuid, String> topicNames;
-private Cluster clusterInstance;
+private InternalCluster clusterInstance;

Review Comment:
   Discussed offline. It does not look like `PartitionMetadata` should be 
treated as mutable. It comes directly from the Metadata response and I can't 
think of a reason the client would update any of the replica sets directly. We 
should confirm though.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483459996


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -52,7 +52,7 @@ public class MetadataCache {
 private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
 private final Map<String, Uuid> topicIds;
 private final Map<Uuid, String> topicNames;
-private Cluster clusterInstance;
+private InternalCluster clusterInstance;

Review Comment:
   @hachikuji I had thought about `MetadataCache`. It has one accessor, 
`partitionMetadata()`, that returns a mutable `PartitionMetadata` without 
making a defensive copy. All the other accessors return immutable objects or 
make defensive copies. 
   Is it OK for `partitionMetadata()` to make defensive copies? That could 
increase memory usage.
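
   The trade-off behind this question can be sketched with plain collections. 
`ReplicaHolder` is a hypothetical class and says nothing about how 
`PartitionMetadata` is actually implemented; it only contrasts a per-call copy 
with a read-only view over shared storage.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder of a replica id list.
final class ReplicaHolder {
    private final List<Integer> replicaIds;

    ReplicaHolder(List<Integer> replicaIds) {
        // One copy at construction time isolates the holder from the caller's list.
        this.replicaIds = new ArrayList<>(replicaIds);
    }

    // Defensive copy: safe, but allocates a full new list on every call.
    List<Integer> replicaIdsCopy() {
        return new ArrayList<>(replicaIds);
    }

    // Read-only view: a thin wrapper over the same backing list, no element copying.
    List<Integer> replicaIdsView() {
        return Collections.unmodifiableList(replicaIds);
    }
}
```

   If nothing ever mutates the underlying data, the view gives the same safety 
to callers as the copy without the extra allocation per access.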







Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483386345


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -52,7 +52,7 @@ public class MetadataCache {
 private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
 private final Map<String, Uuid> topicIds;
 private final Map<Uuid, String> topicNames;
-private Cluster clusterInstance;
+private InternalCluster clusterInstance;

Review Comment:
   The javadoc for `MetadataCache` describes it as mutable, but as far as I can 
tell, we do not actually modify any of the collections. We always build new 
instances instead of updating an existing one. That makes me wonder if we can 
change the javadoc and use `MetadataCache` as the immutable snapshot of 
metadata state. Then we could drop `InternalCluster` in favor of 
`MetadataCache`. Would that work?






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1934424289

   The associated Jenkins failure is due to compilation errors with Scala 2.12 
introduced here 
   https://github.com/apache/kafka/pull/15327#issuecomment-1933811374





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483170888


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition 
part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect 
leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic 
to the set of topics with
- * no leader.  This function also calculates stats for adaptive 
partitioning.
+ * Iterate over partitions to see which one have batches ready and collect 
leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, 
add the topic to the set
+ * of topics with no leader.  This function also calculates stats for 
adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster   The cluster metadata
+ * @param nowMs The current time
+ * @param topic The topic
+ * @param topicInfo The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be 
filled in)
+ * @param readyNodesThe set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be 
filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   @hachikuji Thanks for the draft PR. I have introduced `InternalCluster` as a 
wrapper around the public `Cluster`, and added a `leaderEpochFor` method to it 
that is only for the client's internal usage.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-08 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1483161801


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1564,97 +1597,18 @@ public void 
testDrainWithANodeThatDoesntHostAnyPartitions() {
 CompressionType.NONE, lingerMs);
 
 // Create cluster metadata, node2 doesn't host any partitions.
-part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
-cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
+PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, 
tp1, Optional.of(node1.id()), Optional.empty(), null, null, null);
+PartitionInfo part1 = MetadataResponse.toPartitionInfo(part1Metadata, 
nodesById);
+Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), 
Arrays.asList(part1),
 Collections.emptySet(), Collections.emptySet());
-metadataMock = Mockito.mock(Metadata.class);
-Mockito.when(metadataMock.fetch()).thenReturn(cluster);
-Mockito.when(metadataMock.currentLeader(tp1)).thenReturn(
-new Metadata.LeaderAndEpoch(Optional.of(node1),
-Optional.of(999 /* dummy value */)));
+InternalCluster internalCluster = new InternalCluster(cluster, 
Collections.singletonMap(tp1, part1Metadata));
 
 // Drain for node2, it should return 0 batches,
-Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+Map<Integer, List<ProducerBatch>> batches = accum.drain(internalCluster,
 new HashSet<>(Arrays.asList(node2)), 99 /* maxSize */, 
time.milliseconds());
 assertTrue(batches.get(node2.id()).isEmpty());
 }
 
-@Test
-public void testDrainOnANodeWhenItCeasesToBeALeader() throws 
InterruptedException {

Review Comment:
   This test is no longer needed because `drainBatchesForOneNode` now uses 
`InternalCluster` rather than `Metadata`. Because `Metadata` is mutable, a node 
could be a partition leader when a drain starts and then lose leadership to 
another node partway through. That cannot happen with `InternalCluster`, which 
is immutable.
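
The difference between reading a mutable holder and reading an immutable snapshot can be illustrated with a minimal sketch. The classes below (`ImmutableView`, `MutableMetadata`) are hypothetical stand-ins written for this illustration, not Kafka's `InternalCluster` or `Metadata`; partitions are plain strings and leaders are plain node ids.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class SnapshotDemo {
    /** Hypothetical immutable view: leader assignments are fixed at construction. */
    static final class ImmutableView {
        private final Map<String, Integer> leaderByPartition;

        ImmutableView(Map<String, Integer> leaderByPartition) {
            // Defensive copy so later changes to the caller's map cannot leak in.
            this.leaderByPartition =
                Collections.unmodifiableMap(new HashMap<>(leaderByPartition));
        }

        Optional<Integer> leaderFor(String partition) {
            return Optional.ofNullable(leaderByPartition.get(partition));
        }
    }

    /** Hypothetical mutable holder: each update publishes a brand-new view. */
    static final class MutableMetadata {
        private final AtomicReference<ImmutableView> current;

        MutableMetadata(ImmutableView initial) {
            this.current = new AtomicReference<>(initial);
        }

        ImmutableView snapshot() {
            return current.get();
        }

        void update(Map<String, Integer> newLeaders) {
            current.set(new ImmutableView(newLeaders));
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("topic-0", 1);
        MutableMetadata metadata = new MutableMetadata(new ImmutableView(leaders));

        // Take the snapshot once, as a drain pass would.
        ImmutableView view = metadata.snapshot();

        // Leadership moves to node 2 while the "drain" is still running.
        leaders.put("topic-0", 2);
        metadata.update(leaders);

        // The snapshot still answers with the leader it was built with.
        System.out.println("snapshot leader: " + view.leaderFor("topic-0").get());
        // A fresh snapshot sees the new leader.
        System.out.println("current leader:  " + metadata.snapshot().leaderFor("topic-0").get());
    }
}
```

Code that holds `view` for the whole pass never observes a leadership change midway, which is why a test for that scenario has nothing left to exercise once the drain works from a snapshot.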






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481936087


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   I was looking into your idea a little bit. There might be a simple enough 
variation that wouldn't require significant changes. What do you think about 
this? 
https://github.com/apache/kafka/compare/trunk...hachikuji:kafka:internal-cluster-view?expand=1






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481854476


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   Makes sense. We'd probably have to do it the other way around though I 
guess? The client's dependence on `Cluster` cannot be easily changed, but we 
can move the internal implementation anywhere we want.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on PR #15323:
URL: https://github.com/apache/kafka/pull/15323#issuecomment-1932366816

   @hachikuji 
   There are unrelated test failures on the Jenkins run. Looking further at the 
history of the failed tests, they were already failing before this change.
   
   https://ge.apache.org/s/fr7yermmdioac/tests/overview?outcome=FAILED





Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   @hachikuji 
   
   Thanks for pointing it out. As it turns out, I don't need to extend the 
public API of `Cluster` in order to get the epoch, so internal usage no longer 
changes `Cluster`'s API.
   
   > We have been trying to reduce the reliance on Cluster internally because 
it is public.
   
   This could be achieved by creating a forwarding "internal" class, 
`ClusterView`, that wraps `Cluster` by composition and offers the same API. 
`client` code can then be refactored to use `ClusterView`. That way, future 
extensions needed only for internal use cases can be made in `ClusterView` 
instead of in `Cluster`'s public API.
   
   But this is going to be a sizeable refactor, so how about keeping it 
separate from this PR? The intention of this PR is to fix the perf bug and 
cherry-pick it to other branches.
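
A forwarding class of the kind described above could look roughly like the following sketch. Everything here is an assumption made for illustration: `PublicCluster` is a tiny stand-in for the public `Cluster` class, partitions are plain strings, and the shape of `leaderEpochFor` is guessed rather than taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ClusterViewSketch {
    /** Hypothetical stand-in for the public class; only what the sketch needs. */
    static final class PublicCluster {
        private final Map<String, Integer> leaders;

        PublicCluster(Map<String, Integer> leaders) {
            this.leaders = new HashMap<>(leaders);
        }

        /** Returns the leader node id, or null if the partition has no known leader. */
        Integer leaderFor(String partition) {
            return leaders.get(partition);
        }
    }

    /** Internal forwarding class: holds a PublicCluster instead of extending it. */
    static final class ClusterView {
        private final PublicCluster cluster;
        private final Map<String, Integer> leaderEpochs;

        ClusterView(PublicCluster cluster, Map<String, Integer> leaderEpochs) {
            this.cluster = cluster;
            this.leaderEpochs = new HashMap<>(leaderEpochs);
        }

        /** Same API as the public class, forwarded unchanged. */
        Integer leaderFor(String partition) {
            return cluster.leaderFor(partition);
        }

        /** Internal-only extension; the public class does not gain this method. */
        Optional<Integer> leaderEpochFor(String partition) {
            return Optional.ofNullable(leaderEpochs.get(partition));
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("topic-0", 1);
        Map<String, Integer> epochs = new HashMap<>();
        epochs.put("topic-0", 7);

        ClusterView view = new ClusterView(new PublicCluster(leaders), epochs);
        System.out.println("leader: " + view.leaderFor("topic-0"));
        System.out.println("epoch:  " + view.leaderEpochFor("topic-0").orElse(-1));
    }
}
```

Because internal callers depend only on `ClusterView`, a new internal need becomes a new method on the wrapper, and the public class stays untouched.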




Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   @hachikuji 
   
   Thanks for pointing it out. As it turns out, I don't need to extend the 
public interface of `Cluster` in order to get the epoch, so internal usage no 
longer changes `Cluster`'s interface.
   
   > We have been trying to reduce the reliance on Cluster internally because 
it is public.
   
   This could be achieved by creating a forwarding class, `ClusterInternal`, 
that wraps `Cluster` by composition and offers the same interface. `client` 
code can then be refactored to use `ClusterInternal`. That way, future 
extensions needed only for internal use cases can be made in `ClusterInternal` 
instead of in `Cluster`'s public interface.
   
   But this is going to be a sizeable refactor, so how about keeping it 
separate from this PR? The intention of this PR is to fix the perf bug and 
cherry-pick it to other branches.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-07 Thread via GitHub


msn-tldr commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1481455003


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   @hachikuji 
   
   Thanks for pointing it out. As it turns out, I don't need to extend the 
public interface of `Cluster` in order to get the epoch, so internal usage no 
longer changes `Cluster`'s interface.
   
   > We have been trying to reduce the reliance on Cluster internally because 
it is public.
   
   This could be achieved by creating a forwarding class, `ClusterInternal`, 
that wraps `Cluster` by composition and offers the same interface. `client` 
code can then be refactored to use `ClusterInternal`.
   
   But this is going to be a sizeable PR, so how about keeping it separate from 
this PR? The intention of this PR is to fix the perf bug.






Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]

2024-02-06 Thread via GitHub


hachikuji commented on code in PR #15323:
URL: https://github.com/apache/kafka/pull/15323#discussion_r1480620848


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -647,27 +647,27 @@ private long batchReady(boolean exhausted, TopicPartition part, Node leader,
 }
 
 /**
- * Iterate over partitions to see which one have batches ready and collect leaders of those partitions
- * into the set of ready nodes.  If partition has no leader, add the topic to the set of topics with
- * no leader.  This function also calculates stats for adaptive partitioning.
+ * Iterate over partitions to see which one have batches ready and collect leaders of those
+ * partitions into the set of ready nodes.  If partition has no leader, add the topic to the set
+ * of topics with no leader.  This function also calculates stats for adaptive partitioning.
  *
- * @param metadata The cluster metadata
- * @param nowMs The current time
- * @param topic The topic
- * @param topicInfo The topic info
+ * @param cluster               The cluster metadata
+ * @param nowMs                 The current time
+ * @param topic                 The topic
+ * @param topicInfo             The topic info
  * @param nextReadyCheckDelayMs The delay for next check
- * @param readyNodes The set of ready nodes (to be filled in)
- * @param unknownLeaderTopics The set of topics with no leader (to be filled in)
+ * @param readyNodes            The set of ready nodes (to be filled in)
+ * @param unknownLeaderTopics   The set of topics with no leader (to be filled in)
  * @return The delay for next check
  */
-private long partitionReady(Metadata metadata, long nowMs, String topic,
+private long partitionReady(Cluster cluster, long nowMs, String topic,

Review Comment:
   In some ways, this is a step backwards. We have been trying to reduce the 
reliance on `Cluster` internally because it is public. With a lot of internal 
usage, we end up making changes to the API which are only needed for the 
internal implementation (as we are doing in this PR). Have you considered 
alternatives? Perhaps we could expose something like `Cluster`, but with a 
reduced scope?


