[jira] [Commented] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-06 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863442#comment-17863442
 ] 

Chia-Ping Tsai commented on KAFKA-17084:


It passes on my local machine:


{code:java}
docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/core/network_degrade_test.py "
/usr/local/lib/python3.9/dist-packages/paramiko/transport.py:236: 
CryptographyDeprecationWarning: Blowfish has been deprecated and will be 
removed in a future release
  "class": algorithms.Blowfish,
[INFO:2024-07-05 18:26:22,698]: starting test run with session id 
2024-07-05--001...
[INFO:2024-07-05 18:26:22,698]: running 4 tests...
[INFO:2024-07-05 18:26:22,699]: Triggering test 1 of 4...
[INFO:2024-07-05 18:26:22,705]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
'network_degrade_test.py', 'cls_name': 'NetworkDegradeTest', 'method_name': 
'test_latency', 'injected_args': {'task_name': 'latency-100-rate-1000', 
'device_name': 'eth0', 'latency_ms': 50, 'rate_limit_kbit': 1000}}
[INFO:2024-07-05 18:26:22,712]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 on run 1/1
[INFO:2024-07-05 18:26:22,713]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Setting up...
[INFO:2024-07-05 18:26:33,111]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Running...
[INFO:2024-07-05 18:26:53,298]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Tearing down...
[INFO:2024-07-05 18:27:02,302]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 PASS
[WARNING - 2024-07-05 18:27:02,302 - runner_client - log - lineno:294]: 
RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Test requested 5 nodes, used only 4
[WARNING:2024-07-05 18:27:02,303]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Test requested 5 nodes, used only 4
[INFO:2024-07-05 18:27:02,305]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Data: None
[INFO:2024-07-05 18:27:02,313]: 
~
[INFO:2024-07-05 18:27:02,313]: Triggering test 2 of 4...
[INFO:2024-07-05 18:27:02,320]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
'network_degrade_test.py', 'cls_name': 'NetworkDegradeTest', 'method_name': 
'test_latency', 'injected_args': {'task_name': 'latency-100', 'device_name': 
'eth0', 'latency_ms': 50, 'rate_limit_kbit': 0}}
[INFO:2024-07-05 18:27:02,323]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 on run 1/1
[INFO:2024-07-05 18:27:02,324]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Setting up...
[INFO:2024-07-05 18:27:13,280]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Running...
[INFO:2024-07-05 18:27:33,398]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Tearing down...
[INFO:2024-07-05 18:27:42,431]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 PASS
[WARNING - 2024-07-05 18:27:42,432 - runner_client - log - lineno:294]: 
RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Test requested 5 nodes, used only 4
[WARNING:2024-07-05 18:27:42,433]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name

Re: [PR] KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion [kafka]

2024-07-06 Thread via GitHub


soarez merged PR #11438:
URL: https://github.com/apache/kafka/pull/11438


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Move related getters to RemoteLogManagerConfig [kafka]

2024-07-06 Thread via GitHub


brandboat commented on PR #16538:
URL: https://github.com/apache/kafka/pull/16538#issuecomment-2211728398

   Gentle ping @chia7712, @kamalcph. Could you take a look when you are 
available? Thanks.





Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-07-06 Thread via GitHub


m1a2st commented on PR #16532:
URL: https://github.com/apache/kafka/pull/16532#issuecomment-2211750460

   @chia7712, Thanks for your comments, PTAL





[PR] KAFKA-17090: Add reminder to CreateTopicsResult#config for null values of type and documentation [kafka]

2024-07-06 Thread via GitHub


mingyen066 opened a new pull request, #16539:
URL: https://github.com/apache/kafka/pull/16539

   Add a reminder to the documentation of CreateTopicsResult#config so that users 
know `type` and `documentation` can be returned as `null` when they are not 
defined.
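
A minimal sketch of how a caller might guard against those `null` values. The `Entry` class below is a hypothetical stand-in for a returned config entry, not the Kafka client class, and the placeholder strings are assumptions chosen for illustration.

```java
import java.util.Optional;

final class ConfigEntryNullSketch {
    /** Hypothetical stand-in for a config entry whose type and documentation may be null. */
    static final class Entry {
        final String name;
        final String type;          // may be null
        final String documentation; // may be null

        Entry(String name, String type, String documentation) {
            this.name = name;
            this.type = type;
            this.documentation = documentation;
        }
    }

    /** Builds a printable description without dereferencing the nullable fields. */
    static String describe(Entry entry) {
        String type = Optional.ofNullable(entry.type).orElse("<unknown type>");
        String doc = Optional.ofNullable(entry.documentation).orElse("<no documentation>");
        return entry.name + " (" + type + "): " + doc;
    }

    public static void main(String[] args) {
        System.out.println(describe(new Entry("cleanup.policy", null, null)));
    }
}
```

Wrapping the nullable fields in `Optional` at the point of use keeps the fallback text in one place instead of scattering null checks across callers.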
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on PR #16499:
URL: https://github.com/apache/kafka/pull/16499#issuecomment-2211788959

   > Could you please have an individual class to implement the thread detection?
   
   Thanks for the suggestion. I added `DetectThreadLeak` and use 
`DetectThreadLeakTest` to test it. Thank you.
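
For readers unfamiliar with the idea, thread-leak detection can be sketched as a before/after snapshot of live threads. This is an illustration only, not the `DetectThreadLeak` class from the PR; the class name `ThreadLeakSketch` and the method `leakedThreadNames` are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

final class ThreadLeakSketch {
    /**
     * Runs the action and returns the names of threads that are alive afterwards
     * but were not present in the snapshot taken before the action ran.
     */
    static Set<String> leakedThreadNames(Runnable action) {
        Set<Thread> before = Thread.getAllStackTraces().keySet();
        action.run();
        Set<String> leaked = new HashSet<>();
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.isAlive() && !before.contains(thread)) {
                leaked.add(thread.getName());
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        CountDownLatch release = new CountDownLatch(1);
        Set<String> leaked = leakedThreadNames(() -> {
            // Start a thread that outlives the action, simulating a leak.
            Thread worker = new Thread(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "leaky-worker");
            worker.setDaemon(true);
            worker.start();
        });
        System.out.println("Leaked threads: " + leaked);
        release.countDown(); // let the simulated leak finish
    }
}
```

A real detector would likely also filter out threads the JVM or the test framework starts lazily, since a plain snapshot comparison can report those as leaks.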





[PR] KAFKA-17077:The node.id is inconsistent to broker.id when "broker.id.generation.enable=true" [kafka]

2024-07-06 Thread via GitHub


frankvicky opened a new pull request, #16540:
URL: https://github.com/apache/kafka/pull/16540

   We directly change the `broker.id` in `KafkaConfig` when 
`broker.id.generation.enable` is set to `true`. However, this update is NOT 
synchronized with the `node.id` in `KafkaConfig`.
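
The mismatch described above can be illustrated with a plain map standing in for the configuration. This is a sketch under that assumption, not the change made in the PR; the class `BrokerIdSyncSketch` and its helpers are hypothetical, and only the key names `broker.id` and `node.id` come from the description.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class BrokerIdSyncSketch {
    static final String BROKER_ID = "broker.id";
    static final String NODE_ID = "node.id";

    /** Writes the generated id under both keys so the two values cannot diverge. */
    static void applyGeneratedId(Map<String, Integer> config, int generatedId) {
        config.put(BROKER_ID, generatedId);
        config.put(NODE_ID, generatedId);
    }

    /** Returns true when both keys hold the same value. */
    static boolean isConsistent(Map<String, Integer> config) {
        return Objects.equals(config.get(BROKER_ID), config.get(NODE_ID));
    }

    public static void main(String[] args) {
        Map<String, Integer> config = new HashMap<>();
        config.put(BROKER_ID, -1);
        config.put(NODE_ID, -1);

        // Updating only one key reproduces the inconsistency.
        config.put(BROKER_ID, 1001);
        System.out.println("after one-sided update: " + isConsistent(config));

        // Updating both keys together restores consistency.
        applyGeneratedId(config, 1001);
        System.out.println("after synchronized update: " + isConsistent(config));
    }
}
```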
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-06 Thread via GitHub


m1a2st commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212117359

   Thank you @chia7712 for the reminder, rebased.





Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667579599


##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -393,6 +398,81 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
   }
 
+  @ClusterTest(
+types = Array(Type.KRAFT),
+serverProperties = Array(
+  new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer"),
+  new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+  new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+  new ClusterConfigProperty(key = "group.consumer.heartbeat.interval.ms", 
value = "5000")
+),
+features = Array(
+  new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
+)

Review Comment:
   Done



##
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##
@@ -393,6 +398,81 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
   }
 
+  @ClusterTest(
+types = Array(Type.KRAFT),
+serverProperties = Array(
+  new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer"),
+  new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+  new ClusterConfigProperty(key = "offsets.topic.replication.factor", 
value = "1"),
+  new ClusterConfigProperty(key = "group.consumer.heartbeat.interval.ms", 
value = "5000")
+),
+features = Array(
+  new ClusterFeature(feature = Features.GROUP_VERSION, version = 1)
+)
+  )
+  def testUpdateConsumerGroupHeartbeatConfigSuccessful(): Unit = {
+val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+val admin = cluster.createAdminClient()
+val newHeartbeatIntervalMs = 1
+val instanceId = "instanceId"
+val consumerGroupId = "grp"
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+TestUtils.createOffsetsTopicWithAdmin(
+  admin = admin,
+  brokers = raftCluster.brokers.values().asScala.toSeq,
+  controllers = raftCluster.controllers().values().asScala.toSeq
+)
+
+// Heartbeat request to join the group. Note that the member subscribes
+// to an nonexistent topic.
+var consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+  new ConsumerGroupHeartbeatRequestData()
+.setGroupId(consumerGroupId)
+.setInstanceId(instanceId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(5 * 60 * 1000)
+.setSubscribedTopicNames(List("foo").asJava)
+.setTopicPartitions(List.empty.asJava)
+).build()
+
+// Send the request until receiving a successful response. There is a delay
+// here because the group coordinator is loaded in the background.
+var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+TestUtils.waitUntilTrue(() => {
+  consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+  consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+}, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+
+// Verify the response.
+assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+assertEquals(5000, consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)
+
+// Alter consumer heartbeat interval config
+val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
newHeartbeatIntervalMs.toString),
+  OpType.SET)

Review Comment:
   Done






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667581915


##
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##
@@ -566,6 +574,45 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
 }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testDynamicGroupConfigChange(quorum: String): Unit = {
+val newSessionTimeoutMs = 5
+val consumerGroupId = "group-foo"
+val admin = createAdminClient()
+try {
+  val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+  val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
newSessionTimeoutMs.toString),
+OpType.SET)
+  admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+} finally {
+  admin.close()
+}
+
+TestUtils.retry(1) {
+  brokers.head.groupCoordinator.groupMetadataTopicConfigs()
+  val configOpt = 
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId)
+  assertTrue(configOpt.isPresent)
+}
+
+val groupConfig = 
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
+assertEquals(newSessionTimeoutMs, groupConfig.sessionTimeoutMs)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {

Review Comment:
   Consider aligning with `testIncrementalAlterDefaultTopicConfig`






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667582518


##
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##
@@ -566,6 +574,45 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
 }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testDynamicGroupConfigChange(quorum: String): Unit = {
+val newSessionTimeoutMs = 5
+val consumerGroupId = "group-foo"
+val admin = createAdminClient()
+try {
+  val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+  val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
newSessionTimeoutMs.toString),
+OpType.SET)
+  admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+} finally {
+  admin.close()
+}
+
+TestUtils.retry(1) {
+  brokers.head.groupCoordinator.groupMetadataTopicConfigs()
+  val configOpt = 
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId)
+  assertTrue(configOpt.isPresent)
+}
+
+val groupConfig = 
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
+assertEquals(newSessionTimeoutMs, groupConfig.sessionTimeoutMs)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
+val admin = createAdminClient()
+try {
+  val resource = new ConfigResource(ConfigResource.Type.GROUP, "")
+  val op = new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"), 
OpType.SET)
+  val future = admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all
+  TestUtils.assertFutureExceptionTypeEquals(future, 
classOf[InvalidRequestException])

Review Comment:
   Consider aligning with testIncrementalAlterDefaultTopicConfig






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667582875


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -489,6 +489,45 @@ class KafkaApisTest extends Logging {
 testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
   }
 
+  @Test
+  def testIncrementalConsumerGroupAlterConfigs(): Unit = {
+val authorizer: Authorizer = mock(classOf[Authorizer])
+
+val consumerGroupId = "consumer_group_1"
+val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+
+authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.GROUP,
+  consumerGroupId, AuthorizationResult.ALLOWED)
+
+val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+  ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
+
+val incrementalAlterConfigsRequest = 
getIncrementalConsumerGroupAlterConfigRequestBuilder(
+  Seq(resource)).build(requestHeader.apiVersion)
+val request = buildRequest(incrementalAlterConfigsRequest,
+  fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+when(controller.isActive).thenReturn(true)
+
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+  any[Long])).thenReturn(0)
+when(adminManager.incrementalAlterConfigs(any(), 
ArgumentMatchers.eq(false)))
+  .thenReturn(Map(resource -> ApiError.NONE))
+
+createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
+verifyIncrementalAlterConfigResult(response, Map(consumerGroupId -> 
Errors.NONE ))
+verify(authorizer, times(1)).authorize(any(), any())
+verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+  }
+
+  private def 
getIncrementalConsumerGroupAlterConfigRequestBuilder(configResources: 
Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {

Review Comment:
   Done






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667582981


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -489,6 +489,45 @@ class KafkaApisTest extends Logging {
 testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
   }
 
+  @Test
+  def testIncrementalConsumerGroupAlterConfigs(): Unit = {
+val authorizer: Authorizer = mock(classOf[Authorizer])
+
+val consumerGroupId = "consumer_group_1"
+val resource = new ConfigResource(ConfigResource.Type.GROUP, 
consumerGroupId)
+
+authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.GROUP,
+  consumerGroupId, AuthorizationResult.ALLOWED)
+
+val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+  ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
+
+val incrementalAlterConfigsRequest = 
getIncrementalConsumerGroupAlterConfigRequestBuilder(
+  Seq(resource)).build(requestHeader.apiVersion)
+val request = buildRequest(incrementalAlterConfigsRequest,
+  fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+when(controller.isActive).thenReturn(true)
+
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+  any[Long])).thenReturn(0)
+when(adminManager.incrementalAlterConfigs(any(), 
ArgumentMatchers.eq(false)))
+  .thenReturn(Map(resource -> ApiError.NONE))
+
+createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
+verifyIncrementalAlterConfigResult(response, Map(consumerGroupId -> 
Errors.NONE ))

Review Comment:
   Done






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667583107


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {

Review Comment:
   Done



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinat

Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667583406


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties 
groupConfigBounds) {

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667586553


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager implements AutoCloseable {
+
+private final GroupConfig defaultConfig;
+
+private final Map configMap;
+
+private static final Properties GROUP_CONFIG_BOUNDS = new Properties();
+
+public GroupConfigManager(
+Map defaultConfig,
+int consumerGroupMinSessionTimeoutMs,
+int consumerGroupMaxSessionTimeoutMs,
+int consumerGroupMinHeartbeatIntervalMs,
+int consumerGroupMaxHeartbeatIntervalMs
+) {
+this.configMap = new ConcurrentHashMap<>();
+this.defaultConfig = new GroupConfig(defaultConfig);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMinSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMaxSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMinHeartbeatIntervalMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMaxHeartbeatIntervalMs);
+}
+
+/**
+ * Update the configuration of the provided group.
+ *
+ * @param groupId   The group id.
+ * @param newGroupConfigThe new group config.
+ */
+public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+if (null == groupId || groupId.isEmpty()) {
+throw new InvalidRequestException("Group name can't be empty.");
+}
+
+// Validate the configuration
+validate(newGroupConfig, defaultConfig.originals());

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667587404


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -640,7 +640,6 @@ private void transitionTo(
 );
 load();
 break;
-

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667588486


##
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##
@@ -114,6 +114,18 @@ class DynamicConfigPublisher(
 s"${resource.name()} with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
 s"in $deltaName", t)
 })
+case GROUP =>
+  // Apply changes to a group's dynamic configuration.
+  
dynamicConfigHandlers.get(ConfigType.GROUP).foreach(groupConfigHandler =>
+try {
+  info(s"Updating group ${resource.name()} with new 
configuration : " +
+toLoggableProps(resource, props).mkString(","))
+  groupConfigHandler.processConfigChanges(resource.name(), 
props)

Review Comment:
   I have removed the `validate` call from `updateGroupConfig`.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667588626


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties 
groupConfigBounds) {
+int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerHeartbeatInterval >= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be greater than or 
equals to" +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerHeartbeatInterval <= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be less than or 
equals to" +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerSessionTimeout >= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or 
equals to" +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerSessionTimeout <= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or 
equals to" +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+

Review Comment:
   Done.
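
For reference, a minimal sketch of the range checks in the hunk above, written so that each message has spaces around the joined text and the upper-bound case reads "less than or equal to". The class `GroupConfigBoundsSketch` and the helper `requireInRange` are hypothetical names used only for illustration and are not part of the PR; the bound values in `main` are illustrative numbers, not values taken from the broker configuration.

```java
class GroupConfigBoundsSketch {

    // Hypothetical helper (not from the PR): checks one group-level value
    // against a lower and an upper bound and names the violated bound.
    static void requireInRange(String name, int value,
                               String minName, int min,
                               String maxName, int max) {
        if (value < min) {
            throw new IllegalArgumentException(
                name + " must be greater than or equal to " + minName + " (" + min + ")");
        }
        if (value > max) {
            throw new IllegalArgumentException(
                name + " must be less than or equal to " + maxName + " (" + max + ")");
        }
    }

    public static void main(String[] args) {
        try {
            // Illustrative numbers only: 90000 exceeds the assumed upper bound.
            requireInRange("consumer.session.timeout.ms", 90000,
                "group.consumer.min.session.timeout.ms", 45000,
                "group.consumer.max.session.timeout.ms", 60000);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping both comparisons in one helper means the lower-bound and upper-bound messages cannot drift apart when a new config is added.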




[PR] KAFKA-17092: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer [kafka]

2024-07-06 Thread via GitHub


m1a2st opened a new pull request, #16541:
URL: https://github.com/apache/kafka/pull/16541

   https://issues.apache.org/jira/browse/KAFKA-17092 
   
   The root cause is that `AsyncConsumer` sends a FIND_COORDINATOR request first. 
If we are lucky and the prepared responses are ready before the 
FIND_COORDINATOR request is handled, the mock time can be advanced to trigger 
the timeout exception.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-17089) Incorrect JWT parsing in OAuthBearerUnsecuredJws

2024-07-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863532#comment-17863532
 ] 

黃竣陽 commented on KAFKA-17089:
-

I'm interested in this issue. Could you assign it to me?

> Incorrect JWT parsing in OAuthBearerUnsecuredJws
> 
>
> Key: KAFKA-17089
> URL: https://issues.apache.org/jira/browse/KAFKA-17089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Björn Löfroth
>Priority: Major
>
> The documentation for the `OAuthBearerUnsecuredJws.toMap` function correctly 
> describes that the input is Base64URL, but then goes ahead and does a simple 
> base64 decode.
> [https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java#L295]
>  
> It should probably be 
> ```
> byte[] decode = Base64.getUrlDecoder().decode(split);
> ```
> The error I get when using Confluent Schema Registry clients:
> ```
> org.apache.kafka.common.errors.SerializationException: Error serializing JSON 
> message
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:171)
>     at 
> io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:95)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:832)
>     at 
> se.ica.icc.schemaregistry.example.confluent.ProducerJsonExample.main(ProducerJsonExample.java:87)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.exceptions.SchemaRegistryOauthTokenRetrieverException:
>  Error while fetching Oauth Token for Schema Registry: OAuth Token for Schema 
> Registry is Invalid
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getToken(CachedOauthTokenRetriever.java:74)
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getBearerToken(OauthCredentialProvider.java:53)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.setAuthRequestHeaders(RestService.java:1336)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.buildConnection(RestService.java:361)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:300)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:981)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:972)
>     at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:574)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:571)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554)
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:151)
>     ... 11 more
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException:
>  Could not validate the access token: malformed Base64 URL encoded value
>     at 
> org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator.validate(LoginAccessTokenValidator.java:93)
>     at 
> io.confluent.kafka.schemar
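
A minimal sketch of the difference described in the issue above, using only `java.util.Base64` from the JDK. The class name `Base64UrlDemo`, its two helper methods, and the sample bytes are made up for illustration and are not taken from Kafka; the point is only that a value encoded with the URL-safe alphabet can contain `-` and `_`, which the basic decoder rejects.

```java
import java.util.Arrays;
import java.util.Base64;

class Base64UrlDemo {

    // True if the basic (non-URL) decoder accepts the input without throwing.
    static boolean basicDecoderAccepts(String encoded) {
        try {
            Base64.getDecoder().decode(encoded);
            return true;
        } catch (IllegalArgumentException e) {
            // The basic alphabet has no '-' or '_', so URL-safe input can fail here.
            return false;
        }
    }

    // Decodes with the URL-safe alphabet, as JWT segments require.
    static byte[] urlDecode(String encoded) {
        return Base64.getUrlDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        // Two bytes chosen so that the encoded form uses both differing characters.
        byte[] raw = {(byte) 0xfb, (byte) 0xff};
        String urlEncoded = Base64.getUrlEncoder().withoutPadding().encodeToString(raw);

        System.out.println(urlEncoded);                                 // prints -_8
        System.out.println(basicDecoderAccepts(urlEncoded));            // prints false
        System.out.println(Arrays.equals(raw, urlDecode(urlEncoded)));  // prints true
    }
}
```

Tokens whose segments happen to avoid `-` and `_` decode the same way with both decoders, which may be why a mismatch like this can go unnoticed for a while.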

Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667597502


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager implements AutoCloseable {
+
+private final GroupConfig defaultConfig;
+
+private final Map configMap;
+
+private static final Properties GROUP_CONFIG_BOUNDS = new Properties();

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667597539


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager implements AutoCloseable {
+
+private final GroupConfig defaultConfig;
+
+private final Map configMap;
+
+private static final Properties GROUP_CONFIG_BOUNDS = new Properties();
+
+public GroupConfigManager(
+Map defaultConfig,
+int consumerGroupMinSessionTimeoutMs,
+int consumerGroupMaxSessionTimeoutMs,
+int consumerGroupMinHeartbeatIntervalMs,
+int consumerGroupMaxHeartbeatIntervalMs

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667597645


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -118,14 +122,26 @@ private GroupCoordinatorConfig createConfig() {
 return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 
60L, 24);
 }
 
+private GroupConfigManager createConfigManager() {
+Map defaultConfig = new HashMap<>();
+defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
+defaultConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT));
+return new GroupConfigManager(defaultConfig,
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT,
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT,
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT);

Review Comment:
   Done.






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597826


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -69,7 +70,8 @@ public void setup() {
 processor = new ApplicationEventProcessor(
 new LogContext(),
 requestManagers,
-metadata
+metadata,
+mock(SubscriptionState.class)

Review Comment:
   Added related test cases. Thanks.






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597763


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -213,14 +217,16 @@ private void process(final SubscriptionChangeEvent 
ignored) {
  *  the group is sent out.
  */
 private void process(final UnsubscribeEvent event) {
-if (!requestManagers.heartbeatRequestManager.isPresent()) {
-KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
-event.future().completeExceptionally(error);
-return;
+if (requestManagers.heartbeatRequestManager.isPresent()) {
+MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
+CompletableFuture future = membershipManager.leaveGroup();
+future.whenComplete(complete(event.future()));
+} else {
+// If the group membership manager is not present, we can't send 
the leave group request,

Review Comment:
   Updated the comment. Thank you.






Re: [PR] KAFKA-17017: AsyncKafkaConsumer#unsubscribe does not clean the assigned partitions [kafka]

2024-07-06 Thread via GitHub


FrankYang0529 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1667597909


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1476,21 +1478,19 @@ public void unsubscribe() {
 acquireAndEnsureOpen();
 try {
 fetchBuffer.retainAll(Collections.emptySet());
-if (groupMetadata.get().isPresent()) {
-Timer timer = time.timer(Long.MAX_VALUE);
-UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
-applicationEventHandler.add(unsubscribeEvent);
-log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
+Timer timer = time.timer(Long.MAX_VALUE);
+UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
+applicationEventHandler.add(unsubscribeEvent);
+log.info("Unsubscribing all topics or patterns and assigned 
partitions {}",
 subscriptions.assignedPartitions());
 
-try {
-processBackgroundEvents(unsubscribeEvent.future(), timer);
-log.info("Unsubscribed all topics or patterns and assigned 
partitions");
-} catch (TimeoutException e) {
-log.error("Failed while waiting for the unsubscribe event 
to complete");
-}
-resetGroupMetadata();
+try {
+processBackgroundEvents(unsubscribeEvent.future(), timer);
+log.info("Unsubscribed all topics or patterns and assigned 
partitions");
+} catch (TimeoutException e) {
+log.error("Failed while waiting for the unsubscribe event to 
complete");
 }
+if (groupMetadata.get().isPresent()) resetGroupMetadata();

Review Comment:
   Changed `resetGroupMetadata` to `maybeResetGroupMetadata` and moved the 
`groupMetadata.get().isPresent()` check into it.
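
A rough sketch of the shape of that refactor, under stated assumptions: the class `GroupMetadataSketch`, the `String` stand-in for the metadata type, and the "reset to an initial value" behaviour are all hypothetical and only illustrate moving the presence check out of the caller and into the method.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

class GroupMetadataSketch {

    // Stand-in for the consumer's group metadata holder (assumed shape).
    private final AtomicReference<Optional<String>> groupMetadata;
    // Assumed value that a reset restores; not taken from the PR.
    private final String initialMetadata;

    GroupMetadataSketch(Optional<String> current, String initialMetadata) {
        this.groupMetadata = new AtomicReference<>(current);
        this.initialMetadata = initialMetadata;
    }

    // The isPresent() guard lives here, so callers such as unsubscribe()
    // can call this unconditionally instead of repeating the check.
    void maybeResetGroupMetadata() {
        if (groupMetadata.get().isPresent()) {
            groupMetadata.set(Optional.of(initialMetadata));
        }
    }

    Optional<String> current() {
        return groupMetadata.get();
    }
}
```

With this shape, the last line of the hunk would reduce to a plain `maybeResetGroupMetadata();` call.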






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667598061


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties 
groupConfigBounds) {
+int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerHeartbeatInterval >= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be greater than or 
equals to" +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667598227


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties 
groupConfigBounds) {
+int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerHeartbeatInterval >= (int) 
groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG),

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667599111


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -135,6 +138,13 @@ public CoordinatorShardBuilder withTim
 return this;
 }
 
+public CoordinatorShardBuilder withGroupConfigManager(
+GroupConfigManager groupConfigManager
+) {
+this.groupConfigManager = groupConfigManager;
+return this;
+}
+

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667600848


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties groupConfigBounds) {
+int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerHeartbeatInterval >= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerHeartbeatInterval <= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be less than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerSessionTimeout >= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerSessionTimeout <= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+
+}
+
+public static void validate(Properties props, Properties groupConfigBounds) {

Review Comment:
   Done.




Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667601409


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {
+return new HashSet<>(CONFIG.names());
+}
+
+/**
+ * Check that property names are valid
+ */
+public static void validateNames(Properties props) {
+Set names = configNames();
+for (Object name : props.keySet()) {
+if (!names.contains(name)) {
+throw new InvalidConfigurationException("Unknown group config 
name: " + name);
+}
+}
+}
+
+/**
+ * Validates the values of the given properties.
+ */
+public static void validateValues(Map valueMaps, Properties groupConfigBounds) {
+int consumerHeartbeatInterval = (Integer) valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+int consumerSessionTimeout = (Integer) valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerHeartbeatInterval >= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerHeartbeatInterval <= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG),
+CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + "must be less than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
+require(consumerSessionTimeout >= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
+require(consumerSessionTimeout <= (int) groupConfigBounds.get(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG),
+CONSUMER_SESSION_TIMEOUT_MS_CONFIG + "must be greater than or equals to" +
+GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
+
+}
+
+public static void validate(Properties props, Properties groupConfigBounds) {
+validateNames(props);
+Map valueMaps = CONFIG.parse(props);
+validateValues(valueMaps, groupConfigBounds);
+}
+
+/**
+  

Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667582518


##
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##
@@ -566,6 +574,45 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testDynamicGroupConfigChange(quorum: String): Unit = {
+    val newSessionTimeoutMs = 5
+    val consumerGroupId = "group-foo"
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.GROUP, consumerGroupId)
+      val op = new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, newSessionTimeoutMs.toString),
+        OpType.SET)
+      admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all.get
+    } finally {
+      admin.close()
+    }
+
+    TestUtils.retry(1) {
+      brokers.head.groupCoordinator.groupMetadataTopicConfigs()
+      val configOpt = brokerServers.head.groupCoordinator.groupConfig(consumerGroupId)
+      assertTrue(configOpt.isPresent)
+    }
+
+    val groupConfig = brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
+    assertEquals(newSessionTimeoutMs, groupConfig.sessionTimeoutMs)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterDefaultGroupConfig(quorum: String): Unit = {
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.GROUP, "")
+      val op = new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"), OpType.SET)
+      val future = admin.incrementalAlterConfigs(Map(resource -> List(op).asJavaCollection).asJava).all
+      TestUtils.assertFutureExceptionTypeEquals(future, classOf[InvalidRequestException])

Review Comment:
   Consider aligning with `testIncrementalAlterDefaultTopicConfig`






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667604362


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager implements AutoCloseable {
+
+private final GroupConfig defaultConfig;
+
+private final Map configMap;
+
+private static final Properties GROUP_CONFIG_BOUNDS = new Properties();
+
+public GroupConfigManager(
+Map defaultConfig,
+int consumerGroupMinSessionTimeoutMs,
+int consumerGroupMaxSessionTimeoutMs,
+int consumerGroupMinHeartbeatIntervalMs,
+int consumerGroupMaxHeartbeatIntervalMs
+) {
+this.configMap = new ConcurrentHashMap<>();
+this.defaultConfig = new GroupConfig(defaultConfig);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMinSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMaxSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMinHeartbeatIntervalMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMaxHeartbeatIntervalMs);
+}
+
+/**
+ * Update the configuration of the provided group.
+ *
+ * @param groupId   The group id.
+ * @param newGroupConfigThe new group config.
+ */
+public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+if (null == groupId || groupId.isEmpty()) {
+throw new InvalidRequestException("Group name can't be empty.");

Review Comment:
   Consider aligning with `ClientMetricsConfigs` 
   ```
   if (subscriptionName == null || subscriptionName.isEmpty()) {
       throw new InvalidRequestException("Subscription name can't be empty");
   }
   ```






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667605107


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like 
validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
+
+public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
+
+private static final ConfigDef CONFIG = new ConfigDef()
+.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+.define(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC);
+public GroupConfig(Map props) {
+super(CONFIG, props, false);
+}
+
+public static Set configNames() {

Review Comment:
   It is also used in `testFromPropsInvalid`.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667605626


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The group config manager is responsible for config modification and 
cleaning.
+ */
+public class GroupConfigManager implements AutoCloseable {
+
+private final GroupConfig defaultConfig;
+
+private final Map configMap;
+
+private static final Properties GROUP_CONFIG_BOUNDS = new Properties();
+
+public GroupConfigManager(
+Map defaultConfig,
+int consumerGroupMinSessionTimeoutMs,
+int consumerGroupMaxSessionTimeoutMs,
+int consumerGroupMinHeartbeatIntervalMs,
+int consumerGroupMaxHeartbeatIntervalMs
+) {
+this.configMap = new ConcurrentHashMap<>();
+this.defaultConfig = new GroupConfig(defaultConfig);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMinSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
 consumerGroupMaxSessionTimeoutMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMinHeartbeatIntervalMs);
+
GROUP_CONFIG_BOUNDS.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 consumerGroupMaxHeartbeatIntervalMs);
+}
+
+/**
+ * Update the configuration of the provided group.
+ *
+ * @param groupId   The group id.
+ * @param newGroupConfigThe new group config.
+ */
+public void updateGroupConfig(String groupId, Properties newGroupConfig) {
+if (null == groupId || groupId.isEmpty()) {
+throw new InvalidRequestException("Group name can't be empty.");
+}
+
+// Validate the configuration
+validate(newGroupConfig, defaultConfig.originals());
+
+final GroupConfig newConfig = GroupConfig.fromProps(
+defaultConfig.originals(),
+newGroupConfig
+);
+configMap.put(groupId, newConfig);
+}
+
+/**
+ * Get the group config if it exists, otherwise return None.
+ *
+ * @param groupId  The group id.
+ * @return The group config.
+ */
+public Optional groupConfig(String groupId) {
+return Optional.ofNullable(configMap.get(groupId));
+}
+
+public static void validate(Properties newGroupConfig, Map 
configuredProps) {

Review Comment:
   Done.






Re: [PR] KAFKA-14094: Support for first leader bootstrapping the voter set [kafka]

2024-07-06 Thread via GitHub


ahuang98 commented on code in PR #16518:
URL: https://github.com/apache/kafka/pull/16518#discussion_r1667614605


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSetOffset.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+public class VoterSetOffset {

Review Comment:
   I was on the fence about it; I thought this might be a bit more 
explicit/clear as a return type.






[jira] [Assigned] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-07-06 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-16355:
-

Assignee: (was: PoAn Yang)

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> ---
>
> Key: KAFKA-16355
> URL: https://issues.apache.org/jira/browse/KAFKA-16355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Mickael Maison
>Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStream
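
The top frame of the trace above, TreeMap$PrivateEntryIterator.remove, is the JDK's fail-fast check: Iterator.remove() throws if the map was structurally modified by anything other than that iterator since the iterator last advanced. The sketch below only illustrates that general mechanism on a plain TreeMap. It is not taken from Kafka and is not a diagnosis of this issue; the class and method names are made up for the example.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class; not part of Kafka.
class TreeMapIteratorRemoveDemo {

    // Returns true if Iterator.remove() throws ConcurrentModificationException
    // after the map is structurally modified behind the iterator's back.
    static boolean removeAfterOutsideModification() {
        TreeMap<Integer, String> buffer = new TreeMap<>();
        buffer.put(1, "a");
        buffer.put(2, "b");

        Iterator<Map.Entry<Integer, String>> it = buffer.entrySet().iterator();
        it.next();           // the iterator has now returned the entry for key 1
        buffer.put(3, "c");  // new key: a structural change made outside the iterator
        try {
            it.remove();     // the fail-fast check runs here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Evicts every entry through the iterator only and returns the remaining size.
    static int removeThroughIteratorOnly() {
        TreeMap<Integer, String> buffer = new TreeMap<>();
        buffer.put(1, "a");
        buffer.put(2, "b");

        Iterator<Map.Entry<Integer, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();     // safe: the iterator keeps its own bookkeeping in sync
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("CME after outside modification: " + removeAfterOutsideModification());
        System.out.println("Entries left after iterator-only removal: " + removeThroughIteratorOnly());
    }
}
```

In the trace, evictWhile is reached from a cache flush that was itself triggered while processing a record, so one thing to look for is a code path that touches the same buffer while an eviction loop is still iterating over it.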

[jira] [Commented] (KAFKA-15773) Group protocol configuration should be validated

2024-07-06 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863541#comment-17863541
 ] 

PoAn Yang commented on KAFKA-15773:
---

Hi [~pnee], I'm interested in this issue. If you're not working on it, may I 
take it? Thank you.

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
> Issue Type: Improvement
> Components: clients, consumer
> Reporter: Philip Nee
> Priority: Minor
> Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user selects the generic group protocol, or does not set the 
> group.protocol config at all, we should invalidate any configured 
> group.remote.assignor.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also invalidate the configuration.
>  
> This is an optimization/user experience improvement.
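
The two rules in the description above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the Kafka client's actual validation code: the class `GroupProtocolConfigCheck` and its `validate` method are hypothetical, the configuration is modelled as a plain string map, `IllegalArgumentException` stands in for whatever exception the real client would raise, and "uniform" is just an example assignor value. It also assumes that an unset group.protocol behaves like the generic protocol, as the description implies.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka code base.
final class GroupProtocolConfigCheck {
    // Config keys as named in the issue description.
    static final String GROUP_PROTOCOL = "group.protocol";
    static final String LOCAL_ASSIGNOR = "group.local.assignor";
    static final String REMOTE_ASSIGNOR = "group.remote.assignor";

    private GroupProtocolConfigCheck() { }

    /** Throws IllegalArgumentException if the assignor settings are inconsistent. */
    static void validate(Map<String, String> configs) {
        String protocol = configs.get(GROUP_PROTOCOL);
        boolean localSet = configs.get(LOCAL_ASSIGNOR) != null;
        boolean remoteSet = configs.get(REMOTE_ASSIGNOR) != null;

        // Assumption: an unset group.protocol, or the value "generic",
        // selects the generic protocol.
        boolean generic = protocol == null
            || "generic".equals(protocol.toLowerCase(Locale.ROOT));

        // Rule 1: a remote assignor is meaningless under the generic protocol.
        if (generic && remoteSet) {
            throw new IllegalArgumentException(
                REMOTE_ASSIGNOR + " cannot be set when " + GROUP_PROTOCOL
                    + " is generic or unset");
        }
        // Rule 2: local and remote assignors are mutually exclusive.
        if (localSet && remoteSet) {
            throw new IllegalArgumentException(
                LOCAL_ASSIGNOR + " and " + REMOTE_ASSIGNOR + " cannot both be set");
        }
    }

    public static void main(String[] args) {
        // Accepted: remote assignor together with the consumer protocol.
        validate(Map.of(GROUP_PROTOCOL, "consumer", REMOTE_ASSIGNOR, "uniform"));
        try {
            // Rejected: group.protocol is unset, so the remote assignor is invalid.
            validate(Map.of(REMOTE_ASSIGNOR, "uniform"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at construction time, rather than silently ignoring the unused assignor setting, is what makes this a user-experience improvement: the misconfiguration surfaces before the consumer joins a group.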



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16959: ConfigCommand should allow to define both `entity-default` and `entity-name` [kafka]

2024-07-06 Thread via GitHub


m1a2st commented on PR #16381:
URL: https://github.com/apache/kafka/pull/16381#issuecomment-2212341885

   @chia7712, thanks for your comments. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667622457


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -13426,6 +13429,99 @@ public void testNoConversionWhenSizeExceedsClassicMaxGroupSize() throws Exceptio
         assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
     }
 
+    @Test
+    public void testDynamicUpdateSessionTimeout() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        GroupConfigManager configManager = createConfigManager();

Review Comment:
   Done.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -13426,6 +13429,99 @@ public void testNoConversionWhenSizeExceedsClassicMaxGroupSize() throws Exceptio
         assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
     }
 
+    @Test
+    public void testDynamicUpdateSessionTimeout() {

Review Comment:
   Done.






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-06 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667622511


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -13426,6 +13429,99 @@ public void testNoConversionWhenSizeExceedsClassicMaxGroupSize() throws Exceptio
         assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
     }
 
+    @Test
+    public void testDynamicUpdateSessionTimeout() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        GroupConfigManager configManager = createConfigManager();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .withGroupConfigManager(configManager)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(9)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Dynamic update group config
+        Properties newGroupConfig = new Properties();
+        newGroupConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 5);

Review Comment:
   Done.


