dajac commented on code in PR #14099: URL: https://github.com/apache/kafka/pull/14099#discussion_r1274915017
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + Review Comment: Should we add a test which verifies that partitions with no racks are not put into the record? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ########## @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord( Map<String, TopicMetadata> newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> { + partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(new ArrayList<>(racks)) + ); + }); + } + // If partition rack information is empty, store an empty list in the record. Review Comment: nit: Would it make sense to put this comment before the `if` statement? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -612,15 +616,14 @@ public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() { MetadataVersion.IBP_3_5_IV2 )); } - Review Comment: nit: Let's revert this. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ########## @@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord( Map<String, TopicMetadata> newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionMetadata = new ArrayList<>(); + if (!topicMetadata.partitionRacks().isEmpty()) { + topicMetadata.partitionRacks().forEach((partition, racks) -> { + partitionMetadata.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(new ArrayList<>(racks)) + ); + }); + } + // If partition rack information is empty, store an empty list in the record. value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - ) - ); + .setPartitionMetadata(partitionMetadata.isEmpty() ? Collections.emptyList() : partitionMetadata) Review Comment: nit: Why can't we use `partitionMetadata` if it is empty? That seems just fine. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ########## @@ -26,18 +26,17 @@ */ @InterfaceStability.Unstable public interface PartitionAssignor { - /** * Unique name for this assignor. */ String name(); /** - * Perform the group assignment given the current members and - * topic metadata. + * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentSpec The assignment spec which included member metadata. Review Comment: nit: `includes`? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic IDs mapped to their corresponding {@link TopicMetadata} + * object, which contains topic and partition metadata. + */ + Map<Uuid, TopicMetadata> topicMetadata; + + public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) { + this.topicMetadata = topicMetadata; + } + + /** + * The number of partitions for the given topic ID. + * + * @param topicId Uuid corresponding to the topic. 
+ * @return The number of partitions corresponding to the given topic ID, + * or -1 if the topic ID does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + return this.topicMetadata.containsKey(topicId) ? this.topicMetadata.get(topicId).numPartitions() : -1; Review Comment: nit: It is usually better to write this as follows: ``` TopicMetadata topic = this.topicMetadata.get(topicId); return topic == null ? -1 : topic.numPartitions(); ``` This way you do only one lookup. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The subscribed topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of the subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. 
+ */ +@InterfaceStability.Unstable +public interface SubscribedTopicDescriber { + /** + * The number of partitions for the given topic ID. Review Comment: nit: In AssignmentSpec, you used `member Id`. Should we use `Id` in this file as well? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. Review Comment: nit: `rack Ids`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, this is an empty map. Review Comment: nit: Should we say that the map contains a mapping for a partition only if the partition has racks, or something like this? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic IDs mapped to their corresponding {@link TopicMetadata} + * object, which contains topic and partition metadata. + */ + Map<Uuid, TopicMetadata> topicMetadata; + + public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) { + this.topicMetadata = topicMetadata; + } + + /** + * The number of partitions for the given topic ID. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic ID, + * or -1 if the topic ID does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + return this.topicMetadata.containsKey(topicId) ? this.topicMetadata.get(topicId).numPartitions() : -1; + } + + /** + * Returns all the available racks associated with the replicas of the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. Review Comment: nit: `Partition Id` or `Partition Index`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { + /** + * The topic IDs mapped to their corresponding {@link TopicMetadata} + * object, which contains topic and partition metadata. + */ + Map<Uuid, TopicMetadata> topicMetadata; + + public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) { + this.topicMetadata = topicMetadata; + } + + /** + * The number of partitions for the given topic ID. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topic ID, + * or -1 if the topic ID does not exist. + */ + @Override + public int numPartitions(Uuid topicId) { + return this.topicMetadata.containsKey(topicId) ? this.topicMetadata.get(topicId).numPartitions() : -1; + } + + /** + * Returns all the available racks associated with the replicas of the given partition. 
+ * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topic ID does not exist, an empty set is returned + */ + @Override + public Set<String> racksForPartition(Uuid topicId, int partition) { + return this.topicMetadata.containsKey(topicId) ? + this.topicMetadata.get(topicId).partitionRacks().get(partition) : + Collections.emptySet(); Review Comment: nit: Same comment as before. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, this is an empty map. + */ + private final Map<Integer, Set<String>> partitionRacks; + public TopicMetadata( Uuid id, String name, - int numPartitions + int numPartitions, + Map<Integer, Set<String>> partitionRacks ) { - this.id = Objects.requireNonNull(id); - if (Uuid.ZERO_UUID.equals(id)) { - throw new IllegalArgumentException("Topic id cannot be ZERO_UUID."); - } - this.name = Objects.requireNonNull(name); - if (name.isEmpty()) { - throw new IllegalArgumentException("Topic name cannot be empty."); - } - this.numPartitions = numPartitions; - if (numPartitions < 0) { - throw new IllegalArgumentException("Number of partitions cannot be negative."); - } + this.id = Objects.requireNonNull(id); + this.partitionRacks = Objects.requireNonNull(partitionRacks); Review Comment: nit: Could we put this one last in the constructor? The line before it and after it goes together. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,14 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." 
}, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", + "about": "Partitions mapped to a set of racks.", "fields": [ Review Comment: nit: Could we update the doc here to explain that we only store partition metadata if racks is non-empty? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -106,16 +129,26 @@ public String toString() { "id=" + id + ", name=" + name + ", numPartitions=" + numPartitions + + ", partitionRacks=" + partitionRacks + ')'; } public static TopicMetadata fromRecord( ConsumerGroupPartitionMetadataValue.TopicMetadata record ) { + // Converting the data type from a list stored in the record to a map. + Map<Integer, Set<String>> partitionRacks = new HashMap<>(); + for (ConsumerGroupPartitionMetadataValue.PartitionMetadata partitionMetadata : record.partitionMetadata()) { + partitionRacks.put( + partitionMetadata.partition(), + Collections.unmodifiableSet(new HashSet<>(partitionMetadata.racks())) + ); + } + return new TopicMetadata( record.topicId(), record.topicName(), - record.numPartitions() - ); + record.numPartitions(), + partitionRacks.isEmpty() ? Collections.emptyMap() : partitionRacks); Review Comment: nit: You can use `partitionRacks` in all cases here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -145,13 +147,13 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>(); // All the existing members are prepared. 
- members.forEach((memberId, member) -> { - memberSpecs.put(memberId, createAssignmentMemberSpec( + members.forEach((memberId, member) -> memberSpecs.put(memberId, + createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata - )); - }); + ) Review Comment: nit: It would be better like this if we really want to bring `memberSpecs.put` on the previous line. Otherwise, we can just keep the old format. ``` members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), subscriptionMetadata )); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -417,19 +417,23 @@ public void testUpdateSubscriptionMetadata() { consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", + new TopicMetadata(fooTopicId, "foo", 1, Collections.emptyMap()) + ) Review Comment: nit: This format is weird. It should be as follows: ``` mkEntry( "foo", new TopicMetadata(fooTopicId, "foo", 1, Collections.emptyMap()) ) ``` or kept on one line. One line seems fine here. There are many other cases in this file. I won't mention them. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -168,20 +170,24 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { }); // Prepare the expected topic metadata. 
- Map<Uuid, AssignmentTopicMetadata> topicMetadata = new HashMap<>(); - subscriptionMetadata.forEach((topicName, metadata) -> { - topicMetadata.put(metadata.id(), new AssignmentTopicMetadata(metadata.numPartitions())); - }); + Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topicMetadataMap.put(topicMetadata.id(), topicMetadata)); // Prepare the expected assignment spec. AssignmentSpec assignmentSpec = new AssignmentSpec( - memberSpecs, - topicMetadata + memberSpecs + ); + + SubscribedTopicMetadata assignmentTopicMetadata = new SubscribedTopicMetadata( + topicMetadataMap ); Review Comment: nit: Could you put this right after the `topicMetadataMap` preparation. They seem to go together. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -222,8 +231,25 @@ public void testCreateAssignmentMemberSpec() { Map<String, TopicMetadata> subscriptionMetadata = new HashMap<String, TopicMetadata>() { { - put("foo", new TopicMetadata(fooTopicId, "foo", 5)); - put("bar", new TopicMetadata(barTopicId, "bar", 5)); + put("foo", new TopicMetadata(fooTopicId, "foo", 5, + mkMap( + mkEntry(0, Collections.emptySet()), + mkEntry(1, Collections.emptySet()), + mkEntry(2, Collections.emptySet()), + mkEntry(3, Collections.emptySet()), + mkEntry(4, Collections.emptySet()) + ) Review Comment: As the racks are empty, we don't have to define the partitions, right? Could we just use `Collections.emptyMap()`? There are other cases in this file. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -203,7 +209,10 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { // Verify that the assignor was called once with the expected // assignment spec. 
- verify(assignor, times(1)).assign(assignmentSpec); + verify(assignor, times(1)) + .assign( + assignmentSpec, assignmentTopicMetadata + ); Review Comment: nit: This code format is weird. If you really want to break the line, I would format it as follows: ``` verify(assignor, times(1)).assign( assignmentSpec, assignmentTopicMetadata ); ``` or as follows (like you did at L189) ``` verify(assignor, times(1)) .assign(assignmentSpec, assignmentTopicMetadata); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java: ########## @@ -624,8 +745,27 @@ public void testDeleteMember() { 20 ); - Uuid fooTopicId = context.addTopicMetadata("foo", 6); - Uuid barTopicId = context.addTopicMetadata("bar", 6); + Uuid fooTopicId = context.addTopicMetadata("foo", 6, + mkMap( + mkEntry(0, Collections.emptySet()), + mkEntry(1, Collections.emptySet()), + mkEntry(2, Collections.emptySet()), + mkEntry(3, Collections.emptySet()), + mkEntry(4, Collections.emptySet()), + mkEntry(5, Collections.emptySet()) + ) + ); + + Uuid barTopicId = context.addTopicMetadata("bar", 6, Review Comment: Should we have at least one test where we set racks for some partitions? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +/** + * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain + * topic and partition metadata for the topics that the consumer group is subscribed to. + */ +public class SubscribedTopicMetadata implements SubscribedTopicDescriber { Review Comment: Should we add basic unit tests for this class? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java: ########## @@ -28,24 +30,24 @@ public class TopicMetadataTest { @Test public void testAttributes() { Uuid topicId = Uuid.randomUuid(); - TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15); + TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, Collections.emptyMap()); Review Comment: Should we set a map and verify that we get the correct one back? 
########## reviewers.py: ########## @@ -28,7 +28,7 @@ def prompt_for_user(): while True: try: - user_input = input("\nName or email (case insensitive): ") + user_input = input("\nName or email (case insensitive): ") Review Comment: This file is still here :) ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ########## @@ -26,18 +26,17 @@ */ @InterfaceStability.Unstable public interface PartitionAssignor { - /** * Unique name for this assignor. */ String name(); /** - * Perform the group assignment given the current members and - * topic metadata. + * Assigns partitions to group members based on the given assignment specification and topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentSpec The assignment spec which included member metadata. + * @param subscribedTopicDescriber The topic and partition metadata describer {@link SubscribedTopicDescriber}. Review Comment: nit: I think that we don't have to put `{@link SubscribedTopicDescriber}` here as the javadoc will automatically link the type of `subscribedTopicDescriber`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org