Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465945556


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
            }
        }
    }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+    private static final String GROUP_NAME = "ClientMetrics";
+
+    // Visible for testing
+    static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+    static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+    static final String THROTTLE = "ClientMetricsThrottle";
+    static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+    static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+    static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+    // Names of sensors that are registered through client instances.
+    private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+    // List of metric names which are not specific to a client instance. Does not require a
+    // thread-safe structure as it is populated only in the constructor.
+    private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+    private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+        THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+    ClientMetricsStats() {
+        Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+        MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
Review Comment:
   Yes. I anticipate there will be a KIP correction needed here. Once we're 
happy with the PR, I'll pull together a correction.
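
   For readers following the metrics discussion, here is a minimal, self-contained
   sketch (not the PR's exact code) of how a gauge-style Measurable like the
   instance count above is registered with org.apache.kafka.common.metrics.Metrics.
   The cache map below is a stand-in for the manager's clientInstanceCache:

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Measurable;
    import org.apache.kafka.common.metrics.Metrics;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class InstanceCountGaugeSketch {
        public static void main(String[] args) {
            try (Metrics metrics = new Metrics()) {
                ConcurrentMap<String, Object> clientInstanceCache = new ConcurrentHashMap<>();

                // The Measurable is evaluated on every metrics read, so the
                // gauge always reflects the current cache size.
                Measurable instanceCount = (config, now) -> clientInstanceCache.size();
                MetricName name = metrics.metricName("ClientMetricsInstanceCount", "ClientMetrics");
                metrics.addMetric(name, instanceCount);
            }
        }
    }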



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move PasswordEncoder to server-common [kafka]

2024-01-24 Thread via GitHub


showuon commented on code in PR #15246:
URL: https://github.com/apache/kafka/pull/15246#discussion_r1465931069


##
server-common/src/test/java/org/apache/kafka/security/PasswordEncoderTest.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import javax.crypto.SecretKeyFactory;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.server.util.Csv;
+import org.junit.jupiter.api.Test;
+
+import java.security.GeneralSecurityException;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+class PasswordEncoderTest {
+
+    @Test
+    public void testEncodeDecode() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+            Optional.empty(),
+            PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM,
+            PasswordEncoderConfigs.DEFAULT_KEY_LENGTH,
+            PasswordEncoderConfigs.DEFAULT_ITERATIONS);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("4096", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("128", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        String defaultKeyFactoryAlgorithm;
+        try {
+            SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512");
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA512";
+        } catch (Exception e) {
+            defaultKeyFactoryAlgorithm = "PBKDF2WithHmacSHA1";
+        }
+        assertEquals(defaultKeyFactoryAlgorithm, encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+        verifyEncodedPassword(encoder, password, encoded);
+    }
+
+    @Test
+    public void testEncoderConfigChange() throws GeneralSecurityException {
+        PasswordEncoder encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+            Optional.of("PBKDF2WithHmacSHA1"),
+            "DES/CBC/PKCS5Padding",
+            64,
+            1024);
+        String password = "test-password";
+        String encoded = encoder.encode(new Password(password));
+        Map<String, String> encodedMap = Csv.parseCsvMap(encoded);
+        assertEquals("1024", encodedMap.get(PasswordEncoder.ITERATIONS));
+        assertEquals("64", encodedMap.get(PasswordEncoder.KEY_LENGTH));
+        assertEquals("PBKDF2WithHmacSHA1", encodedMap.get(PasswordEncoder.KEY_FACTORY_ALGORITHM));
+        assertEquals("AES/CBC/PKCS5Padding", encodedMap.get(PasswordEncoder.CIPHER_ALGORITHM));
+
+        // Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
+        PasswordEncoder decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
+            Optional.of("PBKDF2WithHmacSHA1"),
+            "AES/CBC/PKCS5Padding",
+            128,
+            2048);
+        assertEquals(password, decoder.decode(encoded).value());
+
+        // Test that decoding fails if secret is altered
+        PasswordEncoder decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
+            Optional.of("PBKDF2WithHmacSHA1"),
+            "AES/CBC/PKCS5Padding",
+            128,
+            1024);
+        try {
+            decoder2.decode(encoded);
+            fail("Expected ConfigException to be thrown");
+        } catch (ConfigException expected) {
+        }

Review Comment:
   Nit: Can be replaced with `assertThrows(ConfigException.class, () -> ...)`
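
   A minimal runnable sketch of the suggested shape (the class name here is
   illustrative; decoder2 and encoded refer to the objects built in the quoted test):

    import org.apache.kafka.common.config.ConfigException;
    import org.junit.jupiter.api.Test;

    import static org.junit.jupiter.api.Assertions.assertThrows;

    class AssertThrowsSketch {
        @Test
        void assertThrowsReplacesTryFail() {
            // In the quoted test this becomes a one-liner:
            //     assertThrows(ConfigException.class, () -> decoder2.decode(encoded));
            assertThrows(ConfigException.class, () -> {
                throw new ConfigException("decoding with an altered secret fails");
            });
        }
    }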



##
core/src/main/scala/kafka/utils/PasswordEncoder.scala:
##
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for 

[PR] Add groupType enum class [kafka]

2024-01-24 Thread via GitHub


rreddy-22 opened a new pull request, #15259:
URL: https://github.com/apache/kafka/pull/15259

   Add the groupType Enum class that can be used by the ConsumerGroupCommand


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16193) Storing mm2 topics on source side

2024-01-24 Thread Mikhail Filatov (Jira)
Mikhail Filatov created KAFKA-16193:
---

 Summary: Storing mm2 topics on source side
 Key: KAFKA-16193
 URL: https://issues.apache.org/jira/browse/KAFKA-16193
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Mikhail Filatov


We have two Kafka clusters, one of which is external. We use MirrorMaker to 
replicate one topic from our Kafka cluster to the external one, but MirrorMaker 
also creates its service topics on the target side (the mm2-configs.source.internal, 
mm2-offsets.source.internal and mm2-status.source.internal topics).

We would like to keep such topics on our source side, but as I understand it, this 
is not configurable and the topics are created only on the target side - 
[https://github.com/apache/kafka/blob/3.5/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L182]

If we could manage the workerConfig and specify on which side such topics are kept, 
it would solve the problem.
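
For context, an illustrative mm2.properties for the scenario described (the 
cluster aliases and topic name are examples, not taken from the report):

    clusters = source, target
    source->target.enabled = true
    source->target.topics = our-topic

    # MirrorMaker 2 then creates mm2-configs.source.internal,
    # mm2-offsets.source.internal and mm2-status.source.internal on the target
    # cluster; there is currently no property that places them on the source side.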



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16003: Always create the /config/topics ZNode even for topics w… [kafka]

2024-01-24 Thread via GitHub


showuon commented on code in PR #15022:
URL: https://github.com/apache/kafka/pull/15022#discussion_r1465893905


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -757,16 +759,64 @@ class ZkMigrationIntegrationTest {
 }
   }
 
-  def verifyTopicPartitionMetadata(topicName: String, partitions: Seq[TopicPartition], zkClient: KafkaZkClient): Unit = {
+  def createTopic(topicName: String, numPartitions: Int, replicationFactor: Short, configs: util.Map[String, String], admin: Admin): Unit = {
+    val newTopic = new NewTopic(topicName, numPartitions, replicationFactor).configs(configs)
+    val createTopicResult = admin.createTopics(util.Collections.singletonList(newTopic))
+    createTopicResult.all.get(60, TimeUnit.SECONDS)
+
+    TestUtils.waitUntilTrue(() => {
+      admin.listTopics.names.get.contains(topicName)
+    }, s"Unable to find topic $topicName")
+  }
+
+  def verifyTopic(topicName: String, numPartitions: Int, replicationFactor: Short, configs: util.Map[String, String], admin: Admin, zkClient: KafkaZkClient): Unit = {
+    // Verify the changes are in ZK
+    verifyZKTopicPartitionMetadata(topicName, numPartitions, replicationFactor, zkClient)
+    verifyZKTopicConfigs(topicName, configs, zkClient)
+    // Verify the changes are in KRaft
+    verifyKRaftTopicPartitionMetadata(topicName, numPartitions, replicationFactor, admin)
+    verifyKRaftTopicConfigs(topicName, configs, admin)
+  }

Review Comment:
   Nice verification for ZK and KRaft.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -649,29 +650,30 @@ class ZkMigrationIntegrationTest {
 3)
 
   // Alter the metadata
-  log.info("Create new topic with AdminClient")
   admin = zkCluster.createAdminClient()
-  val newTopics = new util.ArrayList[NewTopic]()
-  newTopics.add(new NewTopic(topicName, 2, 3.toShort))
-  val createTopicResult = admin.createTopics(newTopics)
-  createTopicResult.all().get(60, TimeUnit.SECONDS)
+  log.info(s"Create new topic $topic1 with AdminClient with some configs")
+  val topicConfigs = util.Collections.singletonMap("cleanup.policy", "compact")
+  createTopic(topic1, 2, 3.toShort, topicConfigs, admin)
+  verifyTopic(topic1, 2, 3.toShort, topicConfigs, admin, zkClient)

Review Comment:
   nit: We can create two variables for the 2 and the 3, e.g. 
`originalPartitionCount` and `replicationFactor`.



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -610,8 +611,8 @@ class ZkMigrationIntegrationTest {
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
   ))
   def testNewAndChangedTopicsInDualWrite(zkCluster: ClusterInstance): Unit = {
-    // Create a topic in ZK mode
-    val topicName = "test"
+    val topic1 = "test1"
+    val topic2 = "test2"
     var admin = zkCluster.createAdminClient()

Review Comment:
   I can see that some admin clients are not closed in this test suite. It would 
be good to close them, either in this PR or in a separate PR.
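
   A minimal sketch, in Java form, of the suggested cleanup: Admin implements
   AutoCloseable, so try-with-resources closes the client even when an assertion
   fails mid-test. (The quoted test is Scala, where the equivalent is try/finally
   or scala.util.Using; the bootstrap address below is an example.)

    import org.apache.kafka.clients.admin.Admin;

    import java.util.Properties;

    public class AdminCloseSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // example address
            // The client is closed even if the body throws.
            try (Admin admin = Admin.create(props)) {
                System.out.println(admin.listTopics().names().get());
            }
        }
    }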



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-24 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1465873433


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,39 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter The states of the groups we want to list.
-     * If empty all groups are returned with their state.
-     * @param committedOffset A specified committed offset corresponding to this shard
+     * @param statesFilter  The states of the groups we want to list.
+     *                      If empty, all groups are returned with their state.
+     * @param typesFilter   The types of the groups we want to list.
+     *                      If empty, all groups are returned with their type.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List<ListGroupsResponseData.ListedGroup> listGroups(
+        Set<String> statesFilter,
+        Set<String> typesFilter,
+        long committedOffset
+    ) {
+        // Convert typesFilter to lowercase to make the filter case-insensitive.
+        Set<String> lowerCaseTypesFilter = typesFilter.stream()
+            .map(String::toLowerCase)
+            .collect(Collectors.toCollection(HashSet::new));
+
+        Predicate<Group> combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+            boolean typeCheck = lowerCaseTypesFilter.isEmpty() ||
+                lowerCaseTypesFilter.contains(group.type().toString().toLowerCase());

Review Comment:
   Discussed offline. We realized we were talking about two different GroupType 
files. I made the changes to the server-side GroupType and added string-to-enum 
conversion and comparison.
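
   A hypothetical sketch of what such a case-insensitive string-to-enum
   conversion can look like (the enum name, constants and parse helper here are
   illustrative, not the actual server-side GroupType):

    import java.util.Locale;
    import java.util.Optional;

    enum GroupTypeSketch {
        CLASSIC, CONSUMER;

        // Case-insensitive lookup; unknown names match no group rather than throwing.
        static Optional<GroupTypeSketch> parse(String name) {
            try {
                return Optional.of(valueOf(name.toUpperCase(Locale.ROOT)));
            } catch (IllegalArgumentException e) {
                return Optional.empty();
            }
        }
    }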



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15608: Assign latest leader epoch and offset checkpoint to future log when replacing current log [kafka]

2024-01-24 Thread via GitHub


github-actions[bot] commented on PR #14553:
URL: https://github.com/apache/kafka/pull/14553#issuecomment-1909287506

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14412: Transactional StateStores [kafka]

2024-01-24 Thread via GitHub


ableegoldman commented on PR #15137:
URL: https://github.com/apache/kafka/pull/15137#issuecomment-1909050890

   Hey @nicktelford , I'll try to take a look at this here and there and I 
can't speak for the Confluent folks, but I have to say it's pretty intimidating 
to see a PR with 3000 LOC and over 100 files changed...especially one that 
touches on some complicated stuff that's fundamental to Kafka Streams. 
   
   I know you've already tried to break this work up into multiple PRs but for 
your own sake as much as ours, can you take another look at whether you can 
peel off any more into separate PRs? I'm just worried that it's going to be 
really hard for this to make progress as such a large PR, and the longer it's 
stalled the harder it'll be to keep it updated.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16192) Introduce usage of flexible records to coordinators

2024-01-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-16192:
---
Description: 
[KIP-915|https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
 introduced flexible versions to the records used for the group and transaction 
coordinators.
However, the KIP did not update the record version used.

For 
[KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
 we intend to use flexible fields in the transaction state records. This 
requires a safe way to upgrade from non-flexible version records to flexible 
version records.

This change is implemented via MV.

  was:
[KIP-915| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
 introduced flexible versions to the records used for the group and transaction 
coordinators.
However, the KIP did not update the record version used.

For 
[KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
 we intend to use flexible fields in the transaction state records. This 
requires a safe way to upgrade from non-flexible version records to flexible 
version records.

Typically this is done as a message format bump. There may be an option to make 
this change using MV, since the readers of the records are internal and not 
external consumers.


> Introduce usage of flexible records to coordinators
> ---
>
> Key: KAFKA-16192
> URL: https://issues.apache.org/jira/browse/KAFKA-16192
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> [KIP-915|https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
>  introduced flexible versions to the records used for the group and 
> transaction coordinators.
> However, the KIP did not update the record version used.
> For 
> [KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
>  we intend to use flexible fields in the transaction state records. This 
> requires a safe way to upgrade from non-flexible version records to flexible 
> version records.
> This change is implemented via MV.
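
Flexible versions are declared in a record's JSON schema. A hedged illustration 
of the shape such a change takes (the record name is real, but the version 
numbers are illustrative and the field list is elided):

    {
      "type": "data",
      "name": "TransactionLogValue",
      "validVersions": "0-1",
      "flexibleVersions": "1+",
      "fields": [ ... ]
    }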



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15307: update/note deprecated configs [kafka]

2024-01-24 Thread via GitHub


ableegoldman commented on code in PR #14360:
URL: https://github.com/apache/kafka/pull/14360#discussion_r1465626991


##
docs/streams/developer-guide/config-streams.html:
##
@@ -1010,6 +1016,18 @@ topology.optimization
+  windowed.inner.class.serde
+
+    Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
+
+    Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client.

Review Comment:
   Bear with me here because it's kind of a long story in full and I only half 
remember it...and also this statement is only half true. A more accurate 
statement would be something like this:
   
   "Note that this config is only used by plain consumer/producer clients that 
set a windowed de/serializer type via configs. For Kafka Streams applications 
that deal with windowed types you must pass in the inner serde type when you 
instantiate the windowed serde object for your topology"
   
   In other words, there's no concept of a "default windowed serde" type in 
Streams because you always have to pass in the actual Serde object, and need to 
pass in the window size and inner serde type at that time. The same can also be 
done for plain producer/consumer clients but someone pointed out that the 
console clients have to be configured via properties, which means they can only 
invoke the default constructor based on the class name passed in for the 
key/value de/serializer configs. Which meant we needed to add the window size 
and inner serde type as configs that could be used to configure a windowed 
Serde that was instantiated via default constructor
   
   I guess we should update the docs string in StreamsConfig as well. I assume 
this error slipped in because of the window.size config, which _is_ only needed 
for the consumer client (since the producer client throws away the window size 
when encoding a windowed type). But both producer and consumer need to know the 
inner serde type
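   
   A short sketch of the distinction, assuming the standard WindowedSerdes helper
   (the window size and serde choices below are examples):
   
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.WindowedSerdes;

    public class WindowedSerdeSketch {
        public static void main(String[] args) {
            // Kafka Streams: the inner type and window size are passed when the
            // serde object is constructed, so no "default windowed serde" config exists.
            long windowSizeMs = 60_000L;
            Serde<Windowed<String>> serde =
                WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSizeMs);
            System.out.println(serde.getClass().getSimpleName());

            // Plain clients configured purely via properties (e.g. the console
            // consumer) invoke the deserializer's default constructor, so the same
            // information must come from configs instead, roughly:
            //   value.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer
            //   windowed.inner.class.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
            //   window.size.ms=60000
            serde.close();
        }
    }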



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16157) Topic recreation with offline disk doesn't update leadership/shrink ISR correctly

2024-01-24 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810583#comment-17810583
 ] 

Gaurav Narula commented on KAFKA-16157:
---

Here are some notes from my troubleshooting:

When {{d1}} fails in {{broker-1}}, the controller updates the 
{{BrokerRegistration}} such that it only contains one log directory {{d2}}. On 
handling topic recreation, the controller makes use of an optimisation to 
assign the replica to the only existing log directory {{d2}} 
[[0]|https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java#L721]

In {{broker-1}}, {{ReplicaManager::getOrCreatePartition}} is invoked when the 
topic is recreated. We branch into the case handling 
{{HostedPartition.Offline}} 
[[1]|https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/core/src/main/scala/kafka/server/ReplicaManager.scala#L2774].
 Since {{ReplicaManager::allPartitions}} is keyed by {{TopicPartition}}, it 
doesn't track the {{TopicId}} of the partition in the offline log directory. We 
therefore return {{None}} and do not handle the topic delta correctly. The first 
part of the fix would therefore be to modify {{HostedPartition.Offline}} to 
track the topic id with it.

Next part is to handle the log creation correctly. The broker eventually 
invokes {{Partition::createLogIfNotExists}} 
[[2]|https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/core/src/main/scala/kafka/cluster/Partition.scala#L877],
 with {{isNew = directoryId == DirectoryId.UNASSIGNED}}. Recall from the 
optimisation in the controller, the directory id will *not* be {{UNASSIGNED}}, 
but point to the UUID for {{d2}} and therefore, {{isNew = false}}. Eventually, 
{{LogManager::getOrCreateLog}} fails when {{isNew = false && 
offlineLogDirs.nonEmpty}} 
[[3]|https://github.com/apache/kafka/blob/f1924353126fdf6aad2ba1f8d0c22dade59360b1/core/src/main/scala/kafka/log/LogManager.scala#L1009].
 The second part of the fix is therefore to update 
{{Partition::createLogInAssignedDirectoryId}} to invoke 
{{Partition::createLogIfNotExists}} correctly.

CC: [~omnia_h_ibrahim] [~soarez] [~cmccabe] [~pprovenzano]

> Topic recreation with offline disk doesn't update leadership/shrink ISR 
> correctly
> -
>
> Key: KAFKA-16157
> URL: https://issues.apache.org/jira/browse/KAFKA-16157
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, kraft
>Affects Versions: 3.7.0
>Reporter: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.0
>
> Attachments: broker.log, broker.log.1, broker.log.10, broker.log.2, 
> broker.log.3, broker.log.4, broker.log.5, broker.log.6, broker.log.7, 
> broker.log.8, broker.log.9
>
>
> In a cluster with 4 brokers, `broker-1..broker-4` with 2 disks `d1` and `d2` 
> in each broker, we perform the following operations:
>  
>  # Create a topic `foo.test` with 10 partitions and RF 4. Let's assume the 
> topic was created with id `rAujIqcjRbu_-E4UxgQT8Q`.
>  # Start a producer in the background to produce to `foo.test`.
>  # Break disk `d1` in `broker-1`. We simulate this by marking the log dir 
> read-only.
>  # Delete topic `foo.test`
>  # Recreate topic `foo.test`. Let's assume the topic was created with id 
> `bgdrsv-1QjCLFEqLOzVCHg`.
>  # Wait for 5 minutes
>  # Describe the recreated topic `foo.test`.
>  
> We observe that `broker-1` is the leader and in-sync for few partitions
>  
>  
> {code:java}
>  
> Topic: foo.test TopicId: bgdrsv-1QjCLFEqLOzVCHg PartitionCount: 10      
> ReplicationFactor: 4    Configs: 
> min.insync.replicas=1,unclean.leader.election.enable=false
>         Topic: foo.test Partition: 0    Leader: 101     Replicas: 
> 101,102,103,104       Isr: 101,102,103,104
>         Topic: foo.test Partition: 1    Leader: 102     Replicas: 
> 102,103,104,101       Isr: 102,103,104
>         Topic: foo.test Partition: 2    Leader: 103     Replicas: 
> 103,104,101,102       Isr: 103,104,102
>         Topic: foo.test Partition: 3    Leader: 104     Replicas: 
> 104,101,102,103       Isr: 104,102,103
>         Topic: foo.test Partition: 4    Leader: 104     Replicas: 
> 104,102,101,103       Isr: 104,102,103
>         Topic: foo.test Partition: 5    Leader: 102     Replicas: 
> 102,101,103,104       Isr: 102,103,104
>         Topic: foo.test Partition: 6    Leader: 101     Replicas: 
> 101,103,104,102       Isr: 101,103,104,102
>         Topic: foo.test Partition: 7    Leader: 103     Replicas: 
> 103,104,102,101       Isr: 103,104,102
>         Topic: foo.test Partition: 8    Leader: 101     Replicas: 
> 101,102,104,103       Isr: 101,102,104,103
>         Topic: foo.test Partition: 9    Leader: 102     Replicas: 
> 102,104

Re: [PR] KAFKA-14404: fix overlap of streams-config sections & describe additional parameters [kafka]

2024-01-24 Thread via GitHub


ableegoldman commented on PR #15162:
URL: https://github.com/apache/kafka/pull/15162#issuecomment-1908994370

   Merged to trunk. Thanks for the PR @AyoubOm -- one quick tip, make sure to 
format the PR title prefix as "KAFKA-14404" rather than [Kafka-14404]. Just 
good to know if you ever submit another PR (and we hope you will!)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-24 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-14404:
---
Fix Version/s: 3.8.0

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
> Fix For: 3.8.0
>
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14404: fix overlap of streams-config sections & describe additional parameters [kafka]

2024-01-24 Thread via GitHub


ableegoldman merged PR #15162:
URL: https://github.com/apache/kafka/pull/15162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465593791


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+    public static final String GROUP = "testGroup";
+
+    public static final int NUM_PARTITIONS = 10;
+
+    private static final List<String> TOPICS = IntStream.range(0, 5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+    private static final List<TopicPartition> TOPIC_PARTITIONS = TOPICS.stream()
+        .flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> new TopicPartition(topic, i)))
+        .collect(Collectors.toList());
+
+    private final Admin admin = mock(Admin.class);
+
+    @Test
+    public void testAdminRequestsForDescribeOffsets() {
+        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
+        ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
+
+        when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
+            .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+        when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()))
+            .thenReturn(listGroupOffsetsResult(GROUP));
+        when(admin.listOffsets(offsetsArgMatcher(), any()))
+            .thenReturn(listOffsetsResult());
+
+        Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = groupService.collectGroupOffsets(GROUP);
+        assertEquals(Some.apply("Stable"), res._1);
+        assertTrue(res._2.isDefined());
+        assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465587429


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+    public static final String GROUP = "testGroup";
+
+    public static final int NUM_PARTITIONS = 10;
+
+    private static final List<String> TOPICS = IntStream.range(0, 5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+    private static final List<TopicPartition> TOPIC_PARTITIONS = TOPICS.stream()
+        .flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> new TopicPartition(topic, i)))
+        .collect(Collectors.toList());
+
+    private final Admin admin = mock(Admin.class);
+
+    @Test
+    public void testAdminRequestsForDescribeOffsets() {
+        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
+        ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
+
+        when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
+            .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+        when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()))
+            .thenReturn(listGroupOffsetsResult(GROUP));
+        when(admin.listOffsets(offsetsArgMatcher(), any()))
+            .thenReturn(listOffsetsResult());
+
+        Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+        assertEquals(Some.apply("Stable"), statesAndAssignments._1);
+        assertTrue(statesAndAssignments._2.isDefined());
+        assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments._2.get().size());

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465586005


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+    public static final String GROUP = "testGroup";
+
+    public static final int NUM_PARTITIONS = 10;
+
+    private static final List<String> TOPICS = IntStream.range(0, 5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+    private static final List<TopicPartition> TOPIC_PARTITIONS = TOPICS.stream()
+        .flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> new TopicPartition(topic, i)))
+        .collect(Collectors.toList());
+
+    private final Admin admin = mock(Admin.class);
+
+    @Test
+    public void testAdminRequestsForDescribeOffsets() {
+        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
+        ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
+
+        when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
+            .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+        when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()))
+            .thenReturn(listGroupOffsetsResult(GROUP));
+        when(admin.listOffsets(offsetsArgMatcher(), any()))
+            .thenReturn(listOffsetsResult());
+
+        Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = groupService.collectGroupOffsets(GROUP);
+        assertEquals(Some.apply("Stable"), res._1);
+        assertTrue(res._2.isDefined());
+        assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465585471


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+    public static final String GROUP = "testGroup";
+
+    public static final int NUM_PARTITIONS = 10;
+
+    private static final List<String> TOPICS = IntStream.range(0, 5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+    private static final List<TopicPartition> TOPIC_PARTITIONS = TOPICS.stream()
+        .flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> new TopicPartition(topic, i)))
+        .collect(Collectors.toList());
+
+    private final Admin admin = mock(Admin.class);
+
+    @Test
+    public void testAdminRequestsForDescribeOffsets() {
+        String[] args = new String[]{"--bootstrap-server", "localhost:9092", "--group", GROUP, "--describe", "--offsets"};
+        ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
+
+        when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
+            .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+        when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()))
+            .thenReturn(listGroupOffsetsResult(GROUP));
+        when(admin.listOffsets(offsetsArgMatcher(), any()))
+            .thenReturn(listOffsetsResult());
+
+        Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+        assertEquals(Some.apply("Stable"), statesAndAssignments._1);
+        assertTrue(statesAndAssignments._2.isDefined());
+        assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments._2.get().size());

Re: [PR] Minor update to KafkaApisTest [kafka]

2024-01-24 Thread via GitHub


chb2ab commented on code in PR #15257:
URL: https://github.com/apache/kafka/pull/15257#discussion_r1465584550


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4296,9 +4296,9 @@ class KafkaApisTest extends Logging {
         Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)))
     })
 
-    val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
+    val fetchData = Map(tidp -> new FetchRequest.PartitionData(topicId, 0, 0, 1000,
       Optional.empty())).asJava
-    val fetchDataBuilder = Map(tp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
+    val fetchDataBuilder = Map(tp -> new FetchRequest.PartitionData(topicId, 0, 0, 1000,
       Optional.empty())).asJava

Review Comment:
   updated, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465574942


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, 
Option<Seq<PartitionAssignmentState>>> 
statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), statesAndAssignments._1);
+assertTrue(statesAndAssignments._2.isDefined());
+assertEqu

Re: [PR] MINOR: Store separate output per test case instead of suite [kafka]

2024-01-24 Thread via GitHub


gharris1727 commented on PR #15258:
URL: https://github.com/apache/kafka/pull/15258#issuecomment-1908946690

   #14795 :+1:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465454670


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);
+}
+
+public void recordPluginErrorCount(Uuid clientInstanceId) {
+record(PLUGI

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465476918


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,

Review Comment:
   The KIP includes a tag for software version. Should we correct the KIP?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465492172


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -1023,5 +1177,18 @@ public void 
testCacheExpirationTaskCancelledOnInstanceUpdate() throws UnknownHos
 assertNotNull(instance);
 assertNotNull(instance.expirationTimerTask());
 assertEquals(1, clientMetricsManager.expirationTimer().size());
+// Metrics size should remain same on instance update.
+assertEquals(12, kafkaMetrics.metrics().size());
+assertEquals((double) 1, 
getMetric(ClientMetricsManager.ClientMetricsStats.INSTANCE_COUNT).metricValue());
+}
+
+private KafkaMetric getMetric(String name) throws Exception {
+Optional<Map.Entry<MetricName, KafkaMetric>> metric = 
kafkaMetrics.metrics().entrySet().stream()

Review Comment:
   Hmm, why do we need to iterate the whole map instead of doing a `get` by 
name on the map?
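
   For illustration, a direct lookup could look like the sketch below 
(assuming the gauge was registered in the "ClientMetrics" group with no 
tags; the `getMetricDirect` helper name is made up):

   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.KafkaMetric;
   import org.apache.kafka.common.metrics.Metrics;

   static KafkaMetric getMetricDirect(Metrics kafkaMetrics, String name) {
       // Metrics.metrics() is keyed by MetricName, whose equality covers
       // name, group and tags, so the registration group/tags must match.
       MetricName metricName = kafkaMetrics.metricName(name, "ClientMetrics");
       return kafkaMetrics.metrics().get(metricName); // null if tags differ
   }

   For the per-instance sensors, which carry a client instance id tag, the 
same `get` only works if the tag map is reconstructed first.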



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465474347


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   The KIP includes a tag with client software version. Is that correct?
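
   For reference, KIP-style tagging would look roughly like this sketch 
(the software-version tag key and variable are illustrative, not the PR's):

   Map<String, String> tags = new HashMap<>();
   tags.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
   tags.put("ClientSoftwareVersion", softwareVersion); // hypothetical tag key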



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465446342


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);
+unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new 
WindowedCount(),
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE 
+ "-" + clientInstanceId);
+throttleCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.THROTTLE, tags));
+sensorsName.add(throttleCount.name());
+
+Sensor pluginExportCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+pluginExportCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_EXPORT, tags));
+sensorsName.add(pluginExportCount.name());
+
+Sensor pluginErrorCount = 
metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+pluginErrorCount.add(createMeter(metrics, new WindowedCount(), 
ClientMetricsStats.PLUGIN_ERROR, tags));
+sensorsName.add(pluginErrorCount.name());
+
+Sensor pluginExportTime = 
metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Avg",
+ClientMetricsStats.GROUP_NAME, "Average time broker spent in 
invoking plugin exportMetrics call", tags), new Avg());
+
pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"Max",
+ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in 
invoking plugin exportMetrics call", tags), new Max());
+sensorsName.add(pluginExportTime.name());
+}
+
+public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+}
+
+public void recordThrottleCount(Uuid clientInstanceId) {
+record(THROTTLE, clientInstanceId);
+}
+
+public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+record(PLUGIN_EXPORT, clientInstanceId);
+record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   Since these two metrics are derived from the same source, they can be on the 
same sens
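
   A minimal sketch of that combination, reusing the names from the quoted 
code (field names assumed, not the PR's final shape):

   Sensor pluginExport = metrics.sensor(PLUGIN_EXPORT + "-" + clientInstanceId);
   pluginExport.add(createMeter(metrics, new WindowedCount(), PLUGIN_EXPORT, tags));
   pluginExport.add(metrics.metricName(PLUGIN_EXPORT_TIME + "Avg", GROUP_NAME,
       "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg());
   pluginExport.add(metrics.metricName(PLUGIN_EXPORT_TIME + "Max", GROUP_NAME,
       "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max());
   // A single record(timeMs) call then bumps the count meter (WindowedCount
   // adds 1 per record regardless of the value) and feeds timeMs to Avg/Max.
   pluginExport.record(timeMs);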

Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1465497737


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -299,8 +332,8 @@ public void 
testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
 // Create new client metrics manager which simulates a new server as 
it will not have any
 // last request information but request should succeed as subscription 
id should match
 // the one with new client instance.
-
-ClientMetricsManager newClientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+kafkaMetrics = new Metrics();

Review Comment:
   We create a new instance during setup already. Do we need to construct a new 
instance here? Ditto in a few other places.



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -191,8 +206,11 @@ public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest re
 byte[] metrics = request.data().metrics();
 if (metrics != null && metrics.length > 0) {
 try {
+long exportTimeStartMs = time.milliseconds();

Review Comment:
   If we just want to measure the amount of time passed, `time.hiResClockMs` is 
cheaper.
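
   A sketch of the suggested change (the plugin call and field names are 
placeholders for the code in the quoted diff):

   long startMs = time.hiResClockMs(); // nanoTime-based, monotonic
   receiverPlugin.exportMetrics(requestContext, request); // placeholder
   clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - startMs);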



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = 
"ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. 
Do not require thread
+// safe structure as it will be populated only in constructor.
+private final List<MetricName> registeredMetricNames = new 
ArrayList<>();
+
+private final Set<String> instanceMetrics = 
Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, 
PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> 
clientInstanceCache.size();
+MetricName instanceCountMetric = 
metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed 
by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client 
instance,
+// then all other sensors should have been registered; and vice 
versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != 
null) {
+return;
+}
+
+Map<String, String> tags = 
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, 
clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + 
clientInstanceId);

Review Comment:
   > Total number of metrics requests GetTelemetrySubscriptionsRequests with 
unknown CLIENT_INSTANCE_IDs.
   
   The above is the description of the metric in the KIP. It seems inaccurate 
since it should be for unknown subscriptionId instead of CLIENT_INSTANCE_ID. 
Also, we track it for PushTelemetryRequest 
instead of GetTelemetrySubscriptionsRequest.



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = 
"ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+

Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-24 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1465427349


##
clients/src/test/java/org/apache/kafka/common/config/provider/AllowedPathsTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.internals.AllowedPaths;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class AllowedPathsTest {
+
+private AllowedPaths allowedPaths;
+private String dir;
+private String myFile;
+private String dir2;
+
+@BeforeEach
+public void setup() throws IOException {
+File parent = TestUtils.tempDirectory();

Review Comment:
   nit: Make this a `@TempDir` annotated field so that it automatically gets 
cleaned up.
   https://junit.org/junit5/docs/5.4.1/api/org/junit/jupiter/api/io/TempDir.html
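
   A sketch of that change (JUnit 5.4+, field injection):

   import org.junit.jupiter.api.io.TempDir;
   import java.io.File;

   class AllowedPathsTest {
       @TempDir
       File parent; // created fresh before each test, deleted automatically afterwards
   }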



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-24 Thread via GitHub


mumrah merged PR #14612:
URL: https://github.com/apache/kafka/pull/14612


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Store separate output per test case instead of suite [kafka]

2024-01-24 Thread via GitHub


C0urante opened a new pull request, #15258:
URL: https://github.com/apache/kafka/pull/15258

   Currently, output captured during tests is combined for every test in the 
suite and stored in a single location. This makes it difficult to diagnose CI 
failures since the combined test output is truncated if it exceeds a certain 
length, which often means that the output for entire test cases gets 
completely wiped.
   
   The change here causes output to be stored per test case (i.e., method) 
instead of suite (i.e., class), which should make more information available 
and reduce the impact of truncation. If/when it becomes possible to store 
complete test output for failing tests only (this is currently not feasible 
with the Jenkins JUnit plugin), it will also allow us to retain less test 
output since we'll be able to store the output of individual failed cases 
instead of entire suites that may only contain one or two failures.
   
   I'm unsure if this approach works for JUnit 5 or not, so I'm opening this PR 
as a draft for now. If/when I've been able to validate compatibility, I'll mark 
it ready for review.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor update to KafkaApisTest [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15257:
URL: https://github.com/apache/kafka/pull/15257#discussion_r1465430002


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4296,9 +4296,9 @@ class KafkaApisTest extends Logging {
 Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)))
 })
 
-val fetchData = Map(tidp -> new FetchRequest.PartitionData(Uuid.ZERO_UUID, 
0, 0, 1000,
+val fetchData = Map(tidp -> new FetchRequest.PartitionData(topicId, 0, 0, 
1000,
   Optional.empty())).asJava
-val fetchDataBuilder = Map(tp -> new 
FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 1000,
+val fetchDataBuilder = Map(tp -> new FetchRequest.PartitionData(topicId, 
0, 0, 1000,

Review Comment:
   FullFetchContext should also set usesTopicIds to true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Minor update to KafkaApisTest [kafka]

2024-01-24 Thread via GitHub


chb2ab opened a new pull request, #15257:
URL: https://github.com/apache/kafka/pull/15257

   @jolshan noticed I was using the ZERO_UUID topicId instead of the actual 
topicId in the testFetchResponseContainsNewLeaderOnNotLeaderOrFollower test 
introduced in https://github.com/apache/kafka/pull/1; updating since the 
actual topicId is more correct.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16192) Introduce usage of flexible records to coordinators

2024-01-24 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-16192:
--

 Summary: Introduce usage of flexible records to coordinators
 Key: KAFKA-16192
 URL: https://issues.apache.org/jira/browse/KAFKA-16192
 Project: Kafka
  Issue Type: Task
Reporter: Justine Olshan
Assignee: Justine Olshan


[KIP-915| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation]
 introduced flexible versions to the records used for the group and transaction 
coordinators.
However, the KIP did not update the record version used.

For 
[KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
 we intend to use flexible fields in the transaction state records. This 
requires a safe way to upgrade from non-flexible version records to flexible 
version records.

Typically this is done as a message format bump. There may be an option to make 
this change using MV instead, since the readers of the records are internal and 
not external consumers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16191) Clean up of consumer client internal events

2024-01-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16191:
--
Description: 
There are a few minor issues with the event sub-classes in the 
{{org.apache.kafka.clients.consumer.internals}} package that should be cleaned 
up:
 # Update the names of subclasses to remove "{{{}Application{}}}" or 
"{{{}Background{}}}"
 # Make {{toString()}} final in the base classes and clean up the 
implementations of {{toStringBase()}}
 # Fix minor whitespace inconsistencies

  was:
There are a few minor issues with the event sub-classes in the 
org.apache.kafka.clients.consumer.internals package that should be cleaned up:
  # Update the names of subclasses to remove "Application" or "Background"
 # Make toString() final in the base classes and clean up the implementations 
of toStringBase()
 # Fix minor whitespace inconsistencies


> Clean up of consumer client internal events
> ---
>
> Key: KAFKA-16191
> URL: https://issues.apache.org/jira/browse/KAFKA-16191
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> There are a few minor issues with the event sub-classes in the 
> {{org.apache.kafka.clients.consumer.internals}} package that should be 
> cleaned up:
>  # Update the names of subclasses to remove "{{{}Application{}}}" or 
> "{{{}Background{}}}"
>  # Make {{toString()}} final in the base classes and clean up the 
> implementations of {{toStringBase()}}
>  # Fix minor whitespace inconsistencies



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16191) Clean up of consumer client internal events

2024-01-24 Thread Kirk True (Jira)
Kirk True created KAFKA-16191:
-

 Summary: Clean up of consumer client internal events
 Key: KAFKA-16191
 URL: https://issues.apache.org/jira/browse/KAFKA-16191
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


There are a few minor issues with the event sub-classes in the 
org.apache.kafka.clients.consumer.internals package that should be cleaned up:
  # Update the names of subclasses to remove "Application" or "Background"
 # Make toString() final in the base classes and clean up the implementations 
of toStringBase() (see the sketch after this list)
 # Fix minor whitespace inconsistencies
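
A minimal sketch of the toString() pattern in item 2 (class names assumed, not 
the actual event classes):

abstract class BackgroundEvent {
    protected String toStringBase() {
        return "type=" + getClass().getSimpleName();
    }

    @Override
    public final String toString() { // final: subclasses extend toStringBase()
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

class PollEvent extends BackgroundEvent {
    private final long pollTimeMs;

    PollEvent(long pollTimeMs) { this.pollTimeMs = pollTimeMs; }

    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
    }
}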



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-24 Thread via GitHub


CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1908668593

   @mumrah Thanks! I have verified that the failing tests pass locally. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465353561


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, 
Option<Seq<PartitionAssignmentState>>> 
statesAndAssignments = groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), statesAndAssignments._1);
+assertTrue(statesAndAssignments._2.isDefined());
+assertEqua

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465346754


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, 
Option<Seq<PartitionAssignmentState>>> res = 
groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), res._1);
+assertTrue(res._2.isDefined());
+assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());
+

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465339215


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, 
Option<Seq<PartitionAssignmentState>>> res = 
groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), res._1);
+assertTrue(res._2.isDefined());
+assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465336810


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = 
groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), res._1);
+assertTrue(res._2.isDefined());
+assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465333784


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = 
groupService.collectGroupOffsets(GROUP);
+assertEquals(Some.apply("Stable"), res._1);
+assertTrue(res._2.isDefined());
+assertEquals(TOPIC_PARTITIONS.size(), res._2.get().size());
+

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


jolshan commented on code in PR #15248:
URL: https://github.com/apache/kafka/pull/15248#discussion_r1465332064


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java:
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.MemberAssignment;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
+import scala.Option;
+import scala.Some;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerGroupServiceTest {
+public static final String GROUP = "testGroup";
+
+public static final int NUM_PARTITIONS = 10;
+
+private static final List<String> TOPICS = IntStream.range(0, 
5).mapToObj(i -> "testTopic" + i).collect(Collectors.toList());
+
+private static final List<TopicPartition> TOPIC_PARTITIONS = 
TOPICS.stream()
+.flatMap(topic -> IntStream.range(0, NUM_PARTITIONS).mapToObj(i -> 
new TopicPartition(topic, i)))
+.collect(Collectors.toList());
+
+private final Admin admin = mock(Admin.class);
+
+@Test
+public void testAdminRequestsForDescribeOffsets() {
+String[] args = new String[]{"--bootstrap-server", "localhost:9092", 
"--group", GROUP, "--describe", "--offsets"};
+ConsumerGroupCommand.ConsumerGroupService groupService = 
consumerGroupService(args);
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)),
 any()))
+.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()),
 any()))
+.thenReturn(listGroupOffsetsResult(GROUP));
+when(admin.listOffsets(offsetsArgMatcher(), any()))
+.thenReturn(listOffsetsResult());
+
+Tuple2<Option<String>, Option<Seq<ConsumerGroupCommand.PartitionAssignmentState>>> res = 
groupService.collectGroupOffsets(GROUP);

Review Comment:
   is there a way we could say what the tuple contains? like 
`stateAndAssignments`
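
   For illustration, one way to go beyond renaming the variable would be a small 
holder that names the two halves of the tuple; this is only a hypothetical 
sketch, not code from the PR:
   
   ```
   import java.util.List;
   import java.util.Optional;
   
   // Hypothetical holder; A stands in for the command's PartitionAssignmentState.
   final class StateAndAssignments<A> {
       final Optional<String> state;
       final Optional<List<A>> assignments;
   
       StateAndAssignments(Optional<String> state, Optional<List<A>> assignments) {
           this.state = state;
           this.assignments = assignments;
       }
   }
   ```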



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on PR #15248:
URL: https://github.com/apache/kafka/pull/15248#issuecomment-1908611036

   Hello @showuon 
   
   Can you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14589 [1/3][WIP] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov opened a new pull request, #15256:
URL: https://github.com/apache/kafka/pull/15256

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16190:
---
Description: 
The heartbeat request builder should make sure that all fields are sent in the 
heartbeat request when the consumer rejoins (currently the 
HeartbeatRequestManager request builder is only reset on failure scenarios). 
This should fix the issue that a client that is subscribed to a topic and gets 
fenced, should try to rejoin providing the same subscription it had. 
Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
this case given that it does explicitly change the subscription when it gets 
fenced. We should ensure we test a consumer that keeps its same initial 
subscription when it rejoins after being fenced.

  was:
The heartbeat request builder should make sure that all fields are sent in the 
heartbeat request when the consumer rejoins (currently the 
HeartbeatRequestManager request builder is only reset on failure scenarios). 
This should fix the issue that a client that is subscribed to a topic and gets 
fenced, should try to rejoin providing the same subscription it had. 


> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced, should try to rejoin providing the same subscription it had. 
> Note that the existing HeartbeatRequestManagerTest.testHeartbeatState misses 
> this case given that it does explicitly change the subscription when it gets 
> fenced. We should ensure we test a consumer that keeps its same initial 
> subscription when it rejoins after being fenced.
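
A minimal sketch of the fix direction described above, with hypothetical names 
(the actual HeartbeatRequestManager API may differ):

{code:java}
// Sketch only: track whether the next heartbeat must carry all fields.
final class HeartbeatStateSketch {
    private boolean sendAllFields = true;

    // Invoked on every (re)join, not only on failures, so the next heartbeat
    // carries the full set of fields, including an unchanged subscription.
    void onMemberRejoin() {
        sendAllFields = true;
    }

    boolean shouldSendAllFields() {
        boolean result = sendAllFields;
        sendAllFields = false; // later heartbeats may send only changed fields
        return result;
    }
}
{code}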



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16190:
---
Labels: client-transitions-issues newbie  (was: client-transitions-issues)

> Member should send full heartbeat when rejoining
> 
>
> Key: KAFKA-16190
> URL: https://issues.apache.org/jira/browse/KAFKA-16190
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues, newbie
>
> The heartbeat request builder should make sure that all fields are sent in 
> the heartbeat request when the consumer rejoins (currently the 
> HeartbeatRequestManager request builder is only reset on failure scenarios). 
> This should fix the issue that a client that is subscribed to a topic and 
> gets fenced, should try to rejoin providing the same subscription it had. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-24 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810495#comment-17810495
 ] 

Lianet Magrans edited comment on KAFKA-16134 at 1/24/24 4:54 PM:
-

Fix merged in https://github.com/apache/kafka/pull/15215. Passes consistently 
locally, when running repeatedly (used to fail easily before)


was (Author: JIRAUSER300183):
Fix merged in https://github.com/apache/kafka/pull/15215

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-24 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810493#comment-17810493
 ] 

Lianet Magrans edited comment on KAFKA-16135 at 1/24/24 4:53 PM:
-

Fix merged in https://github.com/apache/kafka/pull/15215. Passes consistently 
locally, when running repeatedly (used to fail easily before)


was (Author: JIRAUSER300183):
Fix merged in https://github.com/apache/kafka/pull/15215

> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> ---
>
> Key: KAFKA-16135
> URL: https://issues.apache.org/jira/browse/KAFKA-16135
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The test
> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
> is incredibly flaky - it failed 3 builds in a row for the 3.7 release 
> candidate, but with different JDK versions. Locally it also fails often and 
> requires a few retries to pass
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16134.

  Assignee: Lianet Magrans
Resolution: Fixed

Fix merged in https://github.com/apache/kafka/pull/15215

> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> --
>
> Key: KAFKA-16134
> URL: https://issues.apache.org/jira/browse/KAFKA-16134
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following test is very flaky. It failed 3 times consecutively in Jenkins 
> runs for the 3.7 release candidate.
> kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-16135.

  Assignee: Lianet Magrans
Resolution: Fixed

Fix merged in https://github.com/apache/kafka/pull/15215

> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer is flaky
> ---
>
> Key: KAFKA-16135
> URL: https://issues.apache.org/jira/browse/KAFKA-16135
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Stanislav Kozlovski
>Assignee: Lianet Magrans
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The test
> kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String,
>  String).quorum=kraft+kip848.groupProtocol=consumer
> is incredibly flaky - it failed 3 builds in a row for the 3.7 release 
> candidate, but with different JDK versions. Locally it also fails often and 
> requires a few retries to pass
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix "No suitable checks publisher found" message during CI build [kafka]

2024-01-24 Thread via GitHub


C0urante merged PR #15247:
URL: https://github.com/apache/kafka/pull/15247


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16190) Member should send full heartbeat when rejoining

2024-01-24 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16190:
--

 Summary: Member should send full heartbeat when rejoining
 Key: KAFKA-16190
 URL: https://issues.apache.org/jira/browse/KAFKA-16190
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans


The heartbeat request builder should make sure that all fields are sent in the 
heartbeat request when the consumer rejoins (currently the 
HeartbeatRequestManager request builder is only reset on failure scenarios). 
This should fix the issue that a client that is subscribed to a topic and gets 
fenced, should try to rejoin providing the same subscription it had. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]

2024-01-24 Thread via GitHub


dajac commented on code in PR #15253:
URL: https://github.com/apache/kafka/pull/15253#discussion_r1465170111


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##
@@ -89,18 +96,42 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
 }
 
 @Override
-public DescribeGroupsRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
-List<String> groupIds = keys.stream().map(key -> {
+public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
+Set<CoordinatorKey> newConsumerGroups = new HashSet<>();
+Set<CoordinatorKey> oldConsumerGroups = new HashSet<>();
+
+keys.forEach(key -> {
 if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-throw new IllegalArgumentException("Invalid transaction 
coordinator key " + key +
+throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
 " when building `DescribeGroups` request");
 }
-return key.idValue;
-}).collect(Collectors.toList());
-DescribeGroupsRequestData data = new DescribeGroupsRequestData()
-.setGroups(groupIds)
-.setIncludeAuthorizedOperations(includeAuthorizedOperations);
-return new DescribeGroupsRequest.Builder(data);
+
+// By default, we always try using the new consumer group
+// describe API. If it fails, we fall back to using the classic
+// group API.
+if (useClassicGroupApi.getOrDefault(key.idValue, false)) {
+oldConsumerGroups.add(key);
+} else {
+newConsumerGroups.add(key);
+}
+});
+
+List<RequestAndKeys<CoordinatorKey>> requests = new ArrayList<>();
+if (!newConsumerGroups.isEmpty()) {
+ConsumerGroupDescribeRequestData data = new 
ConsumerGroupDescribeRequestData()
+.setGroupIds(newConsumerGroups.stream().map(key -> 
key.idValue).collect(Collectors.toList()))
+.setIncludeAuthorizedOperations(includeAuthorizedOperations);
+requests.add(new RequestAndKeys<>(new 
ConsumerGroupDescribeRequest.Builder(data, true), newConsumerGroups));

Review Comment:
   `true` must be removed here when https://github.com/apache/kafka/pull/15255 
is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Mark ConsumerGroupDescribe API as stable [kafka]

2024-01-24 Thread via GitHub


dajac opened a new pull request, #15255:
URL: https://github.com/apache/kafka/pull/15255

   This patch marks the ConsumerGroupDescribe API as stable. This is required 
in order to use the admin tools with the new protocol. The patch also removes 
the fields related to the client side assignor as this is not implemented yet.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-24 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1908434176

   @C0urante , I did some analysis today pertaining to the version that was 
present with `CompletableFuture` 
[here](https://github.com/apache/kafka/blob/062591757a04647eb4837348f59b0e5736b6372f/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java#L312-L330).
   
   I wrote a small program to mimic the behaviour, i.e. returning a future, 
with possibly blocking callback methods chained, from a method:
   
   ```
   package org.example;
   
   import java.time.Duration;
   import java.util.concurrent.*;
   
   public class CompletableFutureTest {
   
   private static Future<Void> getFuture() {
   CompletableFuture<Void> overallFuture = 
CompletableFuture.completedFuture(null);
   overallFuture = overallFuture.thenRun(() -> {
   try {
   System.out.println("Function 1 - Thread::" + 
Thread.currentThread());
   long initTime = System.currentTimeMillis();
   System.out.println("Function 1 - Initial Time::" + initTime);
   Thread.sleep(Duration.ofSeconds(30).toMillis());
   System.out.println("Function 1 - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   }
   });
   overallFuture = overallFuture.thenRun(() -> {
   long initTime = System.currentTimeMillis();
   System.out.println("Function 2 - Thread::" + 
Thread.currentThread());
   System.out.println("Function 2 - Initial Time::" + initTime);
   try {
   Thread.sleep(Duration.ofSeconds(40).toMillis());
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   }
   System.out.println("Function 2 - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   });
   return overallFuture;
   }
   
   public static void main(String[] args) {
   System.out.println("Begin main");
   System.out.println("Main - Thread::" + Thread.currentThread());
   Future<Void> future = getFuture();
   try {
   long initTime = System.currentTimeMillis();
   future.get();
   System.out.println("Main - Time Elapsed::" + 
(System.currentTimeMillis() - initTime) / 1000);
   System.out.println("End of main");
   } catch (InterruptedException e) {
   throw new RuntimeException(e);
   } catch (ExecutionException e) {
   throw new RuntimeException(e.getCause());
   }
   }
   }
   ```
   and this is the output I get
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[main,5,main]
   Function 1 - Initial Time::1706111085236
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[main,5,main]
   Function 2 - Initial Time::170615248
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::0
   End of main
   ```
   
   So, essentially the callbacks get executed as soon as the `getFuture()` 
method is invoked and by the time we get to the `get()` call in main, the 
future is already resolved and it returns immediately (as you pointed out).
   
   I get a very similar result when I change the definition of `overallFuture` 
to 
   
   ```
   CompletableFuture<Void> overallFuture = CompletableFuture.supplyAsync(() -> 
null);
   ```
   
   However, if I change the callback invocations to use `thenRunAsync`, then 
I get totally different results (which is what we expect):
   
   ```
   Begin main
   Main - Thread::Thread[main,5,main]
   Function 1 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 1 - Initial Time::1706111422731
   Function 1 - Time Elapsed::30
   Function 2 - Thread::Thread[ForkJoinPool.commonPool-worker-1,5,main]
   Function 2 - Initial Time::1706111452740
   Function 2 - Time Elapsed::40
   Main - Time Elapsed::70
   End of main
   ```
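   
   For reference, the only code change behind this second output is switching 
the chaining calls in the program above to their async variants, e.g. for the 
first callback:
   
   ```
   overallFuture = overallFuture.thenRunAsync(() -> {
       // same body as before, but now scheduled on the common ForkJoinPool
       // instead of running inline on the thread that chains the stage
       System.out.println("Function 1 - Thread::" + Thread.currentThread());
   });
   ```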
   
   This time, the `getFuture` method returns immediately and the lambdas run 
asynchronously, with `get` blocking until they complete. I think the reason is 
that the non-async methods of `CompletableFuture` execute their callbacks 
immediately upon invocation. It's only the `xxxAsync` variants that execute on 
the common fork-join pool (as seen in the printed thread names) or, if needed, 
on a separate thread pool. This behaviour is similar to other reactive 
libraries like `rx-java` IIRC. I believe the offset write happens on the same 
thread that invokes the `set()` method and we would want to keep that 
behaviour, so I am not sure we would want to use the `xxxAsync` variants. I 
even tried using `KafkaFutureImpl`, which is Kafka's internal implementation 
for async programming, but I get prett

Re: [PR] KAFKA-15462: Add Group Type Filter for List Group to the Admin Client [kafka]

2024-01-24 Thread via GitHub


dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1465144883


##
clients/src/main/java/org/apache/kafka/common/GroupType.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupType {
+UNKNOWN("unknown"),
+CONSUMER("consumer"),
+CLASSIC("classic");

Review Comment:
   nit: I would use `Unknown`, `Consumer` and `Classic`.



##
clients/src/main/java/org/apache/kafka/common/GroupType.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupType {
+UNKNOWN("unknown"),
+CONSUMER("consumer"),
+CLASSIC("classic");
+
+private final static Map<String, GroupType> NAME_TO_ENUM = 
Arrays.stream(values())
+.collect(Collectors.toMap(type -> type.name, Function.identity()));
+
+private final String name;
+
+GroupType(String name) {
+this.name = name;
+}
+
+/**
+ * Parse a string into a consumer group type.
+ */
+public static GroupType parse(String name) {

Review Comment:
   nit: Could we make it case insensitive?
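   
   For illustration, a case-insensitive `parse` could key the lookup map on the 
lower-cased names; this is only a sketch of the suggestion, and the fallback to 
`UNKNOWN` is an assumption, not necessarily the PR's behaviour:
   
   ```
   import java.util.Arrays;
   import java.util.Locale;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;
   
   public enum GroupType {
       UNKNOWN("Unknown"),
       CONSUMER("Consumer"),
       CLASSIC("Classic");
   
       // Keyed on the lower-cased name so lookups ignore case.
       private static final Map<String, GroupType> NAME_TO_ENUM = Arrays.stream(values())
           .collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT), Function.identity()));
   
       private final String name;
   
       GroupType(String name) {
           this.name = name;
       }
   
       public static GroupType parse(String name) {
           if (name == null) {
               return UNKNOWN;
           }
           return NAME_TO_ENUM.getOrDefault(name.toLowerCase(Locale.ROOT), UNKNOWN);
       }
   }
   ```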



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move stateDirectory mock in TaskManagerTest to Mockito [kafka]

2024-01-24 Thread via GitHub


clolov commented on code in PR #15254:
URL: https://github.com/apache/kafka/pull/15254#discussion_r1465053043


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -5153,14 +5128,14 @@ private void makeTaskFolders(final String... names) 
throws Exception {
 for (int i = 0; i < names.length; ++i) {
 taskFolders.add(new TaskDirectory(testFolder.newFolder(names[i]), 
null));
 }
-
expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
+
when(stateDirectory.listNonEmptyTaskDirectories()).thenReturn(taskFolders);
 }
 
 private void writeCheckpointFile(final TaskId task, final 
Map<TopicPartition, Long> offsets) throws Exception {
 final File checkpointFile = getCheckpointFile(task);
 Files.createFile(checkpointFile.toPath());
 new OffsetCheckpoint(checkpointFile).write(offsets);
-
expect(stateDirectory.checkpointFileFor(task)).andReturn(checkpointFile);
+
lenient().when(stateDirectory.checkpointFileFor(task)).thenReturn(checkpointFile);

Review Comment:
   This is lenient because 
`shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater` and 
`shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater` do not invoke 
this method. I am equally happy to explicitly mock the required calls only 
within those two tests to maintain the same strictness. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16185:
---
Labels: client-transitions-issues  (was: )

> Fix client reconciliation of same assignment received in different epochs 
> --
>
> Key: KAFKA-16185
> URL: https://issues.apache.org/jira/browse/KAFKA-16185
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: client-transitions-issues
>
> Currently, the intention in the client state machine is that the client 
> always reconciles whatever it has pending that has not been removed by the 
> coordinator.
> There is still an edge case where this does not happen, and the client might 
> get stuck JOINING/RECONCILING, with a pending reconciliation (delayed), and 
> it receives the same assignment, but in a new epoch (ex. after being FENCED). 
> The first time it receives the assignment it takes no action, as it already 
> has it pending to reconcile, but when the reconciliation completes it 
> discards the result because the epoch changed. And this is wrong.
> sending the assignment with the new epoch one time, the broker continues to 
> send null assignments. 
> Here is a sample sequence leading to the client stuck JOINING:
> - client joins, epoch 0
> - client receives assignment tp1, stuck RECONCILING, epoch 1
> - member gets FENCED on the coord, coord bumps epoch to 2
> - client tries to rejoin (JOINING), epoch 0 provided by the client
> - new member added to the group (group epoch bumped to 3), the client 
> receives the same assignment that it is currently trying to reconcile (tp1), 
> but with epoch 3
> - previous reconciliation completes, but will discard the result because it 
> will notice that the memberHasRejoined (memberEpochOnReconciliationStart != 
> memberEpoch). Client is stuck JOINING, with the server sending null target 
> assignment because it hasn't changed since the last one sent (tp1)
> (We should end up with a test similar to the existing 
> #testDelayedReconciliationResultDiscardedIfMemberRejoins but with the case 
> that the member receives the same assignment after being fenced and rejoining)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16178:
---
Labels: client-transitions-issues consumer-threading-refactor  (was: client 
client-transitions-issues consumer consumer-threading-refactor)

> AsyncKafkaConsumer doesn't retry joining the group after rediscovering group 
> coordinator
> 
>
> Key: KAFKA-16178
> URL: https://issues.apache.org/jira/browse/KAFKA-16178
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Dongnuo Lyu
>Assignee: Philip Nee
>Priority: Critical
>  Labels: client-transitions-issues, consumer-threading-refactor
> Attachments: pkc-devc63jwnj_jan19_0_debug
>
>
> {code:java}
> [2024-01-17 21:34:59,500] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the 
> group coordinator 
> Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
> 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator 
> again and retry in 0ms: This is not the correct coordinator. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Group coordinator 
> b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: 
> null) is unavailable or invalid due to cause: This is not the correct 
> coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136)
> [2024-01-17 21:34:59,882] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code}
> Some of the consumers don't consume any messages. The logs show that after 
> the consumer starts up and successfully logs in,
>  # The consumer discovers the group coordinator.
>  # The heartbeat to join the group fails because "This is not the correct 
> coordinator"
>  # The consumer rediscovers the group coordinator.
> Another heartbeat should follow the rediscovery of the group coordinator, but 
> there are no logs showing signs of a heartbeat request. 
> On the server side, there are no logs at all about the group id. A 
> suspicion is that the consumer doesn't send a heartbeat request after 
> rediscovering the group coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator

2024-01-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16178:
---
Labels: client client-transitions-issues consumer 
consumer-threading-refactor  (was: consumer-threading-refactor)

> AsyncKafkaConsumer doesn't retry joining the group after rediscovering group 
> coordinator
> 
>
> Key: KAFKA-16178
> URL: https://issues.apache.org/jira/browse/KAFKA-16178
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Dongnuo Lyu
>Assignee: Philip Nee
>Priority: Critical
>  Labels: client, client-transitions-issues, consumer, 
> consumer-threading-refactor
> Attachments: pkc-devc63jwnj_jan19_0_debug
>
>
> {code:java}
> [2024-01-17 21:34:59,500] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the 
> group coordinator 
> Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
> 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator 
> again and retry in 0ms: This is not the correct coordinator. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407)
> [2024-01-17 21:34:59,681] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Group coordinator 
> b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: 
> null) is unavailable or invalid due to cause: This is not the correct 
> coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136)
> [2024-01-17 21:34:59,882] INFO [Consumer 
> clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, 
> groupId=consumer-groups-test-0] Discovered group coordinator 
> Coordinator(key='consumer-groups-test-0', nodeId=3, 
> host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, 
> errorCode=0, errorMessage='') 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code}
> Some of the consumers don't consume any messages. The logs show that after 
> the consumer starts up and successfully logs in,
>  # The consumer discovers the group coordinator.
>  # The heartbeat to join the group fails because "This is not the correct 
> coordinator"
>  # The consumer rediscovers the group coordinator.
> Another heartbeat should follow the rediscovery of the group coordinator, but 
> there are no logs showing signs of a heartbeat request. 
> On the server side, there are no logs at all about the group id. A 
> suspicion is that the consumer doesn't send a heartbeat request after 
> rediscovering the group coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16095: Update list group state type filter to include the states for the new consumer group type [kafka]

2024-01-24 Thread via GitHub


dajac commented on code in PR #15211:
URL: https://github.com/apache/kafka/pull/15211#discussion_r1464905968


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -462,7 +463,9 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundException
 public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> 
statesFilter, long committedOffset) {
 Stream<Group> groupStream = groups.values(committedOffset).stream();
 if (!statesFilter.isEmpty()) {
-groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString(committedOffset)));
+List<String> caseInsensitiveFilterList = 
statesFilter.stream().map(String::toLowerCase).collect(Collectors.toList());

Review Comment:
   It would be better to use a Set here.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -462,7 +463,9 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundException
 public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> 
statesFilter, long committedOffset) {
 Stream<Group> groupStream = groups.values(committedOffset).stream();
 if (!statesFilter.isEmpty()) {
-groupStream = groupStream.filter(group -> 
statesFilter.contains(group.stateAsString(committedOffset)));
+List<String> caseInsensitiveFilterList = 
statesFilter.stream().map(String::toLowerCase).collect(Collectors.toList());
+groupStream = groupStream.filter(group -> 
caseInsensitiveFilterList.contains(group.stateAsString(committedOffset).toLowerCase(

Review Comment:
   It is a tad annoying that we have to lower case the state for every group, 
every time this method is called. Here I wonder if we could store the lower 
cased version of the state in the enum in order to avoid it.
   
   The second thing that I wanted to point out is that it may be worth adding a 
method like `inState(Set<String> states, long committedOffset)` to the `Group` 
interface in order to delegate the implementation of the filter to the group 
itself. I think that something like this would be required to implement my 
initial suggestion.
   
   What do you think?
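   
   A rough sketch of both ideas combined, i.e. a state enum that caches its 
lower-cased name plus an `inState`-style default method on the group; all 
names here are illustrative, not the actual interfaces:
   
   ```
   import java.util.Locale;
   import java.util.Set;
   
   // The enum lower-cases its name once, at class-load time.
   enum GroupState {
       EMPTY("Empty"),
       STABLE("Stable"),
       DEAD("Dead");
   
       private final String lowerCaseName;
   
       GroupState(String name) {
           this.lowerCaseName = name.toLowerCase(Locale.ROOT);
       }
   
       String toLowerCaseString() {
           return lowerCaseName;
       }
   }
   
   // The filter is delegated to the group itself, with no per-call lower-casing.
   interface Group {
       GroupState state(long committedOffset);
   
       default boolean inStates(Set<String> lowerCaseStatesFilter, long committedOffset) {
           return lowerCaseStatesFilter.isEmpty()
               || lowerCaseStatesFilter.contains(state(committedOffset).toLowerCaseString());
       }
   }
   ```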



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-24 Thread via GitHub


dajac commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1908072722

   Merged it to trunk and 3.7. cc @stanislavkozlovski 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-24 Thread via GitHub


dajac merged PR #15186:
URL: https://github.com/apache/kafka/pull/15186


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-24 Thread via GitHub


dajac commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1464856230


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,39 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List<ListGroupsResponseData.ListedGroup> listGroups(
+Set<String> statesFilter,
+Set<String> typesFilter,
+long committedOffset
+) {
+// Convert typesFilter to lowercase to make the filter 
case-insensitive.
+Set<String> lowerCaseTypesFilter = typesFilter.stream()
+.map(String::toLowerCase)
+.collect(Collectors.toCollection(HashSet::new));
+
+Predicate<Group> combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = lowerCaseTypesFilter.isEmpty() ||
+
lowerCaseTypesFilter.contains(group.type().toString().toLowerCase());

Review Comment:
   None of the required changes are on the client side or I don't see them. I 
would prefer to address it in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes

2024-01-24 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16107.
-
  Reviewer: David Jacot
Resolution: Fixed

> Ensure consumer does not start fetching from added partitions until 
> onPartitionsAssigned completes
> --
>
> Key: KAFKA-16107
> URL: https://issues.apache.org/jira/browse/KAFKA-16107
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> In the new consumer implementation, when new partitions are assigned, the 
> subscription state is updated and then the #onPartitionsAssigned triggered. 
> This sequence seems sensible but we need to ensure that no data is fetched 
> until the onPartitionsAssigned completes (where the user could be setting the 
> committed offsets it want to start fetching from).
> We should pause the partitions newly added partitions until 
> onPartitionsAssigned completes, similar to how it's done on revocation to 
> avoid positions getting ahead of the committed offsets.
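
A sketch of the pause-until-callback-completes ordering (SubscriptionState and 
ConsumerRebalanceListener are the real client classes, but this flow and the 
method below are illustrative only, not the actual fix):

{code:java}
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

final class AssignmentPauseSketch {
    // Keep fetching off the newly added partitions until the user callback returns.
    static void invokeAssignedCallback(SubscriptionState subscriptions,
                                       ConsumerRebalanceListener listener,
                                       Set<TopicPartition> addedPartitions) {
        addedPartitions.forEach(subscriptions::pause);       // no fetches before the callback
        try {
            listener.onPartitionsAssigned(addedPartitions);  // user may seek to committed offsets here
        } finally {
            addedPartitions.forEach(subscriptions::resume);  // fetch from whatever positions were set
        }
    }
}
{code}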



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-24 Thread via GitHub


dajac merged PR #15215:
URL: https://github.com/apache/kafka/pull/15215


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16189; Extend admin to support ConsumerGroupDescribe API [kafka]

2024-01-24 Thread via GitHub


dajac opened a new pull request, #15253:
URL: https://github.com/apache/kafka/pull/15253

   WIP - Needs https://github.com/apache/kafka/pull/15205 and 
https://github.com/apache/kafka/pull/15211.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: cleanup core modules part 1 [kafka]

2024-01-24 Thread via GitHub


jlprat opened a new pull request, #15252:
URL: https://github.com/apache/kafka/pull/15252

   I'm going through the core classes and found some warnings that could be 
cleaned up; this is the first batch of clean-ups.
   Even if the code needs to be ported to Java, this might help in the process 
(clearer types, less code, right visibility).
   
   This PR cleans up: admin, api, cluster, common, controller, coordinator and 
log classes:
   - Mark methods and fields private where possible
   - Annotate public methods and fields
   - Remove unused classes and methods
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-24 Thread via GitHub


tinaselenge commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1464770354


##
clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java:
##
@@ -40,7 +42,13 @@ public class FileConfigProvider implements ConfigProvider {
 
 private static final Logger log = LoggerFactory.getLogger(FileConfigProvider.class);
 
+public static final String ALLOWED_PATHS_CONFIG = "allowed.paths";
+public static final String ALLOWED_PATHS_DOC = "A comma separated list of paths that this config provider is " +
+"allowed to access. If not set, all paths are allowed.";
+private AllowedPaths allowedPaths = null;

Review Comment:
   There is something else to consider if we go down the route of throwing an 
exception (either NPE or IllegalStateException) when configure() is not called 
first. We end up breaking tests for AbstractConfig, which resolves the config 
variables by instantiating ConfigProviders with the given configProvider 
configs and then calling the get() method 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L536).
 We would need to make changes to that class, possibly calling configure() 
first when instantiating the CPs, which may or may not have further impact 
somewhere else.
   
   Considering this, I'm not sure which is more valuable: defaulting to the 
previous no-op behaviour or introducing an exception. What do you both think?
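   
   For illustration, a minimal sketch (hypothetical names, not the PR's code) of the fail-fast alternative being weighed against the no-op default:
   
   ```java
   import java.util.Map;
   
   // Hypothetical provider: get() refuses to run until configure() has been
   // called, instead of silently allowing all paths (the previous behaviour).
   public class GuardedProvider {
   
       private volatile Map<String, ?> configs; // null until configure() runs
   
       public void configure(Map<String, ?> configs) {
           this.configs = configs; // records that configuration happened
       }
   
       public String get(String path) {
           if (configs == null) {
               // This is the exception AbstractConfig's tests would trip over,
               // since it instantiates providers and calls get() directly.
               throw new IllegalStateException("configure() must be called before get()");
           }
           return "value for " + path; // placeholder lookup
       }
   }
   ```
   
   Whether the fail-fast variant is worth changing AbstractConfig to call configure() first is exactly the open question above.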



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907905085

   I see that you have some differences from the KIP. It sounds like you're 
highlighting situations in which the KIP has small deviations from normal 
Kafka practice. I would support adjusting the KIP to match.
   
   The broker software version tag doesn't seem useful. There is already 
another metric for that specific information.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


AndrewJSchofield commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1464735557


##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Does not
+// require a thread-safe structure as it will be populated only in the constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed by the broker");
+metrics.addMetric(instanceCountMetric, instanceCount);
+registeredMetricNames.add(instanceCountMetric);
+}
+
+public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+// If one sensor of the metrics has been registered for the client instance,
+// then all other sensors should have been registered; and vice versa.
+if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+return;
+}
+
+Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(

Review Comment:
   This one should not include the client instance ID. The KIP is expecting a 
simple `ClientMetricsUnknownSubscriptionRequestCount` I believe. As such, I 
don't think it's an instance metric in this case.



##
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##
@@ -493,4 +519,123 @@ public void run() {
 }
 }
 }
+
+// Visible for testing
+final class ClientMetricsStats {
+
+private static final String GROUP_NAME = "ClientMetrics";
+
+// Visible for testing
+static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+static final String THROTTLE = "ClientMetricsThrottle";
+static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+// Names of sensors that are registered through client instances.
+private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+// List of metric names which are not specific to a client instance. Does not
+// require a thread-safe structure as it will be populated only in the constructor.
+private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+ClientMetricsStats() {
+Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+"The current number of client metric instances being managed by the broker");

Review Comment:
   I think this is a typo in the KIP really, copied to the code, but I would 
say "client metrics instances".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16189) Extend admin to support ConsumerGroupDescribe API

2024-01-24 Thread David Jacot (Jira)
David Jacot created KAFKA-16189:
---

 Summary: Extend admin to support ConsumerGroupDescribe API
 Key: KAFKA-16189
 URL: https://issues.apache.org/jira/browse/KAFKA-16189
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


apoorvmittal10 commented on PR #15251:
URL: https://github.com/apache/kafka/pull/15251#issuecomment-1907853039

   @AndrewJSchofield @junrao Please review and provide feedback when you can.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16186: Broker metrics for client telemetry (KIP-714) [kafka]

2024-01-24 Thread via GitHub


apoorvmittal10 opened a new pull request, #15251:
URL: https://github.com/apache/kafka/pull/15251

   KIP-714 defines broker metrics 
[here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metrics)
 which should help in determining the client metrics instances managed by the 
broker. However, there are certain differences between the metrics defined in 
the KIP and the implementation, highlighted below. We can discuss and either 
amend the KIP or the implementation, though I prefer the former.
   
   There are two ways to emit metrics in the broker, i.e. using Kafka Metrics 
or Yammer Metrics. I have used Kafka Metrics as I want to have minimal 
dependency on Yammer (though open for suggestions).
   
   **Changes/Query** | Metric Name | Type | Group | Tags
   -- | -- | -- | -- | --
   What should be the name of the tag for the broker's version, and how can we fetch that, as I do not see any placeholder in RequestContext? | ClientMetricsInstanceCount | Gauge | ClientMetrics | version: broker's software version
   The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsPluginErrorCount` and `ClientMetricsPluginErrorRate` | ClientMetricsPluginErrorCount | Meter | ClientMetrics | client_instance_id, reason (reason for the failure)
   The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsPluginExportCount` and `ClientMetricsPluginExportRate` | ClientMetricsPluginExportCount | Meter | ClientMetrics | client_instance_id
   The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsThrottleCount` and `ClientMetricsThrottleRate` | ClientMetricsThrottleCount | Meter | ClientMetrics | client_instance_id
   The metric is defined as a Meter, which in Kafka Metrics requires another metric called Rate, hence 2 metrics are emitted: `ClientMetricsUnknownSubscriptionRequestCount` and `ClientMetricsUnknownSubscriptionRequestRate` | ClientMetricsUnknownSubscriptionRequestCount | Meter | ClientMetrics | client version: client's software version
   The metric is defined as a Histogram, but we generally use `Avg` and `Max` to capture latency-related metrics, hence I have 2 metrics: `ClientMetricsPluginExportTimeAvg` and `ClientMetricsPluginExportTimeMax` | ClientMetricsPluginExportTime | Histogram | ClientMetrics | client_instance_id
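   
   To make the Meter point concrete, here is a minimal runnable sketch (sensor name and descriptions assumed) showing how a single Meter in Kafka Metrics registers both a Rate and a cumulative Count:
   
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Meter;
   
   public class MeterExample {
       public static void main(String[] args) {
           try (Metrics metrics = new Metrics()) {
               Sensor sensor = metrics.sensor("client-metrics-plugin-export");
               // One Meter stat is backed by two metric names -- a windowed rate
               // and a cumulative total -- which is why each KIP Meter surfaces
               // as an ...ExportRate and ...ExportCount pair.
               sensor.add(new Meter(
                   metrics.metricName("ClientMetricsPluginExportRate", "ClientMetrics",
                       "Rate of client metrics exports"),
                   metrics.metricName("ClientMetricsPluginExportCount", "ClientMetrics",
                       "Total client metrics exports")));
               sensor.record(); // one export handled
           }
       }
   }
   ```
   
   The Histogram row is analogous: Kafka Metrics has no single histogram metric name, so latency is typically exposed as an Avg/Max pair on one sensor.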
   
   
   [Screenshot of the emitted metrics: https://github.com/apache/kafka/assets/2861565/32172c3c-b098-49f7-b5d8-5eaabf73fcb1]
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14577) Move ConsoleProducer to tools

2024-01-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810294#comment-17810294
 ] 

Mickael Maison commented on KAFKA-14577:


Marking this as blocked by 
[KAFKA-16188|https://issues.apache.org/jira/browse/KAFKA-16188]

> Move ConsoleProducer to tools
> -
>
> Key: KAFKA-14577
> URL: https://issues.apache.org/jira/browse/KAFKA-14577
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Alexandre Dupriez
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16188) Delete deprecated kafka.common.MessageReader

2024-01-24 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16188:
--

 Summary: Delete deprecated kafka.common.MessageReader
 Key: KAFKA-16188
 URL: https://issues.apache.org/jira/browse/KAFKA-16188
 Project: Kafka
  Issue Type: Task
Reporter: Mickael Maison
Assignee: Mickael Maison
 Fix For: 4.0.0


[KIP-641|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866569]
 introduced org.apache.kafka.tools.api.RecordReader and deprecated 
kafka.common.MessageReader in Kafka 3.5.0.

We should delete kafka.common.MessageReader in Kafka 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-24 Thread via GitHub


nizhikov commented on PR #15248:
URL: https://github.com/apache/kafka/pull/15248#issuecomment-1907703286

   CI is OK.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org