[PR] KAFKA-10199: Disable state updater [kafka]

2024-07-07 Thread via GitHub


cadonna opened a new pull request, #16542:
URL: https://github.com/apache/kafka/pull/16542

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (KAFKA-17089) Incorrect JWT parsing in OAuthBearerUnsecuredJws

2024-07-07 Thread Jira


[ https://issues.apache.org/jira/browse/KAFKA-17089 ]


黃竣陽 deleted comment on KAFKA-17089:
-

was (Author: JIRAUSER305187):
I'm interested in this issue. Could you assign it to me?

> Incorrect JWT parsing in OAuthBearerUnsecuredJws
> 
>
> Key: KAFKA-17089
> URL: https://issues.apache.org/jira/browse/KAFKA-17089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Björn Löfroth
>Priority: Major
>
> The documentation for the `OAuthBearerUnsecuredJws.toMap` function correctly 
> describes that the input is Base64URL, but the implementation then performs a 
> plain (non-URL) Base64 decode.
> [https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java#L295]
>  
> It should probably be 
> ```
> byte[] decode = Base64.getUrlDecoder().decode(split);
> ```
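For illustration, a minimal stdlib-only sketch contrasting the two decoders (the class name and sample values below are mine, not from the Kafka code). Standard Base64 uses `+` and `/` in its alphabet, while Base64URL (RFC 4648 §5, the encoding JWT segments use) substitutes `-` and `_`, so the standard decoder rejects valid JWT segments:

```java
import java.util.Base64;

public class Base64UrlDemo {
    // JWT header/payload segments are Base64URL-encoded, so they must be
    // decoded with the URL decoder, not the standard one.
    public static byte[] decodeJwtSegment(String segment) {
        return Base64.getUrlDecoder().decode(segment);
    }

    public static void main(String[] args) {
        // "~~~" encodes to "fn5-" in Base64URL but "fn5+" in standard Base64.
        String urlEncoded = Base64.getUrlEncoder().withoutPadding()
            .encodeToString("~~~".getBytes());
        System.out.println(urlEncoded);                            // fn5-
        System.out.println(new String(decodeJwtSegment(urlEncoded)));
        // Base64.getDecoder().decode("fn5-") throws IllegalArgumentException,
        // which is the "malformed Base64 URL encoded value" failure seen below.
    }
}
```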
> The error I get when using Confluent Schema Registry clients:
> ```
> org.apache.kafka.common.errors.SerializationException: Error serializing JSON 
> message
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:171)
>     at 
> io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:95)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:832)
>     at 
> se.ica.icc.schemaregistry.example.confluent.ProducerJsonExample.main(ProducerJsonExample.java:87)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.exceptions.SchemaRegistryOauthTokenRetrieverException:
>  Error while fetching Oauth Token for Schema Registry: OAuth Token for Schema 
> Registry is Invalid
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getToken(CachedOauthTokenRetriever.java:74)
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getBearerToken(OauthCredentialProvider.java:53)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.setAuthRequestHeaders(RestService.java:1336)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.buildConnection(RestService.java:361)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:300)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:981)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:972)
>     at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:574)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:571)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554)
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:151)
>     ... 11 more
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException:
>  Could not validate the access token: malformed Base64 URL encoded value
>     at 
> org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator.validate(LoginAccessTokenValidator.java:93)
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getT

Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-07 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667624420


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = "consumer.session.timeout.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = "consumer.heartbeat.interval.ms";
+
+    private static final ConfigDef CONFIG = new ConfigDef()
+        .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)

Review Comment:
   We should validate the config name when altering the config.







Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-07 Thread via GitHub


DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1667624742


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.utils.Utils.require;
+
+/**
+ * Group configuration related parameters and supporting methods like validation, etc. are
+ * defined in this class.
+ */
+public class GroupConfig extends AbstractConfig {
+
+    public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = "consumer.session.timeout.ms";
+
+    public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = "consumer.heartbeat.interval.ms";
+
+    private static final ConfigDef CONFIG = new ConfigDef()
+        .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)

Review Comment:
   We need to validate the config name when altering the config.
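A hypothetical sketch of the kind of check the reviewer is asking for. The class name, method name, and the supported-config set are my own illustrations, not the PR's actual API:

```java
import java.util.Map;
import java.util.Set;

public class GroupConfigNameValidator {
    // Assumed set of supported group config names, taken from the
    // constants defined in the diff above.
    private static final Set<String> CONFIG_NAMES = Set.of(
        "consumer.session.timeout.ms",
        "consumer.heartbeat.interval.ms");

    // Reject unknown config names before applying an incremental alter-config,
    // instead of silently accepting them.
    public static void validateNames(Map<String, String> alteredConfigs) {
        for (String name : alteredConfigs.keySet()) {
            if (!CONFIG_NAMES.contains(name)) {
                throw new IllegalArgumentException("Unknown group config name: " + name);
            }
        }
    }
}
```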






Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]

2024-07-07 Thread via GitHub


DL1231 commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-2212355209

   @dajac Sorry for the delay. I've updated the PR; PTAL when you get a chance.





[PR] KAFKA-15773: Group protocol configuration should be validated [kafka]

2024-07-07 Thread via GitHub


FrankYang0529 opened a new pull request, #16543:
URL: https://github.com/apache/kafka/pull/16543

   * Add `group.local.assignors` to `ConsumerConfig`.
   * If `group.protocol` is not `consumer`, reset `group.remote.assignor` and 
`group.local.assignors` to their default values.
   * If both `group.remote.assignor` and `group.local.assignors` are set, throw 
`InvalidConfigurationException`.
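
   A stdlib-only sketch of the described validation rules. The helper class and the nested exception type here are stand-ins for the PR's actual `ConsumerConfig` wiring and Kafka's `InvalidConfigurationException`:

```java
public class GroupProtocolValidator {
    // Stand-in for org.apache.kafka.common.errors.InvalidConfigurationException.
    public static class InvalidConfigurationException extends RuntimeException {
        public InvalidConfigurationException(String msg) { super(msg); }
    }

    // remoteAssignor / localAssignors are null when unset.
    public static void validate(String groupProtocol, String remoteAssignor, String localAssignors) {
        if (!"consumer".equals(groupProtocol)) {
            // Per the PR description: with other protocols the assignor
            // configs are reset to their defaults rather than honored.
            return;
        }
        // Per the PR description: the two assignor configs are mutually exclusive.
        if (remoteAssignor != null && localAssignors != null) {
            throw new InvalidConfigurationException(
                "group.remote.assignor and group.local.assignors cannot both be set");
        }
    }
}
```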
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-15773) Group protocol configuration should be validated

2024-07-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863546#comment-17863546
 ] 

PoAn Yang commented on KAFKA-15773:
---

Hi [~pnee], I just created a draft PR: 
[https://github.com/apache/kafka/pull/16543]. Feel free to close it if you're 
already working on this. Thank you.

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should invalidate any group.remote.assignor setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also invalidate the configuration
>  
> This is an optimization/user experience improvement.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-17091: Add @FunctionalInterface to Streams interfaces [kafka]

2024-07-07 Thread via GitHub


raymcdermott opened a new pull request, #16544:
URL: https://github.com/apache/kafka/pull/16544

   ### Goal
   
   Add @FunctionalInterface to Streams interfaces to enable Clojure 1.12 (and 
any other JVM language) to have the correct hints for these SAM interfaces.
   
   The Clojure issue is [here 
](https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi)
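   
   For context, a minimal example of what the annotation declares. The interface below is illustrative only, not one of the Streams interfaces touched by this PR:

```java
// @FunctionalInterface documents (and lets the compiler enforce) that an
// interface has exactly one abstract method, so tools and other JVM
// languages can reliably treat it as a lambda/SAM target.
@FunctionalInterface
interface ValueTransformer<V, R> {
    R transform(V value);
}

public class FunctionalInterfaceDemo {
    public static void main(String[] args) {
        // Because the interface is a SAM type, a method reference suffices.
        ValueTransformer<String, Integer> length = String::length;
        System.out.println(length.transform("kafka"));
    }
}
```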
   
   ### Testing
   Unit tests and system tests all pass locally.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16943: Synchronously verify Connect worker startup failure in InternalTopicsIntegrationTest [kafka]

2024-07-07 Thread via GitHub


C0urante commented on code in PR #16451:
URL: https://github.com/apache/kafka/pull/16451#discussion_r1667658373


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -231,6 +232,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     Thread herderThread;
 
     private final DistributedConfig config;
+
+    public static Future herderFuture;

Review Comment:
   I don't think this can be `static`; if it is, we won't be able to use it 
with multiple workers in a single integration test (which we do very 
frequently).
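
   A small illustration of the reviewer's point, using a generic class rather than `DistributedHerder`: a `static` field is shared by every instance, so constructing a second worker clobbers the first worker's value, while an instance field keeps them separate:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class StaticFieldDemo {
    // Shared across ALL instances: the last-constructed "worker" wins.
    static Future<?> sharedFuture;

    // Per-instance alternative that keeps each worker's future separate.
    final Future<?> instanceFuture;

    StaticFieldDemo(Future<?> f) {
        sharedFuture = f;   // overwrites whatever a previous instance stored
        instanceFuture = f; // belongs to this instance only
    }
}
```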






[PR] KAFKA-10199: Close pending active tasks to init on partitions lost [kafka]

2024-07-07 Thread via GitHub


cadonna opened a new pull request, #16545:
URL: https://github.com/apache/kafka/pull/16545

   With the state updater enabled, tasks that are created but not initialized are 
stored in a set. In each poll iteration the stream thread drains that set, 
initializes the tasks, and adds them to the state updater.
   
   On partition lost, all active tasks are closed.
   
   This commit ensures that active tasks pending initialization in the set 
mentioned above are closed cleanly on partition lost.
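   
   The flow can be sketched roughly as follows. The task and set types below are simplified stand-ins for the Streams internals, not the actual `Tasks`/`TaskManager` code:

```java
import java.util.HashSet;
import java.util.Set;

public class PendingTasksDemo {
    interface Task { void closeClean(); }

    // Tasks created but not yet initialized, drained on each poll iteration.
    private final Set<Task> pendingActiveTasksToInit = new HashSet<>();

    public void addPendingTaskToInit(Task task) {
        pendingActiveTasksToInit.add(task);
    }

    // Analogue of the drain: hand back the pending tasks and clear the set
    // so they are not initialized later.
    public Set<Task> drainPendingActiveTasksToInit() {
        Set<Task> drained = new HashSet<>(pendingActiveTasksToInit);
        pendingActiveTasksToInit.clear();
        return drained;
    }

    // On partitions lost, close pending tasks cleanly as well, instead of
    // leaking them in the set.
    public void onPartitionsLost() {
        for (Task task : drainPendingActiveTasksToInit()) {
            task.closeClean();
        }
    }
}
```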
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-10199: Close pending active tasks to init on partitions lost [kafka]

2024-07-07 Thread via GitHub


cadonna commented on code in PR #16545:
URL: https://github.com/apache/kafka/pull/16545#discussion_r1667658734


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -107,6 +108,20 @@ public Set drainPendingTasksToInit() {
         return result;
     }
 
+    @Override
+    public Set drainPendingActiveTasksToInit() {

Review Comment:
   Helper method for the fix.






Re: [PR] KAFKA-10199: Close pending active tasks to init on partitions lost [kafka]

2024-07-07 Thread via GitHub


cadonna commented on code in PR #16545:
URL: https://github.com/apache/kafka/pull/16545#discussion_r1667658628


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1233,11 +1233,11 @@ private void closeRunningTasksDirty() {
         maybeUnlockTasks(allTaskIds);
     }
 
-    private void removeLostActiveTasksFromStateUpdater() {
+    private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
         if (stateUpdater != null) {
             final Map> futures = new LinkedHashMap<>();
             final Map failedTasksDuringCleanClose = new HashMap<>();
-            final Set tasksToCloseClean = new HashSet<>();
+            final Set tasksToCloseClean = new HashSet<>(tasks.drainPendingActiveTasksToInit());

Review Comment:
   This is the actual fix.






[jira] [Commented] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863559#comment-17863559
 ] 

Chris Egerton commented on KAFKA-17044:
---

[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.
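
The pattern Chris describes can be sketched with stdlib types only. The `ConnectorContext` below is a stand-in interface for the relevant slice of the real `org.apache.kafka.connect.connector.ConnectorContext`, and the retry/backoff details are my own illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class NonBlockingStartDemo {
    // Stand-in for the one method of Connect's ConnectorContext used here.
    interface ConnectorContext {
        void requestTaskReconfiguration();
    }

    private final ExecutorService retryExecutor = Executors.newSingleThreadExecutor();

    // start() returns immediately; connection retries run on a background thread.
    public void start(ConnectorContext context, BooleanSupplier tryConnect) {
        retryExecutor.submit(() -> {
            while (!tryConnect.getAsBoolean()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100); // back off between attempts
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // stop() can end the loop promptly
                }
            }
            // Connection established: ask the runtime to call taskConfigs() again.
            context.requestTaskReconfiguration();
        });
    }

    // Because start() never blocked, shutdown is immediate.
    public void stop() {
        retryExecutor.shutdownNow();
    }
}
```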

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but
>  the connector worker would not shutdown immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shutdown leaving the resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes, we reach at the point of 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with time taking operation in 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call
> 2. Call DELETE API to delete this connector
> 3. The connector would be deleted only after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes.
> The issue was observed when a connector was configured to retry a db 
> connection for sometime. 
> {*}Current Behaviour{*}: The connector did not shutdown until the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  method completed.
> {*}Expected Behaviou{*}r: The connector should abort what it is doing and 
> shutdown as requested by the Delete call.





[jira] [Comment Edited] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863559#comment-17863559
 ] 

Chris Egerton edited comment on KAFKA-17044 at 7/7/24 11:31 AM:


[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much because ultimately it's up to your connector, 
the libraries it calls, and even your JVM to respond to thread interrupts 
correctly and in several cases this simply doesn't happen.


was (Author: chrisegerton):
[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

 

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much because ultimately it's up to your connector, 
the libraries it calls, and even your JVM to respond to thread interrupts 
correctly and in several cases this simply doesn't happen.

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but
>  the connector worker would not shutdown immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shutdown leaving the resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes, we reach at the point of 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a

[jira] [Comment Edited] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863559#comment-17863559
 ] 

Chris Egerton edited comment on KAFKA-17044 at 7/7/24 11:31 AM:


[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

 

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much: ultimately it's up to your connector, 
the libraries it calls, and even your JVM to respond to thread interrupts 
correctly, and in several cases this simply doesn't happen.
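The pattern Chris describes (a non-blocking `start()` plus retries on a background thread) can be sketched as plain Java. The `tryConnect` supplier and the `Runnable` standing in for `context.requestTaskReconfiguration()` are stand-ins for illustration, not Connect API calls:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// start() returns immediately; the expensive connection attempt is retried on
// a background thread, and success triggers the reconfiguration callback.
public class BackgroundStartConnector {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // start() must not block; kick off retries and return at once
    public void start(BooleanSupplier tryConnect, Runnable requestTaskReconfiguration) {
        executor.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                if (tryConnect.getAsBoolean()) {
                    // connection established: ask the runtime to call taskConfigs() again
                    requestTaskReconfiguration.run();
                    return;
                }
                try {
                    Thread.sleep(50); // back off before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop() was called; exit
                }
            }
        });
    }

    // stop() takes effect promptly because start() never blocked
    public void stop() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundStartConnector connector = new BackgroundStartConnector();
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch reconfigured = new CountDownLatch(1);
        // succeed on the third attempt; start() returns immediately regardless
        connector.start(() -> attempts.incrementAndGet() >= 3, reconfigured::countDown);
        reconfigured.await();
        System.out.println("reconfiguration requested after " + attempts.get() + " attempts");
        connector.stop();
    }
}
```

Because `start()` returns right away, a DELETE can interrupt the worker without waiting for the connection retries to finish.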


was (Author: chrisegerton):
[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but
>  the connector worker would not shut down immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shut down, leaving its resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes, we reach the point of 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with time taking operation in 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call
> 2. Call DELETE API to delete this connector
> 3. The connector would be deleted only af

Re: [PR] KAFKA-16731: Added share group metrics class. [kafka]

2024-07-07 Thread via GitHub


omkreddy merged PR #16488:
URL: https://github.com/apache/kafka/pull/16488


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-07 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863568#comment-17863568
 ] 

Dongjin Lee commented on KAFKA-17073:
-

KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627623

> Deprecate ReplicaVerificationTool in 3.9
> 
>
> Key: KAFKA-17073
> URL: https://issues.apache.org/jira/browse/KAFKA-17073
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: need-kip
> Fix For: 3.9.0
>
>
> see discussion 
> https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py
> In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
> That enables us to remove it in 4.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12899) Support --bootstrap-server in ReplicaVerificationTool

2024-07-07 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee resolved KAFKA-12899.
-
Resolution: Won't Fix

Replaced by KAFKA-17073

> Support --bootstrap-server in ReplicaVerificationTool
> -
>
> Key: KAFKA-12899
> URL: https://issues.apache.org/jira/browse/KAFKA-12899
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: needs-kip
>
> kafka.tools.ReplicaVerificationTool still uses --broker-list, breaking 
> consistency with other (already migrated) tools.





Re: [PR] [WIP] KAFKA-12899: Support --bootstrap-server in ReplicaVerificationTool [kafka]

2024-07-07 Thread via GitHub


dongjinleekr closed pull request #10827: [WIP] KAFKA-12899: Support 
--bootstrap-server in ReplicaVerificationTool
URL: https://github.com/apache/kafka/pull/10827





Re: [PR] [WIP] KAFKA-12899: Support --bootstrap-server in ReplicaVerificationTool [kafka]

2024-07-07 Thread via GitHub


dongjinleekr commented on PR #10827:
URL: https://github.com/apache/kafka/pull/10827#issuecomment-2212445900

   Closes this PR, since 
[KAFKA-12899](https://issues.apache.org/jira/browse/KAFKA-12899) has been 
replaced by [KAFKA-17073](https://issues.apache.org/jira/browse/KAFKA-17073).
   
   Thank you to everyone who participated in the discussion. Let's continue on 
KAFKA-17073! :bowing_man:





Re: [PR] KAFKA-17092: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on PR #16541:
URL: https://github.com/apache/kafka/pull/16541#issuecomment-2212494290

   @m1a2st Thanks for this patch. I know this patch is based on my comments on 
the jira. However, this patch can have a side effect: the mock time can get 
advanced even though the response is consumed by a non-LIST_OFFSET request. That 
is not the case we want to verify. The story should have the following chapters:
   
   1. the LIST_OFFSET request gets a response, but the response carries an error 
(this proves the RPC is good)
   2. the incomplete request can get a timeout
   
   WDYT?





Re: [PR] MINOR: Wait all brokers to start running [kafka]

2024-07-07 Thread via GitHub


chia7712 merged PR #16537:
URL: https://github.com/apache/kafka/pull/16537





Re: [PR] MINOR: Move related getters to RemoteLogManagerConfig [kafka]

2024-07-07 Thread via GitHub


chia7712 merged PR #16538:
URL: https://github.com/apache/kafka/pull/16538





[jira] [Assigned] (KAFKA-15773) Group protocol configuration should be validated

2024-07-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-15773:
--

Assignee: PoAn Yang

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should invalidate any group.remote.assignor 
> setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also invalidate the configuration
>  
> This is an optimization/user experience improvement.
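The validation proposed above could be sketched as follows. The config names come from the ticket; the "classic" default, the exception type, and the method shape are assumptions, not Kafka's actual client code:

```java
import java.util.Map;

// Reject config combinations that the ticket calls out as invalid.
public class GroupProtocolValidator {

    public static void validate(Map<String, String> props) {
        String protocol = props.getOrDefault("group.protocol", "classic");
        boolean hasRemote = props.containsKey("group.remote.assignor");
        boolean hasLocal = props.containsKey("group.local.assignor");

        // group.remote.assignor only makes sense with the new "consumer" protocol
        if (!"consumer".equalsIgnoreCase(protocol) && hasRemote) {
            throw new IllegalArgumentException(
                    "group.remote.assignor requires group.protocol=consumer");
        }
        // local and remote assignors are mutually exclusive
        if (hasLocal && hasRemote) {
            throw new IllegalArgumentException(
                    "group.local.assignor and group.remote.assignor cannot both be set");
        }
    }

    public static void main(String[] args) {
        validate(Map.of("group.protocol", "consumer", "group.remote.assignor", "uniform")); // valid
        try {
            validate(Map.of("group.remote.assignor", "uniform")); // classic protocol: invalid
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```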





Re: [PR] KAFKA-17081 Tweak GroupCoordinatorConfig: re-introduce local attributes and validation [kafka]

2024-07-07 Thread via GitHub


chia7712 merged PR #16524:
URL: https://github.com/apache/kafka/pull/16524





[jira] [Resolved] (KAFKA-17081) Tweak GroupCoordinatorConfig: re-introduce local attributes and validation

2024-07-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17081.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Tweak GroupCoordinatorConfig: re-introduce local attributes and validation
> --
>
> Key: KAFKA-17081
> URL: https://issues.apache.org/jira/browse/KAFKA-17081
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.9.0
>
>
> see discussion: 
> https://github.com/apache/kafka/pull/16458#issuecomment-220683



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16532:
URL: https://github.com/apache/kafka/pull/16532#discussion_r1667732886


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##
@@ -3665,6 +3670,16 @@ public void 
testWhenFetchResponseReturnsALeaderShipChangeErrorAndNewLeaderInform
 // Validate subscription is still valid & fetch-able for tp1.
 assertTrue(subscriptions.isFetchable(tp1));
 }
+
+@Test
+public void testFetcherDontCacheAnyData() {
+short version = 17;
+FetchResponse fetchResponse = fetchResponse(tidp0, records, 
Errors.NONE, 100L, -1L, 0L, 0);
+fetchResponse.responseData(topicNames, version)
+.forEach((topicPartition, partitionData) -> 
assertEquals(records, partitionData.records()));
+fetchResponse.responseData(new HashMap<>(), version)

Review Comment:
   `Collections.emptyMap()`
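For context, the suggestion favors `Collections.emptyMap()` because it returns a shared, immutable instance: nothing is allocated per call, and a test cannot accidentally mutate it. A minimal demonstration:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyMapDemo {
    public static void main(String[] args) {
        Map<String, Long> shared = Collections.emptyMap();
        System.out.println(shared.isEmpty()); // always true

        try {
            shared.put("topic-0", 0L); // immutable: this throws
        } catch (UnsupportedOperationException e) {
            System.out.println("emptyMap is immutable");
        }

        Map<String, Long> fresh = new HashMap<>(); // mutable, allocated per call
        fresh.put("topic-0", 0L);
        System.out.println(fresh.size()); // prints 1
    }
}
```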






Re: [PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16499:
URL: https://github.com/apache/kafka/pull/16499#discussion_r1667734868


##
core/src/test/java/kafka/test/junit/DetectThreadLeak.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DetectThreadLeak {

Review Comment:
   The purpose of this class is specific, so maybe we can simplify it to a 
`FunctionalInterface`. For example:
   
   ```java
   interface DetectThreadLeak {
   // return the new threads after `DetectThreadLeak` is created
   List<Thread> newThreads();
   }
   ```
   
   Also, we can create `DetectThreadLeak` with a filter:
   ```java
   DetectThreadLeak.filter(Predicate<Thread>);
   ```
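One way that suggested interface could be implemented; the semantics here are an assumption (record the ids of live threads at creation time, and later report only threads that appeared afterwards and match the filter):

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

@FunctionalInterface
interface DetectThreadLeak {
    // return the new threads observed since this DetectThreadLeak was created
    List<Thread> newThreads();

    static DetectThreadLeak filter(Predicate<Thread> predicate) {
        // snapshot the ids of every thread alive right now
        Set<Long> before = Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getId)
                .collect(Collectors.toSet());
        // later calls report only threads created after the snapshot
        return () -> Thread.getAllStackTraces().keySet().stream()
                .filter(t -> !before.contains(t.getId()))
                .filter(predicate)
                .collect(Collectors.toList());
    }
}

class DetectThreadLeakDemo {
    public static void main(String[] args) throws InterruptedException {
        DetectThreadLeak detector = DetectThreadLeak.filter(t -> t.getName().startsWith("worker-"));
        Thread worker = new Thread(() -> {
            try { Thread.sleep(500); } catch (InterruptedException ignored) { }
        }, "worker-1");
        worker.start();
        Thread.sleep(100); // give the new thread time to show up in getAllStackTraces()
        System.out.println(detector.newThreads().size()); // the still-running worker-1
        worker.join();
    }
}
```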



##
core/src/test/java/kafka/test/junit/DetectThreadLeak.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import org.apache.kafka.test.TestUtils;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DetectThreadLeak {
+private Set<Thread> threads;
+private Set<String> expectedThreadNames = new HashSet<>(
+Arrays.asList(
+"Attach Listener", "client-metrics-reaper", "executor-", 
"feature-zk-node-event-process-thread",
+"ForkJoinPool", "JMX", "junit-", 
"metrics-meter-tick-thread", "pool-", "process reaper", "RMI",
+"scala-"
+)
+);
+
+public DetectThreadLeak(Optional<Set<String>> overrideExpectedThreadNames) 
{
+overrideExpectedThreadNames.ifPresent(strings -> expectedThreadNames = 
strings);
+System.out.println("Expected thread names: " + expectedThreadNames);
+}
+
+public void beforeEach() {
+Thread.getAllStackTraces().keySet().forEach(thread -> {

Review Comment:
   Maybe we can store the thread id instead of the whole thread?



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -82,7 +84,9 @@
  * SomeIntegrationTest will be instantiated, lifecycle methods (before/after) 
will be run, and "someTest" will be invoked.
  *
  */
-public class ClusterTestExtensions implements 
TestTemplateInvocationContextProvider {
+public class ClusterTestExtensions implements 
TestTemplateInvocationContextProvider, BeforeEachCallback, AfterEachCallback {

Review Comment:
   Maybe this hook should be move to `RaftClusterInvocationContext` and 
`ZkClusterInvocationContext`. The tests using those clusters are what we care 
about.






Re: [PR] MINOR:Topic command integration test migrate to new test infra [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1667735984


##
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##
@@ -86,652 +85,952 @@
 
 @Tag("integration")
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
-public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
 private final short defaultReplicationFactor = 1;
 private final int defaultNumPartitions = 1;
-private TopicCommand.TopicService topicService;
-private Admin adminClient;
-private String bootstrapServer;
-private String testTopicName;
-private Buffer scalaBrokers;
-private Seq scalaControllers;
 
-/**
- * Implementations must override this method to return a set of 
KafkaConfigs. This method will be invoked for every
- * test and should not reuse previous configurations unless they select 
their ports randomly when servers are started.
- *
- * Note the replica fetch max bytes is set to `1` in order to throttle the 
rate of replication for test
- * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
- */
-@Override
-public scala.collection.Seq<KafkaConfig> generateConfigs() {
-Map<Integer, String> rackInfo = new HashMap<>();
-rackInfo.put(0, "rack1");
-rackInfo.put(1, "rack2");
-rackInfo.put(2, "rack2");
-rackInfo.put(3, "rack1");
-rackInfo.put(4, "rack3");
-rackInfo.put(5, "rack3");
-
-List<Properties> brokerConfigs = ToolsTestUtils
-.createBrokerProperties(6, zkConnectOrNull(), rackInfo, 
defaultNumPartitions, defaultReplicationFactor);
-
-List<KafkaConfig> configs = new ArrayList<>();
-for (Properties props : brokerConfigs) {
-props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
-configs.add(KafkaConfig.fromProps(props));
-}
-return JavaConverters.asScalaBuffer(configs).toSeq();
-}
+private final ClusterInstance clusterInstance;
 
 private TopicCommand.TopicCommandOptions 
buildTopicCommandOptionsWithBootstrap(String... opts) {
+String bootstrapServer = clusterInstance.bootstrapServers();
 String[] finalOptions = Stream.concat(Arrays.stream(opts),
 Stream.of("--bootstrap-server", bootstrapServer)
 ).toArray(String[]::new);
 return new TopicCommand.TopicCommandOptions(finalOptions);
 }
 
-@BeforeEach
-public void setUp(TestInfo info) {
-super.setUp(info);
-// create adminClient
-Properties props = new Properties();
-bootstrapServer = bootstrapServers(listenerName());
-props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-adminClient = Admin.create(props);
-topicService = new TopicCommand.TopicService(props, 
Optional.of(bootstrapServer));
-testTopicName = String.format("%s-%s", 
info.getTestMethod().get().getName(), 
org.apache.kafka.test.TestUtils.randomString(10));
-scalaBrokers = brokers();
-scalaControllers = controllerServers();
+static List<ClusterConfig> generate1() {
+Map<String, String> serverProp = new HashMap<>();
+serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if the config 
name is wrong, no exception is thrown
+
+Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+Map<String, String> infoPerBroker1 = new HashMap<>();
+infoPerBroker1.put("broker.rack", "rack1");
+Map<String, String> infoPerBroker2 = new HashMap<>();
+infoPerBroker2.put("broker.rack", "rack2");
+Map<String, String> infoPerBroker3 = new HashMap<>();
+infoPerBroker3.put("broker.rack", "rack2");
+Map<String, String> infoPerBroker4 = new HashMap<>();
+infoPerBroker4.put("broker.rack", "rack1");
+Map<String, String> infoPerBroker5 = new HashMap<>();
+infoPerBroker5.put("broker.rack", "rack3");
+Map<String, String> infoPerBroker6 = new HashMap<>();
+infoPerBroker6.put("broker.rack", "rack3");
+
+rackInfo.put(0, infoPerBroker1);
+rackInfo.put(1, infoPerBroker2);
+rackInfo.put(2, infoPerBroker3);
+rackInfo.put(3, infoPerBroker4);
+rackInfo.put(4, infoPerBroker5);
+rackInfo.put(5, infoPerBroker6);
+
+return Collections.singletonList(ClusterConfig.defaultBuilder()
+.setBrokers(6)
+.setServerProperties(serverProp)
+.setPerServerProperties(rackInfo)
+.build()
+);
 }
 
-@AfterEach
-public void close() throws Exception {
-if (topicService != null)
-topicService.close();
-if (adminClient != null)
-adminClient.close();
+TopicCommandIntegrationTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"z

Re: [PR] KAFKA-16684: Remove cache in responseData [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on PR #16532:
URL: https://github.com/apache/kafka/pull/16532#issuecomment-2212596673

   @chia7712, Thanks for your comments, PTAL





Re: [PR] KAFKA-17092: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on PR #16541:
URL: https://github.com/apache/kafka/pull/16541#issuecomment-2212635839

   It's a good idea, I will do that





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on PR #16019:
URL: https://github.com/apache/kafka/pull/16019#issuecomment-2212670744

   @m1a2st please take a look at build error





[jira] [Assigned] (KAFKA-17085) Streams Cooperative Rebalance Upgrade Test fails in System Tests

2024-07-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17085:
---

Assignee: Matthias J. Sax

> Streams Cooperative Rebalance Upgrade Test fails in System Tests
> 
>
> Key: KAFKA-17085
> URL: https://issues.apache.org/jira/browse/KAFKA-17085
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Assignee: Matthias J. Sax
>Priority: Critical
>
> StreamsCooperativeRebalanceUpgradeTest fails on system tests when upgrading 
> from: 2.1.1, 2.2.2 and 2.3.1.
> Tests that fail:
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.1.1"
> }
>  
> {noformat}
> and
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.2.2"
> }
> {noformat}
> and
>  
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.3.1"
> }
> {noformat}
>  
> Failure for 2.1.1 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
> message ubuntu@worker28")
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 101, in test_upgrade_to_cooperative_rebalance
> self.maybe_upgrade_rolling_bounce_and_verify(processors,
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 182, in maybe_upgrade_rolling_bounce_and_verify
> stdout_monitor.wait_until(verify_processing_msg,
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 736, in wait_until
> return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
> (self.offset + 1, self.log, pattern),
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Never saw 'first_bounce_phase-Processed [0-9]* 
> records so far' message ubuntu@worker28{noformat}
> Failure for 2.2.2 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
> message ubuntu@worker5")
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 101, in test_upgrade_to_cooperative_rebalance
> self.maybe_upgrade_rolling_bounce_and_verify(processors,
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 182, in maybe_upgrade_rolling_bounce_and_verify
> stdout_monitor.wait_until(verify_processing_msg,
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 736, in wait_until
> return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
> (self.offset + 1, self.log, pattern),
>   File 

[jira] [Commented] (KAFKA-13499) Avoid restoring outdated records

2024-07-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863639#comment-17863639
 ] 

Matthias J. Sax commented on KAFKA-13499:
-

It's been a while since I filed these tickets... Not even sure if I looked at 
(i.e., remembered) KAFKA-7934 when filing this ticket. – So not sure if they are 
_substantively_ different or the same.

The main difference, addressing your second question, is that stream-stream join 
state stores are not exposed via IQ, and thus we can be more aggressive and 
restore less data compared to windowed and session stores, for which we need to 
restore a longer history to make the data available for IQ queries.

> Avoid restoring outdated records
> 
>
> Key: KAFKA-13499
> URL: https://issues.apache.org/jira/browse/KAFKA-13499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Danica Fine
>Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms` 
> to allow for an increase retention time.
> While an increase retention time can be useful, it can also lead to 
> unnecessary restore cost, especially for stream-stream joins. Assume a 
> stream-stream join with 1h window size and a grace period of 1h. For this 
> case, we only need 2h of data to restore. If we lag, the 
> `windowstore.changelog.additional.retention.ms` helps to prevent the broker 
> from truncating data too early. However, if we don't lag and we need to 
> restore, we restore everything from the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to 
> seek the first offset older than the 2h "window" of data that we need to 
> restore, to avoid unnecessary work.
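The seek bound described in the ticket can be sketched as follows. The class and method names are illustrative, not Kafka Streams internals; with a real consumer, the computed timestamp would be translated to an offset via `KafkaConsumer.offsetsForTimes()` and then passed to `seek()`:

```java
import java.time.Duration;

// Compute the oldest timestamp a stream-stream join still needs on restore,
// instead of seeking to the beginning of the changelog.
public class RestoreBound {
    // oldest record timestamp needed: now - (window size + grace period)
    public static long restoreLowerBoundMs(long nowMs, Duration windowSize, Duration gracePeriod) {
        return nowMs - windowSize.toMillis() - gracePeriod.toMillis();
    }

    public static void main(String[] args) {
        // 1h window + 1h grace: only the last 2h of changelog data is needed
        long now = System.currentTimeMillis();
        long bound = restoreLowerBoundMs(now, Duration.ofHours(1), Duration.ofHours(1));
        System.out.println("restore from timestamp >= " + bound);
    }
}
```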





Re: [PR] KAFKA-16955: fix synchronization of streams threadState [kafka]

2024-07-07 Thread via GitHub


mjsax commented on PR #16337:
URL: https://github.com/apache/kafka/pull/16337#issuecomment-2212782921

   > Oh nevermind, 3.8 release is still ongoing duh. Need to wait for it to 
finish before backporting this.
   
   Yes. Also not sure how severe this one is? Might also not be the end of the 
world if not back-ported for 3.8.1? -- I did not plan to backport, but if you 
want to cherry-pick it later, be my guest :) 
   





Re: [PR] Minor: Consumer group response should set error msg [kafka]

2024-07-07 Thread via GitHub


ulysses-you commented on PR #16497:
URL: https://github.com/apache/kafka/pull/16497#issuecomment-2212785942

   thank you @lianetm , addressed comments





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667895877


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+private static final String KEY = "key";
+private static final String VALUE = "value";
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+byte[] key = consumerRecord.key();
+if (Objects.nonNull(key)) {
+short keyVersion = ByteBuffer.wrap(key).getShort();
+Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+settingKeyNode(json, transactionLogKey, keyVersion);
+} else {
+json.put(KEY, "NULL");
+}
+
+byte[] value = consumerRecord.value();
+if (Objects.nonNull(value)) {
+short valueVersion = ByteBuffer.wrap(value).getShort();
+Optional<TransactionLogValue> transactionLogValue = readToTransactionLogValue(ByteBuffer.wrap(value));

Review Comment:
   Maybe we can inline those methods. for example:
   ```java
   JsonNode dataNode = readToTransactionLogValue(ByteBuffer.wrap(value))
       .map(s -> TransactionLogValueJsonConverter.write(s, valueVersion))
       .orElseGet(() -> new TextNode("unknown"));
   json.putObject(VALUE)
       .put(VERSION, valueVersion)
       .set(DATA, dataNode);
   ```
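The formatter under review reads a 2-byte version prefix via `ByteBuffer.wrap(key).getShort()` before decoding the payload. A minimal stdlib-only illustration of that framing (the toy record layout here is invented for the demo, not the real transaction-log format):

```java
import java.nio.ByteBuffer;

public class VersionPrefixDemo {
    // Build a toy record: a short version followed by payload bytes.
    static byte[] encode(short version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(version);
        buf.put(payload);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] record = encode((short) 1, new byte[] {42});
        ByteBuffer buf = ByteBuffer.wrap(record);
        short version = buf.getShort(); // same call the formatter uses on the key/value
        System.out.println(version + ":" + buf.get());
    }
}
```

Note that `getShort()` advances the buffer position, which is why the formatter wraps the array a second time before handing it to the converter.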



##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.Transaction

Re: [PR] KAFKA-17090: Add reminder to CreateTopicsResult#config for null values of type and documentation [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16539:
URL: https://github.com/apache/kafka/pull/16539#discussion_r1667901856


##
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java:
##
@@ -64,6 +64,7 @@ public KafkaFuture<Void> all() {
  * If broker returned an error for topic configs, throw appropriate 
exception. For example,
  * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is 
thrown if user does not
  * have permission to describe topic configs.
+ * Note that it is possible to obtain type and documentation with null 
values even if they are not defined

Review Comment:
   The protocol does not carry the `type` and `documentation`, so those fields 
must be "null".






Re: [PR] KAFKA-17061 Improve the performance of isReplicaOnline [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on PR #16529:
URL: https://github.com/apache/kafka/pull/16529#issuecomment-2212920344

   @ocadaruma Could you please update flame graph according to latest commit? 
It would be nice to see the difference (benefit) introduced by this patch.





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667909229


##
build.gradle:
##
@@ -2106,6 +2106,8 @@ project(':tools') {
 implementation project(':server-common')
 implementation project(':connect:runtime')
 implementation project(':tools:tools-api')
+implementation project(':transaction-coordinator')
+implementation project(':group-coordinator')

Review Comment:
we need `:transaction-coordinator`, but don't need `:group-coordinator` in 
this PR, so I will delete it 






Re: [PR] KAFKA-17061 Improve the performance of isReplicaOnline [kafka]

2024-07-07 Thread via GitHub


ocadaruma commented on PR #16529:
URL: https://github.com/apache/kafka/pull/16529#issuecomment-2212926390

   @chia7712 Sounds good.
   The flame graph was taken on our production env, so applying the patch and 
retesting might take time.
   Let me try reproducing it on a test env.





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667926640


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+private static final String KEY = "key";
+private static final String VALUE = "value";
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+byte[] key = consumerRecord.key();
+if (Objects.nonNull(key)) {
+short keyVersion = ByteBuffer.wrap(key).getShort();
+Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+settingKeyNode(json, transactionLogKey, keyVersion);
+} else {
+json.put(KEY, "NULL");

Review Comment:
   @chia7712, thanks for your comments. If this value is null, which of these 
should we print?
   ```json
   {"key":"NULL","value":"NULL"}
   ```
   or
   ```json
   {"key":null,"value":null}
   ```






Re: [PR] MINOR: Move related getters to RemoteLogManagerConfig [kafka]

2024-07-07 Thread via GitHub


kamalcph commented on PR #16538:
URL: https://github.com/apache/kafka/pull/16538#issuecomment-2212967033

   LGTM





Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-07-07 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1667968408


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+private static final String KEY = "key";
+private static final String VALUE = "value";
+
+@Override
+public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+
+byte[] key = consumerRecord.key();
+if (Objects.nonNull(key)) {
+short keyVersion = ByteBuffer.wrap(key).getShort();
+Optional<TransactionLogKey> transactionLogKey = readToTransactionLogKey(ByteBuffer.wrap(key));
+settingKeyNode(json, transactionLogKey, keyVersion);
+} else {
+json.put(KEY, "NULL");

Review Comment:
   > {"key":null,"value":null}
   
   I prefer this output, as it follows JSON rule






[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-07 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863664#comment-17863664
 ] 

kangning.li commented on KAFKA-17093:
-

I have run it on both the trunk branch and the 3.6 branch, and both returned 
LSO, which meets expectations.   

Other?

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559
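For reference, the isolation level in question is a plain consumer config; with `read_committed`, `seekToEnd`/`endOffsets` is documented to return the last stable offset (LSO) rather than the high watermark, which is exactly the behavior this ticket disputes. A minimal config sketch (broker address and group id are placeholders):

```java
import java.util.Properties;

public class ReadCommittedConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "lso-check");               // placeholder
        // With read_committed, the "end" of a partition as seen by the
        // consumer should be the LSO, not the last log offset.
        props.put("isolation.level", "read_committed");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```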





Re: [PR] KAFKA-17090: Add reminder to CreateTopicsResult#config for null values of type and documentation [kafka]

2024-07-07 Thread via GitHub


mingyen066 commented on code in PR #16539:
URL: https://github.com/apache/kafka/pull/16539#discussion_r1667973081


##
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java:
##
@@ -64,6 +64,7 @@ public KafkaFuture<Void> all() {
  * If broker returned an error for topic configs, throw appropriate 
exception. For example,
  * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is 
thrown if user does not
  * have permission to describe topic configs.
+ * Note that it is possible to obtain type and documentation with null 
values even if they are not defined

Review Comment:
   Thank you, I have refined the text. Please take a look.






[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-07 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863672#comment-17863672
 ] 

kangning.li commented on KAFKA-17093:
-

Another possibility is {{apiVersion}}. If {{apiVersion == 0}}, the consumer 
transaction configuration will be ignored, and it will always return {{HW}}

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559


