[GitHub] [kafka] cmccabe commented on pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on PR #13686:
URL: https://github.com/apache/kafka/pull/13686#issuecomment-1543363569

   > Also noting that this significantly changes the directory structure that the metadata shell presents.
   
   Agreed. The shell contents are interface-unstable, though.
   
   Maybe someday that will change, but so far it has not, so these changes don't require a KIP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190653264


##
metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.metadata.ScramCredentialData;
+
+
+public class ScramCredentialDataNode implements MetadataNode {
+    private final ScramCredentialData data;
+
+    public ScramCredentialDataNode(ScramCredentialData data) {
+        this.data = data;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return false;
+    }
+
+    @Override
+    public void print(MetadataNodePrinter printer) {
+        StringBuilder bld = new StringBuilder();
+        bld.append("ScramCredentialData");
+        bld.append("(salt=");
+        if (printer.redactionCriteria().shouldRedactScram()) {
+            bld.append("[redacted]");
+        } else {
+            bld.append(Bytes.wrap(data.salt()).toString());
+        }
+        bld.append(", storedKey=");
+        if (printer.redactionCriteria().shouldRedactScram()) {
+            bld.append("[redacted]");
+        } else {
+            bld.append(Bytes.wrap(data.storedKey()).toString());
+        }
+        bld.append(", serverKey=");
+        if (printer.redactionCriteria().shouldRedactScram()) {
+            bld.append("[redacted]");
+        } else {
+            bld.append(Bytes.wrap(data.serverKey()).toString());
+        }
+        bld.append(", iterations=");
+        if (printer.redactionCriteria().shouldRedactScram()) {
+            bld.append("[redacted]");

Review Comment:
   added
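Since the same printer.redactionCriteria().shouldRedactScram() check guards every field in the print() method quoted above, the repetition could be folded into a small helper. A minimal sketch, assuming it sits inside ScramCredentialDataNode next to print(); this illustrates the pattern and is not code from the PR:

```java
// Hypothetical helper: append either the redaction marker or the rendered value.
// Uses only the StringBuilder and MetadataNodePrinter already present above.
private static void appendRedactable(StringBuilder bld, MetadataNodePrinter printer, String value) {
    if (printer.redactionCriteria().shouldRedactScram()) {
        bld.append("[redacted]");
    } else {
        bld.append(value);
    }
}
```

Each field then becomes a single call such as appendRedactable(bld, printer, Bytes.wrap(data.salt()).toString()); note that this renders the value even when it ends up redacted, which a Supplier<String> parameter would avoid.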






[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-05-10 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1543327504

   @dajac When you get a chance, please take a look. Thanks!





[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-05-10 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1543326829

   > Hey @hudeqi, I had a similar PR merged recently (#13623), and the comments I received in that PR could be applied to this one also. Specifically,
   > 
   > 1. the metric names could be verified individually in the test.
   > 2. the metric names could be moved into constants, and we could use a companion object to define them.
   > 
   > Perhaps you could make those changes in this PR and request the same committer to review this PR as well?
   
   Thanks, @divijvaidya!
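For reference, the second suggestion amounts to keeping the metric names in one place so that tests can assert on each name individually and the shutdown path can remove them in a single loop. A minimal Java sketch of that shape, with illustrative metric names and a generic removal hook rather than the controller module's actual metrics API (in the Scala code the constants would live in a companion object, as suggested):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: the metric-name constants and the removeMetric hook are
// stand-ins for whatever the controller module actually registers.
public final class ControllerMetricNames {
    public static final String ACTIVE_CONTROLLER_COUNT = "ActiveControllerCount";
    public static final String OFFLINE_PARTITIONS_COUNT = "OfflinePartitionsCount";

    public static final List<String> ALL = Collections.unmodifiableList(
            Arrays.asList(ACTIVE_CONTROLLER_COUNT, OFFLINE_PARTITIONS_COUNT));

    private ControllerMetricNames() { }

    // Called from the shutdown path; tests can iterate ALL to verify each name.
    public static void removeAll(Consumer<String> removeMetric) {
        ALL.forEach(removeMetric);
    }
}
```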





[GitHub] [kafka] kamalcph commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-10 Thread via GitHub


kamalcph commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1190617224


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +623,212 @@ public String toString() {
 }
 }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            //  - there is no minimum-one-message constraint and
+            //  - the first batch size is more than maximum bytes that can be sent and
+            //  - for FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+  
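The size handling in the quoted read() path boils down to one decision: either bail out with empty records when the first batch is larger than the allowed bytes and no minimum-one-message guarantee applies, or grow the read size to fit that first batch. A standalone restatement of just that decision, with illustrative parameter names (the real method reads them from RemoteStorageFetchInfo and the RecordBatch):

```java
// Mirrors the branch in read() above: a return of 0 corresponds to the early
// return of MemoryRecords.EMPTY; otherwise the value is the buffer size
// allocated before copying the first batch and the bytes after it.
static int decideFetchSize(boolean minOneMessage, boolean hardMaxBytesLimit,
                           int firstBatchSize, int maxBytes) {
    if (!minOneMessage && !hardMaxBytesLimit && firstBatchSize > maxBytes) {
        return 0;
    }
    return minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
}
```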

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190413444


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)
+counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we assert this condition earlier, like here? 
   I don't think we need to bounce the remaining instance before asserting this 
condition. 
   
   Here's how I'm thinking about it, if it helps: 
   
![image](https://github.com/apache/kafka/assets/20507243/7277a4df-0c93-4a42-8f7c-4d7adb02f6a6)
   
   
   








[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190418116


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)
+counter = counter + 1
+
+# bounce remaining instance on old version (just for verification 
purposes, to verify that
+# instance on old version can process records written by new version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A,B'
+self.do_stop_start_bounce(p3, None, from_version, counter, 
extra_properties)
+counter = counter + 1
+
+self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
   # bounce remaining instance on old version to produce a new unique 
value
   extra_properties = extra_properties.copy()
   extra_properties['test.agg_produce_value'] = 'C'
   extra_properties['test.expected_agg_values'] = 'A,B,C'
   self.do_stop_start_bounce(p3, None, from_version, counter, 
extra_properties)
   counter = counter + 1
   
   # verify that new version can process records from old version
   self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start 
producing a new value `'C'` when we bounce it here. That way, we can assert 
using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two 
upgraded instances have successfully processed messages from the old 
(not-upgraded) instance as well. 
   
   
![image](https://github.com/apache/kafka/assets/20507243/0f703375-d2b9-44e8-85f1-94a7dc2a6398)
   
   
   (Note, if you accept this change, you will need to make changes below here 
to produce new unique values like D,E,F, etc.)






[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190413444


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can 
process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)
+counter = counter + 1
+

Review Comment:
   ```suggestion
   # verify that old version can process records from new version
   self.wait_for_table_agg_success('A,B')
   ```
   
   Can we assert this condition earlier, like here? 
   I don't think we need to bounce the remaining instance before asserting this 
condition. 
   
   
![image](https://github.com/apache/kafka/assets/20507243/05ebd2fe-3192-47ea-9d25-9da337919a01)
   
   
   






[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190412985


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite 
changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` flag set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+

Review Comment:
   ```suggestion
   self.wait_for_table_agg_success('A')
   ```
   
   nit: could we add this assertion here? This is more to "set the stage" for 
readers of the code and make the state transitions happening in the test easier 
to grasp. My understanding of the situation at this point in the code is as 
follows: 
   
   
![image](https://github.com/apache/kafka/assets/20507243/dc5c5c90-8f28-42dd-8d38-4ec55bd12e29)
   






[GitHub] [kafka] hudeqi commented on a diff in pull request #13697: MINOR:code optimization in QuorumController

2023-05-10 Thread via GitHub


hudeqi commented on code in PR #13697:
URL: https://github.com/apache/kafka/pull/13697#discussion_r1190576044


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -350,6 +350,7 @@ public void testFenceMultipleBrokers() throws Throwable {
 }
 }
 
+@Disabled

Review Comment:
   > I guess it was removed accidentally since there is no related discussion about this change.
   
   Ok thanks, I've fixed it. @dengziming 






[GitHub] [kafka] dengziming merged pull request #13693: MINOR: fix a small typo in SharedServer.scala

2023-05-10 Thread via GitHub


dengziming merged PR #13693:
URL: https://github.com/apache/kafka/pull/13693





[GitHub] [kafka] dengziming commented on a diff in pull request #13697: MINOR:code optimization in QuorumController

2023-05-10 Thread via GitHub


dengziming commented on code in PR #13697:
URL: https://github.com/apache/kafka/pull/13697#discussion_r1190566126


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -350,6 +350,7 @@ public void testFenceMultipleBrokers() throws Throwable {
 }
 }
 
+@Disabled

Review Comment:
   I guess it was removed accidentally since there is no related discussion about this change.






[jira] [Updated] (KAFKA-14983) Upgrade jetty-server to 9.4.51

2023-05-10 Thread Luke Chen (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-14983:
--
Fix Version/s: 3.4.1

> Upgrade jetty-server to 9.4.51
> --
>
> Key: KAFKA-14983
> URL: https://issues.apache.org/jira/browse/KAFKA-14983
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.4.0
>Reporter: Beltran
>Priority: Minor
> Fix For: 3.5.0, 3.4.1
>
>
> Recent Kafka versions, e.g. 3.4.0, include jetty-server-9.4.48.v20220622.jar, 
> which contains two vulnerabilities: CVE-2023-26048 and CVE-2023-26049. 
> Upgrading to 9.4.51 would fix those issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190519534


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional<Long> currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional<String> protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional<String> protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional<String> leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map<String, GenericGroupMember> members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map<String, String> staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group

Review Comment:
   this looks grammatically correct






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190517157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned

Review Comment:
   this looks grammatically correct






[GitHub] [kafka] jeffkbkim opened a new pull request, #13704: KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers

2023-05-10 Thread via GitHub


jeffkbkim opened a new pull request, #13704:
URL: https://github.com/apache/kafka/pull/13704

   This PR enables the new group metadata manager to generate 
GroupMetadataKey/Value records.
   
   Built on top of https://github.com/apache/kafka/pull/13663. Files to review:
* RecordHelpers.java
* RecordHelpersTest.java
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190477811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190463495


##
metadata/src/main/java/org/apache/kafka/image/node/ConfigurationsImageNode.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.ConfigurationsImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class ConfigurationsImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public final static String NAME = "configs";
+
+/**
+ * The configurations image.
+ */
+private final ConfigurationsImage image;
+
+public ConfigurationsImageNode(ConfigurationsImage image) {
+this.image = image;
+}
+
+@Override
+public Collection<String> childNames() {
+ArrayList<String> childNames = new ArrayList<>();
+for (ConfigResource configResource : image.resourceData().keySet()) {
+if (configResource.isDefault()) {
+childNames.add(configResource.type().name());
+} else {
+childNames.add(configResource.type().name() + "_" + 
configResource.name());
+}
+}
+return childNames;
+}
+
+private static ConfigResource resourceFromName(String name) {

Review Comment:
   added
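For context, resourceFromName is the inverse of the naming scheme in childNames() above ("TYPE" for a default resource, "TYPE_name" otherwise). A sketch of what such a parser could look like, not necessarily what the PR added; it prefers the longest matching type prefix because type names such as BROKER_LOGGER themselves contain an underscore:

```java
// Illustrative inverse of childNames(): maps "TYPE" / "TYPE_name" back to a ConfigResource.
private static ConfigResource resourceFromName(String name) {
    ConfigResource.Type match = null;
    for (ConfigResource.Type type : ConfigResource.Type.values()) {
        if (name.equals(type.name())) {
            return new ConfigResource(type, ""); // default (empty-named) resource
        }
        if (name.startsWith(type.name() + "_")
                && (match == null || type.name().length() > match.name().length())) {
            match = type; // prefer BROKER_LOGGER over BROKER, etc.
        }
    }
    if (match == null) {
        throw new RuntimeException("Unable to parse ConfigResource name " + name);
    }
    return new ConfigResource(match, name.substring(match.name().length() + 1));
}
```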






[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190455545


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190454574


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190437758


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190447106


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190445435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190440507


##
metadata/src/main/java/org/apache/kafka/image/node/ConfigurationImageNode.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+
+import java.util.Collection;
+
+
+public class ConfigurationImageNode implements MetadataNode {
+/**
+ * The configuration image for a specific resource.
+ */
+private final ConfigurationImage image;
+
+public ConfigurationImageNode(ConfigurationImage image) {
+this.image = image;
+}
+
+@Override
+public Collection<String> childNames() {
+return image.data().keySet();
+}
+
+@Override
+public MetadataNode child(String name) {
+String value = image.data().get(name);
+if (value == null) return null;
+return new MetadataNode() {
+@Override
+public boolean isDirectory() {
+return false;
+}
+
+@Override
+public void print(MetadataNodePrinter printer) {
+if (printer.redactionCriteria().

Review Comment:
   added



##
metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java:
##
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+
+public interface MetadataNodeRedactionCriteria {
+/**
+ * Returns true if SCRAM data should be redacted.
+ */
+boolean shouldRedactScram();
+
+/**
+ * Returns true if a configuration should be redacted.
+ *
+ * @param type  The configuration type.
+ * @param key   The configuration key.
+ *
+ * @return  True if the configuration should be redacted.
+ */
+boolean shouldRedactConfig(ConfigResource.Type type, String key);
+
+class Strict implements MetadataNodeRedactionCriteria {

Review Comment:
   added
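   For context, here is a minimal sketch of what an implementation of this interface could look like. It assumes a "strict" policy simply redacts everything; the actual Strict class in the PR may decide differently (for example, by consulting KafkaConfigSchema), so the names and behavior below are illustrative only.
   
   ```java
   // Hypothetical sketch only; the PR's real Strict implementation may differ.
   class Strict implements MetadataNodeRedactionCriteria {
       static final Strict INSTANCE = new Strict();
   
       @Override
       public boolean shouldRedactScram() {
           // Always hide SCRAM salts and keys.
           return true;
       }
   
       @Override
       public boolean shouldRedactConfig(ConfigResource.Type type, String key) {
           // Redact every configuration value, regardless of resource type or key.
           return true;
       }
   }
   ```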



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190438367


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190436747


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190434740


##
metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.image.ClientQuotaImage;
+import org.apache.kafka.image.ClientQuotasImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
+import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
+import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
+
+
+public class ClientQuotasImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public static final String NAME = "clientQuotas";
+
+/**
+ * The topics image.
+ */
+private final ClientQuotasImage image;
+
+public ClientQuotasImageNode(ClientQuotasImage image) {
+this.image = image;
+}
+
+@Override
+public Collection<String> childNames() {
+ArrayList<String> childNames = new ArrayList<>();
+for (ClientQuotaEntity entity : image.entities().keySet()) {
+childNames.add(clientQuotaEntityToString(entity));
+}
+return childNames;
+}
+
+static String clientQuotaEntityToString(ClientQuotaEntity entity) {
+if (entity.entries().isEmpty()) {
+throw new RuntimeException("Invalid empty entity");
+}

Review Comment:
   I added a test
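   For reference, a minimal sketch of what such a test could look like, assuming it lives in the same package so it can reach the package-private clientQuotaEntityToString method; the class and method names here are illustrative, not necessarily the test that was actually added.
   
   ```java
   // Hypothetical sketch; the PR's actual test may be named and structured differently.
   package org.apache.kafka.image.node;
   
   import org.apache.kafka.common.quota.ClientQuotaEntity;
   import org.junit.jupiter.api.Test;
   
   import java.util.Collections;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertThrows;
   
   public class ClientQuotasImageNodeTest {
       @Test
       public void testClientQuotaEntityToStringRejectsEmptyEntity() {
           // An entity with no entries should be rejected rather than rendered as a node name.
           RuntimeException e = assertThrows(RuntimeException.class,
               () -> ClientQuotasImageNode.clientQuotaEntityToString(
                   new ClientQuotaEntity(Collections.emptyMap())));
           assertEquals("Invalid empty entity", e.getMessage());
       }
   }
   ```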



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190433019


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread Farooq Qaiser (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721555#comment-17721555
 ] 

Farooq Qaiser commented on KAFKA-14911:
---

My apologies, I only saw this message yesterday, let me see if I can help out 
with reviews at this point.  

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test would be, to ensure that the repartition 
> topic is not empty when we do the bounce, so the test should be setup 
> accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190428691


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190418116


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+counter = counter + 1
+
+# bounce remaining instance on old version (just for verification purposes, to verify that
+# instance on old version can process records written by new version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A,B'
+self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+counter = counter + 1
+
+self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
   # bounce remaining instance on old version to produce a new unique value
   extra_properties = extra_properties.copy()
   extra_properties['test.agg_produce_value'] = 'C'
   extra_properties['test.expected_agg_values'] = 'A,B,C'
   self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
   counter = counter + 1
   
   # verify that new version can process records from old version
   self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start 
producing a new value `'C'` when we bounce it here. That way, we can assert 
using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two 
upgraded instances have successfully processed messages from the old 
(not-upgraded) instance as well. 
   
   (Note, if you accept this change, you will need to make changes below here 
to produce new unique values like D,E,F, etc.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fqaiser94 commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904

2023-05-10 Thread via GitHub


fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190412985


##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+

Review Comment:
   ```suggestion
   self.wait_for_table_agg_success('A')
   ```
   
   nit: could we add this here? 
   This would just ensure that at this point all instances have processed a message and their aggregates look like `Agg(List('A'))`. 



##
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
 self.stop_and_await()
 
+@cluster(num_nodes=6)
+@matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+"""
+This test verifies that the cluster successfully upgrades despite changes in the table
+repartition topic format.
+
+Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+"""
+
+extra_properties = {'test.run_table_agg': 'true'}
+
+self.set_up_services()
+
+self.driver.start()
+
+# encoding different target values for different versions
+#  - old version: value=A
+#  - new version with `upgrade_from` flag set: value=B
+#  - new version w/o `upgrade_from` set: value=C
+
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A'
+self.start_all_nodes_with(from_version, extra_properties)
+
+counter = 1
+random.seed()
+
+# rolling bounce
+random.shuffle(self.processors)
+p3 = self.processors[-1]
+for p in self.processors:
+p.CLEAN_NODE_ENABLED = False
+
+# bounce two instances to new version (verifies that new version can process records
+# written by old version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'B'
+extra_properties['test.expected_agg_values'] = 'A,B'
+for p in self.processors[:-1]:
+self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+counter = counter + 1
+
+# bounce remaining instance on old version (just for verification purposes, to verify that
+# instance on old version can process records written by new version)
+extra_properties = extra_properties.copy()
+extra_properties['test.agg_produce_value'] = 'A'
+extra_properties['test.expected_agg_values'] = 'A,B'
+self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+counter = counter + 1
+
+self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
   # bounce remaining instance on old version to produce a new unique value
   extra_properties = extra_properties.copy()
   extra_properties['test.agg_produce_value'] = 'C'
   extra_properties['test.expected_agg_values'] = 'A,B,C'
   self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
   counter = counter + 1
   
   # verify that new version can process records from old version
   self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start 
producing a new value when we bounce it here. That way, we can 

[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190417412


##
metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.MetadataImage;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+
+public class MetadataImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public final static String NAME = "image";
+
+/**
+ * The metadata image.
+ */
+private final MetadataImage image;
+
+public MetadataImageNode(MetadataImage image) {
+this.image = image;
+}
+
+public MetadataImage image() {
+return image;
+}
+
+@Override
+public Collection<String> childNames() {
+return Arrays.asList(
+ProvenanceNode.NAME,
+FeaturesImageNode.NAME,
+ClusterImageNode.NAME,
+TopicsImageNode.NAME,
+ConfigurationsImageNode.NAME,
+ClientQuotasImageNode.NAME,
+ProducerIdsImageNode.NAME,
+AclsImageNode.NAME,
+ScramImageNode.NAME
+);
+}
+
+@Override
+public MetadataNode child(String name) {

Review Comment:
   In general I wanted to avoid creating the nodes until needed. But I think I 
can improve this a bit.
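
   One way to keep that lazy construction while also keeping `childNames()` and 
   `child()` in sync would be a small name-to-factory registry. This is only a 
   sketch of the idea, not the change that ended up in the PR; the node 
   constructors and `MetadataImage` accessors used below are assumptions based 
   on the class names quoted in the diff.
   ```java
   // Sketch only: register each child once, build the node lazily on access.
   // Requires java.util.LinkedHashMap, java.util.Map, java.util.function.Function.
   private static final Map<String, Function<MetadataImage, MetadataNode>> CHILDREN =
       new LinkedHashMap<>();

   static {
       // The accessors/constructors below are assumed for illustration only.
       CHILDREN.put(ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()));
       CHILDREN.put(TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()));
       // ... and so on for the remaining image nodes ...
   }

   @Override
   public Collection<String> childNames() {
       return CHILDREN.keySet();
   }

   @Override
   public MetadataNode child(String name) {
       Function<MetadataImage, MetadataNode> factory = CHILDREN.get(name);
       return factory == null ? null : factory.apply(image);
   }
   ```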



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190417412


##
metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.MetadataImage;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+
+public class MetadataImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public final static String NAME = "image";
+
+/**
+ * The metadata image.
+ */
+private final MetadataImage image;
+
+public MetadataImageNode(MetadataImage image) {
+this.image = image;
+}
+
+public MetadataImage image() {
+return image;
+}
+
+@Override
+public Collection<String> childNames() {
+return Arrays.asList(
+ProvenanceNode.NAME,
+FeaturesImageNode.NAME,
+ClusterImageNode.NAME,
+TopicsImageNode.NAME,
+ConfigurationsImageNode.NAME,
+ClientQuotasImageNode.NAME,
+ProducerIdsImageNode.NAME,
+AclsImageNode.NAME,
+ScramImageNode.NAME
+);
+}
+
+@Override
+public MetadataNode child(String name) {

Review Comment:
   Yes, I think I can improve this a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190417026


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190416089


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jolshan commented on a diff in pull request #13639: KAFKA-14462; [12/N] Add GroupMetadataManager and ConsumerGroup

2023-05-10 Thread via GitHub


jolshan commented on code in PR #13639:
URL: https://github.com/apache/kafka/pull/13639#discussion_r1190415641


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of methods:
+ * 1) The request handlers which handle the requests and generate a 

[GitHub] [kafka] jeqo commented on pull request #12647: KAFKA-14191: Add end-to-end latency metrics to Connectors

2023-05-10 Thread via GitHub


jeqo commented on PR #12647:
URL: https://github.com/apache/kafka/pull/12647#issuecomment-1542836120

   @sfc-gh-japatel thanks for the interest and reminder! Bumping the vote 
thread on the mailing list as there is 1 binding vote missing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


cmccabe commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190379066


##
metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.ProducerIdsImage;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class ProducerIdsImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public static final String NAME = "producerIds";
+
+/**
+ * The producer IDs image.
+ */
+private final ProducerIdsImage image;
+
+public ProducerIdsImageNode(ProducerIdsImage image) {
+this.image = image;
+}
+
+@Override
+public Collection<String> childNames() {
+return Collections.singletonList("nextProducerId");
+}
+
+@Override
+public MetadataNode child(String name) {
+if (name.equals("nextProducerId")) {
+return new MetadataLeafNode(image.highestSeenProducerId() + "");

Review Comment:
   Looking at the code, it actually is the next producer ID and *not* the 
"highest seen", except in the special case where we haven't issued any producer 
IDs, where it's -1. That special case is a bit silly... we could have just had 
it be 0 there.
   
   In the future we could just use 0 instead of -1 as the start value to avoid 
the special case. But in the interest of keeping this change small I'll rename 
the accessor to `nextProducerId` and add a JavaDoc comment about the special 
case. The accessor is just used for debugging and so on in any case.
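
   A sketch of what that could look like (the field name, the exact JavaDoc 
   wording, and the return type are assumptions, not the merged code):
   ```java
   // Sketch only: renamed accessor on ProducerIdsImage plus the leaf-node lookup.
   /**
    * Returns the next producer ID that will be issued, or -1 if no producer IDs
    * have been issued yet (a historical special case; 0 would also have worked).
    */
   public long nextProducerId() {
       return nextProducerId;
   }

   // In ProducerIdsImageNode#child, the value is rendered as a leaf, e.g. "-1"
   // before any producer IDs have been issued:
   if (name.equals("nextProducerId")) {
       return new MetadataLeafNode(String.valueOf(image.nextProducerId()));
   }
   ```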



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API

2023-05-10 Thread via GitHub


ableegoldman commented on PR #13695:
URL: https://github.com/apache/kafka/pull/13695#issuecomment-1542782902

   Merged to trunk and cherrypicked to 3.5


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #13695: HOTFIX: fix the VersionedKeyValueToBytesStoreAdapter#isOpen API

2023-05-10 Thread via GitHub


ableegoldman merged PR #13695:
URL: https://github.com/apache/kafka/pull/13695


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-10 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721546#comment-17721546
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14981:


Have we resolved all the known issues with static membership? IIRC there were 
some that required broker-side changes; could we accidentally introduce 
correctness-related bugs in Streams applications running against older 
clusters? 

Maybe I'm being paranoid, but I thought we had begun to recommend against using 
static membership in Streams (due to the above and/or general stability issues, 
though perhaps most/all of those have been sorted out by now – do we know?)

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`
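
For context, the mechanism described above boils down to something like the 
following on the application side today (a sketch only; the per-instance id 
scheme shown here is illustrative, and the ticket proposes that Streams derive 
such an id per thread automatically):

```java
// Sketch: pinning a static member id by hand via the main consumer config.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StaticMembershipConfigExample {
    public static Properties configFor(int instanceIndex) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myApp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A stable, unique id per instance enables static membership, so a
        // restart within session.timeout.ms does not trigger a rebalance.
        props.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
            "myApp-instance-" + instanceIndex);
        return props;
    }
}
```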



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


rondagostino commented on PR #13686:
URL: https://github.com/apache/kafka/pull/13686#issuecomment-1542774765

   Also noting that this significantly changes the directory structure that the 
metadata shell presents.  We have `/local` and `/image`, for example, with 
whatever is currently under `/` moving under `/image`.  We also have no `data` 
directory underneath a topic partition.  For example:
   `tree /`
   ```
   image:
     acls:
       byId:
     clientQuotas:
     cluster:
       1:
         BrokerRegistration(id=1, epoch=6147, incarnationId=M6koIGKwQdCU_PMCtLatew, listeners=[Endpoint(listenerName='PLAINTEXT', securityProtocol=PLAINTEXT, host='localhost', port=9092)], supportedFeatures={metadata.version: 1-11}, rack=Optional.empty, fenced=false, inControlledShutdown=false, isMigratingZkBroker=false)
     configs:
     features:
       metadataVersion:
         3.5-IV2
       zkMigrationState:
         NONE
     producerIds:
       nextProducerId:
         -1
     provenance:
       offset 6, epoch 3, time 2023-05-10T20:06:51.486Z[UTC]
     scram:
       SCRAM-SHA-256:
       SCRAM-SHA-512:
     topics:
       byId:
         8ba_iKksTFagQTBNJksHPw:
           0:
             PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0)
           1:
             PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0)
           id:
             8ba_iKksTFagQTBNJksHPw
           name:
             foo
       byName:
         foo:
           0:
             PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0)
           1:
             PartitionRegistration(replicas=[1], isr=[1], removingReplicas=[], addingReplicas=[], leader=1, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0)
           id:
             8ba_iKksTFagQTBNJksHPw
           name:
             foo
   local:
     commitId:
       f42187de19772e83
     version:
       3.6.0-SNAPSHOT
   ```
   
Whereas with the current version (which has no `tree` command -- the new 
`tree` command is a really nice addition!):
   `ls /`
   ```
   brokers  features  local  metadataQuorum  topicIds  topics
   ```
   `cat /topics/foo/0/data`
   ```
   {
 "partitionId" : 0,
 "topicId" : "8ba_iKksTFagQTBNJksHPw",
 "replicas" : [ 1 ],
 "isr" : [ 1 ],
 "removingReplicas" : [ ],
 "addingReplicas" : [ ],
 "leader" : 1,
 "leaderEpoch" : 0,
 "partitionEpoch" : 0
   }
   ```
   
I don't think this is problematic or requires a KIP, but I would like to 
confirm.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190358529


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190356950


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190339738


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rondagostino commented on a diff in pull request #13686: MINOR: Create the MetadataNode classes to introspect MetadataImage

2023-05-10 Thread via GitHub


rondagostino commented on code in PR #13686:
URL: https://github.com/apache/kafka/pull/13686#discussion_r1190240987


##
metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.ProducerIdsImage;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class ProducerIdsImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public static final String NAME = "producerIds";
+
+/**
+ * The producer IDs image.
+ */
+private final ProducerIdsImage image;
+
+public ProducerIdsImageNode(ProducerIdsImage image) {
+this.image = image;
+}
+
+@Override
+public Collection childNames() {
+return Collections.singletonList("nextProducerId");
+}
+
+@Override
+public MetadataNode child(String name) {
+if (name.equals("nextProducerId")) {
+return new MetadataLeafNode(image.highestSeenProducerId() + "");

Review Comment:
   Is the highest seen the next one, or is the highest seen 1 less than the
next one? It seems this code as written is correct (i.e. the highest seen is the
next one), since the original `toString()` method was invoking
`"ProducerIdsImage(highestSeenProducerId=" + nextProducerId + ")"`. But I just
want to confirm.
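
   A tiny illustration of the distinction being asked about (the names below are made up for the example, not the real `ProducerIdsImage` fields): if the stored value is the next producer ID to hand out, then the highest ID actually handed out so far is one less than it.

   ```java
   public class ProducerIdSketch {
       public static void main(String[] args) {
           long nextProducerId = 1000L;                // next ID that would be handed out
           long highestHandedOut = nextProducerId - 1; // 999, assuming at least one ID was handed out
           System.out.println("next=" + nextProducerId + ", highestHandedOut=" + highestHandedOut);
       }
   }
   ```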



##
metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.MetadataImage;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+
+public class MetadataImageNode implements MetadataNode {
+/**
+ * The name of this node.
+ */
+public final static String NAME = "image";
+
+/**
+ * The metadata image.
+ */
+private final MetadataImage image;
+
+public MetadataImageNode(MetadataImage image) {
+this.image = image;
+}
+
+public MetadataImage image() {
+return image;
+}
+
+@Override
+public Collection childNames() {
+return Arrays.asList(
+ProvenanceNode.NAME,
+FeaturesImageNode.NAME,
+ClusterImageNode.NAME,
+TopicsImageNode.NAME,
+ConfigurationsImageNode.NAME,
+ClientQuotasImageNode.NAME,
+ProducerIdsImageNode.NAME,
+AclsImageNode.NAME,
+ScramImageNode.NAME
+);
+}
+
+@Override
+public MetadataNode child(String name) {

Review Comment:
   Since we have to keep these two methods in sync, is it worth setting up a
static map and using that? Perhaps not, since the child names have to be
sorted... but I figured I would raise it as a possibility.
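
   A minimal sketch of that idea, using made-up node types rather than the real image classes: a single ordered map backs both `childNames()` and `child(String)`, so the two cannot drift apart. (Whether it is worth it in the real code depends on the ordering requirement above.)

   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Supplier;

   interface NodeSketch {
       Collection<String> childNames();
       NodeSketch child(String name);
   }

   final class LeafSketch implements NodeSketch {
       @Override
       public Collection<String> childNames() {
           return Collections.emptyList();
       }

       @Override
       public NodeSketch child(String name) {
           return null;
       }
   }

   final class ImageNodeSketch implements NodeSketch {
       // Insertion order of this map defines the listing order of the children.
       private final Map<String, Supplier<NodeSketch>> children = new LinkedHashMap<>();

       ImageNodeSketch() {
           children.put("provenance", LeafSketch::new);
           children.put("features", LeafSketch::new);
           // ... one entry per child node
       }

       @Override
       public Collection<String> childNames() {
           return children.keySet();
       }

       @Override
       public NodeSketch child(String name) {
           Supplier<NodeSketch> supplier = children.get(name);
           return supplier == null ? null : supplier.get();
       }
   }
   ```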



##
metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r119078


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.

Review Comment:
   nit: Map of protocol names to the number of members that support them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190331659


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group

Review Comment:
   nit: have -> are*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190330383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned

Review Comment:
   nit: of -> from*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190329439


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.

Review Comment:
   nit: a* client id or a* group instance id



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190316117


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Represents all states that a generic group can be in, as well as the states 
that a group must
+ * be in to transition to a particular state.
+ */
+public enum GenericGroupState {
+
+/**
+ * Group has no more members, but lingers until all offsets have expired. 
This state
+ * also represents groups which use Kafka only for offset commits and have 
no members.
+ *
+ * action: respond normally to join group from new members
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID
+ * allow offset fetch requests
+ * transition: last offsets removed in periodic expiration task => DEAD
+ * join group from a new member => PREPARING_REBALANCE
+ * group is removed by partition emigration => DEAD
+ * group is removed by expiration => DEAD
+ */
+EMPTY(),
+
+/**
+ * Group is preparing to rebalance

Review Comment:
   same with most of the other comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190315524


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupState.java:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Represents all states that a generic group can be in, as well as the states 
that a group must
+ * be in to transition to a particular state.
+ */
+public enum GenericGroupState {
+
+/**
+ * Group has no more members, but lingers until all offsets have expired. 
This state
+ * also represents groups which use Kafka only for offset commits and have 
no members.
+ *
+ * action: respond normally to join group from new members
+ * respond to sync group with UNKNOWN_MEMBER_ID
+ * respond to heartbeat with UNKNOWN_MEMBER_ID
+ * respond to leave group with UNKNOWN_MEMBER_ID
+ * respond to offset commit with UNKNOWN_MEMBER_ID
+ * allow offset fetch requests
+ * transition: last offsets removed in periodic expiration task => DEAD
+ * join group from a new member => PREPARING_REBALANCE
+ * group is removed by partition emigration => DEAD
+ * group is removed by expiration => DEAD
+ */
+EMPTY(),
+
+/**
+ * Group is preparing to rebalance

Review Comment:
   nit: missing a full stop



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190313449


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,936 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   nit: remove extra line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190312512


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190308136


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190306648


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.
+ */
+private int numMembersAwaitingJoinResponse = 0;
+
+/**
+ * Map of protocol names to how many members support them.
+ */
+private final Map supportedProtocols = new HashMap<>();
+
+/**
+ * Members who have yet to sync with the group
+ * during the sync group phase.
+ */
+private final Set pendingSyncMembers = new HashSet<>();
+
+/**
+ * The list of topics the group members are 

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190300876


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingJoinMembers = new HashSet<>();
+
+/**
+ * The number of members waiting to hear back a join response.

Review Comment:
   nit: awaiting a join response?* OR waiting for a join response



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190295837


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PREPARING_REBALANCE;
+
+/**
+ *
+ * This class holds metadata for a generic group.

Review Comment:
   would it help to also mention what a generic group is here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


rreddy-22 commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190294950


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   nit: we can remove this extra line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13701: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-10 Thread via GitHub


vamossagar12 commented on PR #13701:
URL: https://github.com/apache/kafka/pull/13701#issuecomment-1542610652

   Thanks @viktorsomogyi. I am assuming there would be a PR against trunk as
well (I could only find ones against 3.3, 3.4, and 3.5)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13701: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-10 Thread via GitHub


vamossagar12 commented on PR #13701:
URL: https://github.com/apache/kafka/pull/13701#issuecomment-1542609157

   LGTM, @viktorsomogyi. I see 3 PRs with more or less the same changes. I am
assuming those would be closed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe opened a new pull request, #13703: MINOR: Standardize controller log4j output for replaying records

2023-05-10 Thread via GitHub


cmccabe opened a new pull request, #13703:
URL: https://github.com/apache/kafka/pull/13703

   Standardize controller log4j output for replaying important records. The log
message should include the word "replayed" to make it clear that this is a record
replay. Log the replay of records for ACLs, client quotas, and producer IDs,
which were previously not logged. Also fix a case where we weren't logging
changes to broker registrations.
   
   AclControlManager, ClientQuotaControlManager, and ProducerIdControlManager
didn't previously have a log4j logger object, so this PR adds one. It also
converts them to use Builder objects. This makes junit tests more readable
because we don't need to specify parameters where the test can use the default
(like LogContexts).
   
   Throw an exception in replay if we get another TopicRecord for a topic which 
already exists.
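
   For illustration, a rough sketch of the shape described (class and method names are made up, not the actual controller code): a manager built through a Builder that supplies defaults, logging each replayed record with the word "replayed" and throwing if a TopicRecord arrives for a topic that already exists.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   final class TopicControlManagerSketch {
       static final class Builder {
           private String logPrefix = "[Controller] "; // default, so tests can omit it

           Builder setLogPrefix(String logPrefix) {
               this.logPrefix = logPrefix;
               return this;
           }

           TopicControlManagerSketch build() {
               return new TopicControlManagerSketch(logPrefix);
           }
       }

       private final String logPrefix;
       private final Map<String, String> topicIdsByName = new HashMap<>();

       private TopicControlManagerSketch(String logPrefix) {
           this.logPrefix = logPrefix;
       }

       // Replay a (topic name, topic id) pair from the metadata log.
       void replayTopicRecord(String name, String id) {
           String existing = topicIdsByName.putIfAbsent(name, id);
           if (existing != null) {
               throw new IllegalStateException("Found TopicRecord for topic " + name +
                   " with id " + id + ", but that topic already exists with id " + existing);
           }
           // Standardized wording: say "replayed" so it is clear this is a record replay.
           System.out.println(logPrefix + "Replayed TopicRecord for topic " + name + " with id " + id);
       }
   }

   // Usage sketch: new TopicControlManagerSketch.Builder().build().replayTopicRecord("foo", "someTopicId");
   ```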


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13702: KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8 f

2023-05-10 Thread via GitHub


machi1990 commented on PR #13702:
URL: https://github.com/apache/kafka/pull/13702#issuecomment-1542588642

   Thanks @divijvaidya.
   I'll close this PR in favor of https://github.com/apache/kafka/pull/12045


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 closed pull request #13702: KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8 from 7

2023-05-10 Thread via GitHub


machi1990 closed pull request #13702: KAFKA-14985: attempt to fix 
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() 
test  by bumping episilon to 8 from 7
URL: https://github.com/apache/kafka/pull/13702


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721490#comment-17721490
 ] 

Manyanda Chitimbo commented on KAFKA-14985:
---

Thanks [~divijvaidya] 

> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>  test is flaky
> 
>
> Key: KAFKA-14985
> URL: https://issues.apache.org/jira/browse/KAFKA-14985
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the following error
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
> testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
>     java.util.concurrent.ExecutionException: 
> org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>         at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>         at 
> kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
>         Caused by:
>         org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>             at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>             at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
>             at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
>             at 
> app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13702: KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8

2023-05-10 Thread via GitHub


divijvaidya commented on PR #13702:
URL: https://github.com/apache/kafka/pull/13702#issuecomment-1542584930

   @machi1990 I am afraid just changing the bound is not going to help. Please 
see the open PR https://github.com/apache/kafka/pull/12045 associated with this 
fix for details, and let me know what you think about the root-cause fix there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kirktrue commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-10 Thread via GitHub


kirktrue commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190226530


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
 }
 }
 
+private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   The code appears to check `inFlightAsyncCommits` only in the case where the 
incoming `offsets` map is empty. Wouldn't we want to always check that?
   
   @philipnee's suggestion makes sense to me:
   
   ```java
   if (offsets.isEmpty() && inflightCommit.get() == 0)
   return true;
   ```
   
   `sendOffsetCommitRequest` already handles the case where the incoming map is 
empty, so perhaps that could be left as is?
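
As an aside for readers following along, the shape being discussed looks roughly like the sketch below: an AtomicInteger counts in-flight async commits, and the sync path waits for it to drain whether or not the incoming offsets map is empty. The names follow the discussion but are assumptions, not the exact ConsumerCoordinator code.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncCommitTracker {
    private final AtomicInteger inFlightAsyncCommits = new AtomicInteger(0);

    public void onAsyncCommitSent() {
        inFlightAsyncCommits.incrementAndGet();
    }

    public void onAsyncCommitCompleted() {
        inFlightAsyncCommits.decrementAndGet();
    }

    /**
     * Waits until all in-flight async commits have completed or the deadline passes.
     * Returns true if they drained in time. A real implementation would poll the
     * network client rather than sleep.
     */
    public boolean waitForPendingAsyncCommits(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (inFlightAsyncCommits.get() > 0) {
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }
}
```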



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-14985.
--
Resolution: Duplicate

Resolving as duplicate of existing open JIRA.

> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>  test is flaky
> 
>
> Key: KAFKA-14985
> URL: https://issues.apache.org/jira/browse/KAFKA-14985
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the following error
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
> testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
>     java.util.concurrent.ExecutionException: 
> org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>         at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>         at 
> kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
>         Caused by:
>         org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>             at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>             at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
>             at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
>             at 
> app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721486#comment-17721486
 ] 

Divij Vaidya commented on KAFKA-14985:
--

We already have an open Jira (and a pending PR) for this: 
[https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-12319?filter=allopenissues]
 

Resolving this as a duplicate. [~manyanda], in the future, please search for an 
existing Jira before creating a new one. 

> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>  test is flaky
> 
>
> Key: KAFKA-14985
> URL: https://issues.apache.org/jira/browse/KAFKA-14985
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the following error
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
> testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
>     java.util.concurrent.ExecutionException: 
> org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>         at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>         at 
> kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
>         Caused by:
>         org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>             at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>             at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
>             at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
>             at 
> app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] kirktrue commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-10 Thread via GitHub


kirktrue commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190210077


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1132,6 +1136,7 @@ public void onSuccess(Void value) {
 if (interceptors != null)
 interceptors.onCommit(offsets);
 completedOffsetCommits.add(new OffsetCommitCompletion(cb, 
offsets, null));
+inFlightAsyncCommits.decrementAndGet();

Review Comment:
   Can we wrap the `decrementAndGet` calls in a `try`/`finally` block in case 
something weird happens in the code that comes before it?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -1145,6 +1150,7 @@ public void onFailure(RuntimeException e) {
 if (commitException instanceof FencedInstanceIdException) {
 asyncCommitFenced.set(true);
 }
+inFlightAsyncCommits.decrementAndGet();

Review Comment:
   I'm not sure if we need to wrap the `decrementAndGet` call in a 
`try`/`finally` block here, too?
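
For illustration, the try/finally placement being suggested would look roughly like this; the callback shape and the inFlightAsyncCommits field are stand-ins, not the actual ConsumerCoordinator callbacks.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CommitCallbackSketch {
    private final AtomicInteger inFlightAsyncCommits = new AtomicInteger(0);

    // Called when the async offset-commit response arrives successfully.
    public void onSuccess(Runnable completionWork) {
        try {
            // Interceptors, completion-queue handling, etc. If any of this throws,
            // the counter is still decremented below.
            completionWork.run();
        } finally {
            inFlightAsyncCommits.decrementAndGet();
        }
    }
}
```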



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190204060


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;

Review Comment:
   It's only used to initialize the state. Removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190199871


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Stable;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GenericGroupTest {
+private final String protocolType = "consumer";
+private final String groupInstanceId = "groupInstanceId";
+private final String memberId = "memberId";
+private final String clientId = "clientId";
+private final String clientHost = "clientHost";
+private final int rebalanceTimeoutMs = 6;
+private final int sessionTimeoutMs = 1;
+
+
+private GenericGroup group = null;
+
+@BeforeEach
+public void initialize() {
+group = new GenericGroup(new LogContext(), "groupId", Empty, 
Time.SYSTEM);

Review Comment:
   I'll keep this as-is since this variable is used for all tests.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##
@@ -0,0 +1,876 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190181383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] jlprat commented on a diff in pull request #13170: MINOR: Remove unused methods in CoreUtils

2023-05-10 Thread via GitHub


jlprat commented on code in PR #13170:
URL: https://github.com/apache/kafka/pull/13170#discussion_r1190178039


##
core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala:
##
@@ -223,7 +204,7 @@ class CoreUtilsTest extends Logging {
 val map = new ConcurrentHashMap[Int, AtomicInteger]().asScala
 implicit val executionContext = 
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))

Review Comment:
   As you are cleaning up some minor problems, feel free to type this `implicit 
val` as this is the recommended way:
   ```suggestion
   implicit val executionContext: ExecutionContext = 
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(nThreads))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190177045


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] machi1990 commented on pull request #13702: KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8 f

2023-05-10 Thread via GitHub


machi1990 commented on PR #13702:
URL: https://github.com/apache/kafka/pull/13702#issuecomment-1542524201

   @rajinisivaram @apovzner can you have a look once you have some time? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13702: KAFKA-14985: attempt to fix ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test by bumping episilon to 8

2023-05-10 Thread via GitHub


machi1990 opened a new pull request, #13702:
URL: https://github.com/apache/kafka/pull/13702

   The test was observed to be failing with
   ```
   ConnectionQuotasTest > 
testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
   java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but was: 
<37.47891810856393>
   at 
java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
   at 
kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
   at scala.collection.immutable.List.foreach(List.scala:333)
   at 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
Caused by:
   org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but 
got 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
was: <37.47891810856393>
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
   at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
   at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
   at 
app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
   ```
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Manyanda Chitimbo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manyanda Chitimbo reassigned KAFKA-14985:
-

Assignee: Manyanda Chitimbo

> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>  test is flaky
> 
>
> Key: KAFKA-14985
> URL: https://issues.apache.org/jira/browse/KAFKA-14985
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Assignee: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the following error
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
> testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
>     java.util.concurrent.ExecutionException: 
> org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>         at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>         at 
> kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
>         Caused by:
>         org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>             at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>             at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
>             at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
>             at 
> app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190175577


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

2023-05-10 Thread via GitHub


machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1542514663

   @ableegoldman @showuon can you have a look at this draft PR once you have 
some time? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Manyanda Chitimbo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721469#comment-17721469
 ] 

Manyanda Chitimbo commented on KAFKA-14985:
---

A quick fix would be to bump the epsilon to a bigger value, e.g. 8, on 
[https://github.com/apovzner/kafka/blob/508a754f397b5a1939c44dfcba72ba996bc912c5/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala#L400]
 but I am not sure if that's enough to make the test resilient. What do you 
think [~apovzner]?

> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
>  test is flaky
> 
>
> Key: KAFKA-14985
> URL: https://issues.apache.org/jira/browse/KAFKA-14985
> Project: Kafka
>  Issue Type: Test
>Reporter: Manyanda Chitimbo
>Priority: Major
>
> The test sometimes fails with the following error
> {code:java}
> Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
> testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
>     java.util.concurrent.ExecutionException: 
> org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>         at 
> java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
>         at 
> kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
>         Caused by:
>         org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
> 37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but 
> was: <37.47891810856393>
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>             at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>             at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
>             at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
>             at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
>             at 
> app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13663: KAFKA-14500; [2/N] Rewrite GroupMetadata in Java

2023-05-10 Thread via GitHub


jeffkbkim commented on code in PR #13663:
URL: https://github.com/apache/kafka/pull/13663#discussion_r1190154104


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.generic;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.SyncGroupResponseData;
+import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.CompletingRebalance;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Dead;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.Empty;
+import static 
org.apache.kafka.coordinator.group.generic.GenericGroupState.PreparingRebalance;
+
+/**
+ * Java rewrite of {@link kafka.coordinator.group.GroupMetadata} that is used
+ * by the new group coordinator (KIP-848). Offset management will be handled
+ * by a different component.
+ *
+ * This class holds group metadata for a generic group.
+ */
+public class GenericGroup {
+
+/**
+ * Empty generation.
+ */
+public static final int NO_GENERATION = -1;
+
+/**
+ * Protocol with empty name.
+ */
+public static final String NO_PROTOCOL_NAME = "";
+
+/**
+ * No leader.
+ */
+public static final String NO_LEADER = "";
+
+/**
+ * Delimiter used to join a randomly generated UUID
+ * with client id or group instance id.
+ */
+private static final String MEMBER_ID_DELIMITER = "-";
+
+/**
+ * The slf4j log context, used to create new loggers.
+ */
+private final LogContext logContext;
+
+/**
+ * The slf4j logger.
+ */
+private final Logger log;
+
+/**
+ * The group id.
+ */
+private final String groupId;
+
+/**
+ * The initial group state.
+ */
+private final GenericGroupState initialState;
+
+/**
+ * The time.
+ */
+private final Time time;
+
+/**
+ * The lock used to synchronize the group.
+ */
+private final Lock lock = new ReentrantLock();
+
+/**
+ * The current group state.
+ */
+private GenericGroupState state;
+
+/**
+ * The timestamp of when the group transitioned
+ * to its current state.
+ */
+private Optional currentStateTimestamp;
+
+/**
+ * The protocol type used for rebalance.
+ */
+private Optional protocolType = Optional.empty();
+
+/**
+ * The protocol name used for rebalance.
+ */
+private Optional protocolName = Optional.empty();
+
+/**
+ * The generation id.
+ */
+private int generationId = 0;
+
+/**
+ * The id of the group's leader.
+ */
+private Optional leaderId = Optional.empty();
+
+/**
+ * The members of the group.
+ */
+private final Map members = new HashMap<>();
+
+/**
+ * The static members of the group.
+ */
+private final Map staticMembers = new HashMap<>();
+
+/**
+ * Members who have yet to (re)join the group
+ * during the join group phase.
+ */
+private final Set pendingMembers = new HashSet<>();
+
+/**
+   

[GitHub] [kafka] dajac commented on pull request #13694: MINOR: add test tag for testDeadToDeadIllegalTransition

2023-05-10 Thread via GitHub


dajac commented on PR #13694:
URL: https://github.com/apache/kafka/pull/13694#issuecomment-1542496068

   Let me re-trigger the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on pull request #13694: MINOR: add test tag for testDeadToDeadIllegalTransition

2023-05-10 Thread via GitHub


jeffkbkim commented on PR #13694:
URL: https://github.com/apache/kafka/pull/13694#issuecomment-1542491308

   @dajac the test failures look unrelated, but I've noticed that sometimes the 
same tests fail altogether


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

2023-05-10 Thread via GitHub


erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1542480034

   > Hey @erikvanoosten, Thanks for the PR! Could you add checks for 
inflightCommits count gets set to 0 in a few of the callback testing function 
like `testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion` ?
   
   Sure, I'll try. I am new to this code base and these changes are not from 
me, but I'll try nonetheless.
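
A minimal sketch of the kind of check being asked for, assuming the test can observe the in-flight counter; the counter here is a stand-in, and the real test would read it through the ConsumerCoordinator test fixtures after invoking the callbacks.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

public class InFlightCommitCountTest {
    @Test
    public void inFlightCountReturnsToZeroOnceCallbacksComplete() {
        // Stand-in for the coordinator's in-flight async commit counter.
        AtomicInteger inFlightAsyncCommits = new AtomicInteger(0);

        inFlightAsyncCommits.incrementAndGet(); // commitAsync() sends a request
        inFlightAsyncCommits.decrementAndGet(); // its callback fires

        assertEquals(0, inFlightAsyncCommits.get());
    }
}
```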


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13664: KAFKA-14961: harden DefaultBackgroundThreadTest.testStartupAndTearDown test

2023-05-10 Thread via GitHub


machi1990 commented on PR #13664:
URL: https://github.com/apache/kafka/pull/13664#issuecomment-1542460521

   Thanks @philipnee, this should be good to be merged. @vvcephei thanks!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi opened a new pull request, #13701: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics …

2023-05-10 Thread via GitHub


viktorsomogyi opened a new pull request, #13701:
URL: https://github.com/apache/kafka/pull/13701

   …(#13690)
   
   Reviewers: Chris Egerton , Viktor Somogyi-Vass 

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14985) ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() test is flaky

2023-05-10 Thread Manyanda Chitimbo (Jira)
Manyanda Chitimbo created KAFKA-14985:
-

 Summary: 
ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() 
test is flaky
 Key: KAFKA-14985
 URL: https://issues.apache.org/jira/browse/KAFKA-14985
 Project: Kafka
  Issue Type: Test
Reporter: Manyanda Chitimbo


The test sometimes fails with the following error
{code:java}
Gradle Test Run :core:test > Gradle Test Executor 14 > ConnectionQuotasTest > 
testListenerConnectionRateLimitWhenActualRateAboveLimit() FAILED
    java.util.concurrent.ExecutionException: 
org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but was: 
<37.47891810856393>
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at 
kafka.network.ConnectionQuotasTest.$anonfun$testListenerConnectionRateLimitWhenActualRateAboveLimit$3(ConnectionQuotasTest.scala:412)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit(ConnectionQuotasTest.scala:412)
        Caused by:
        org.opentest4j.AssertionFailedError: Expected rate (30 +- 7), but got 
37.47891810856393 (600 connections / 16.009 sec) ==> expected: <30.0> but was: 
<37.47891810856393>
            at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
            at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
            at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
            at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:86)
            at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1021)
            at 
app//kafka.network.ConnectionQuotasTest.acceptConnectionsAndVerifyRate(ConnectionQuotasTest.scala:904)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] machi1990 commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown

2023-05-10 Thread via GitHub


machi1990 commented on PR #13700:
URL: https://github.com/apache/kafka/pull/13700#issuecomment-1542448724

   I am wondering if these dynamically created sensors should be removed as well: 
https://github.com/apache/kafka/blob/ca11a87e86e8e3c65043f747f35cae770b1efb7c/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L393-L402
 Looking for suggestions here.
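
One possible shape for that cleanup, sketched in Java for brevity (the linked Scala code is the real thing; the registry and naming here are assumptions): remember the names of dynamically created sensors and remove them in shutdown().

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

public class QuotaSensorRegistry {
    private final Metrics metrics;
    private final Set<String> dynamicSensorNames = ConcurrentHashMap.newKeySet();

    public QuotaSensorRegistry(Metrics metrics) {
        this.metrics = metrics;
    }

    public Sensor getOrCreateSensor(String name) {
        Sensor sensor = metrics.sensor(name); // creates the sensor if it does not exist
        dynamicSensorNames.add(name);
        return sensor;
    }

    public void shutdown() {
        // Remove the dynamically created sensors too, not only the static ones.
        for (String name : dynamicSensorNames) {
            metrics.removeSensor(name);
        }
        dynamicSensorNames.clear();
    }
}
```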


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown

2023-05-10 Thread via GitHub


machi1990 commented on PR #13700:
URL: https://github.com/apache/kafka/pull/13700#issuecomment-1542446635

   @rajinisivaram @dajac please review when you have the chance


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] machi1990 opened a new pull request, #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown

2023-05-10 Thread via GitHub


machi1990 opened a new pull request, #13700:
URL: https://github.com/apache/kafka/pull/13700

   Follows up on 
https://github.com/apache/kafka/pull/13623#discussion_r1182592921
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14984) DynamicBrokerReconfigurationTest.testThreadPoolResize() test is flaky

2023-05-10 Thread Manyanda Chitimbo (Jira)
Manyanda Chitimbo created KAFKA-14984:
-

 Summary: DynamicBrokerReconfigurationTest.testThreadPoolResize() 
test is flaky 
 Key: KAFKA-14984
 URL: https://issues.apache.org/jira/browse/KAFKA-14984
 Project: Kafka
  Issue Type: Test
Reporter: Manyanda Chitimbo


The test sometimes fails with the following log 
{code:java}
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize() failed, 
log available in 
.../core/build/reports/testOutput/kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize().test.stdout

Gradle Test Run :core:test > Gradle Test Executor 6 > 
DynamicBrokerReconfigurationTest > testThreadPoolResize() FAILED
    org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: 
List(data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, 
data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, 
data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, 
data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> 
expected: <true> but was: <false>
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
        at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
        at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
        at 
app//kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1634)
        at 
app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:872)
 {code}
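
The duplicate acceptor thread names in the list suggest the count was taken while old acceptor threads were still shutting down after the resize. Below is a minimal, self-contained sketch of a retry-based thread-count check of the kind that tolerates such lag (not the actual DynamicBrokerReconfigurationTest code; the prefix and timeout values are illustrative):
{code:java}
// Count live threads whose names start with a given prefix, and retry for a
// bounded time before asserting, so briefly lingering threads do not fail the check.
public class ThreadCountSketch {
    static long countThreads(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    static boolean waitForThreadCount(String prefix, long expected, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (countThreads(prefix) == expected) {
                return true;
            }
            Thread.sleep(100);
        }
        return countThreads(prefix) == expected;
    }

    public static void main(String[] args) throws InterruptedException {
        // Illustrative prefix; the acceptor threads in the failure above are named
        // "data-plane-kafka-socket-acceptor-...".
        boolean ok = waitForThreadCount("data-plane-kafka-socket-acceptor-", 6, 15_000);
        System.out.println("thread count settled: " + ok);
    }
}
{code}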



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics

2023-05-10 Thread via GitHub


C0urante commented on PR #13690:
URL: https://github.com/apache/kafka/pull/13690#issuecomment-1542437079

   We don't have this automated (to my knowledge). For backports, we can 
cherry-pick and push directly to the branch. PRs aren't necessary but if you'd 
like a second pair of eyes to, e.g., double-check merge conflict resolution, 
you can file one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #13690: KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics

2023-05-10 Thread via GitHub


viktorsomogyi commented on PR #13690:
URL: https://github.com/apache/kafka/pull/13690#issuecomment-1542434744

   @C0urante thanks. I'll cherry-pick it to 3.3 soon and open a PR. Btw, does 
Kafka have an automated way to do this, or do you usually do it manually with 
git cherry-pick and then create the PR via git or the gh CLI?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


