Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-04-26 Thread via GitHub


FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2080374723

   This relies on https://github.com/apache/kafka/pull/15766.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-04-26 Thread via GitHub


FrankYang0529 opened a new pull request, #15821:
URL: https://github.com/apache/kafka/pull/15821

   By using `ClusterTestExtensions`, `ListConsumerGroupTest` can get away from the
`KafkaServerTestHarness` dependency.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Comment Edited] (KAFKA-14892) Harmonize package names in storage module

2024-04-26 Thread Giuseppe Calabrese (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841398#comment-17841398
 ] 

Giuseppe Calabrese edited comment on KAFKA-14892 at 4/27/24 5:16 AM:
-

[~abhijeetkumar] [~chia7712]  any ideas about it? **


was (Author: JIRAUSER305174):
[~abhijeetkumar] [~chia7712]  any ideas about it? **

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 3.8.0
>
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [WIP] Kafka 15434: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-04-26 Thread via GitHub


abhijeetk88 opened a new pull request, #15820:
URL: https://github.com/apache/kafka/pull/15820

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module

2024-04-26 Thread Giuseppe Calabrese (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841398#comment-17841398
 ] 

Giuseppe Calabrese commented on KAFKA-14892:


[~abhijeetkumar] [~chia7712]  any ideas about it? **

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 3.8.0
>
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)





Re: [PR] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]

2024-04-26 Thread via GitHub


ableegoldman commented on code in PR #15819:
URL: https://github.com/apache/kafka/pull/15819#discussion_r1581729152


##
clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
##
@@ -83,6 +84,14 @@ public GroupRebalanceConfig(AbstractConfig config, 
ProtocolType protocolType) {
 }
 }
 
+public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {

Review Comment:
   Damn. Didn't realize this class was public. I'll refactor to avoid changing 
APIs here, thanks for pointing it out






Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581724166


##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Sigh... But we may still overlook newly added fields in `equals()` and `hashCode()`. I'll update the test as you suggested in this comment. Thanks






Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581723855


##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Sorry, I missed the important part in this test. I'm going to override `equals()` and `hashCode()` in `ClusterConfig` instead of using the reflection-based approach.
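
   For illustration, a minimal sketch of what such an override could look like. This is not the actual `ClusterConfig` code; the class and field names below are placeholders chosen only to show the pattern.

   ```java
   // Hedged sketch: overriding equals()/hashCode() on a builder-style config class.
   // ExampleConfig and its fields are hypothetical stand-ins, not the real ClusterConfig.
   import java.util.Map;
   import java.util.Objects;

   public final class ExampleConfig {
       private final String name;
       private final int brokers;
       private final Map<String, String> serverProperties;

       public ExampleConfig(String name, int brokers, Map<String, String> serverProperties) {
           this.name = name;
           this.brokers = brokers;
           this.serverProperties = serverProperties;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof ExampleConfig)) return false;
           ExampleConfig that = (ExampleConfig) o;
           // Every field must be listed here; a field forgotten in equals()/hashCode()
           // is exactly the kind of omission the review discussion is worried about.
           return brokers == that.brokers
               && Objects.equals(name, that.name)
               && Objects.equals(serverProperties, that.serverProperties);
       }

       @Override
       public int hashCode() {
           return Objects.hash(name, brokers, serverProperties);
       }
   }
   ```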






[jira] [Assigned] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest

2024-04-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16629:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> add broker-related tests to ConfigCommandIntegrationTest
> 
>
> Key: KAFKA-16629
> URL: https://issues.apache.org/jira/browse/KAFKA-16629
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> [https://github.com/apache/kafka/pull/15645] will rewrite the
> ConfigCommandIntegrationTest in Java with the new test infra. However, it still
> lacks enough tests for broker-related configs.





[jira] [Commented] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest

2024-04-26 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841391#comment-17841391
 ] 

黃竣陽 commented on KAFKA-16629:
-

I will handle this issue.

> add broker-related tests to ConfigCommandIntegrationTest
> 
>
> Key: KAFKA-16629
> URL: https://issues.apache.org/jira/browse/KAFKA-16629
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> [https://github.com/apache/kafka/pull/15645] will rewrite the
> ConfigCommandIntegrationTest in Java with the new test infra. However, it still
> lacks enough tests for broker-related configs.





Re: [PR] Add Integration tests for the the KIP-966 [kafka]

2024-04-26 Thread via GitHub


artemlivshits commented on code in PR #15759:
URL: https://github.com/apache/kafka/pull/15759#discussion_r1581642961


##
core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java:
##
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.integration;
+
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.Logging;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.mutable.HashMap;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class EligibleLeaderReplicasIntegrationTest extends 
kafka.integration.KafkaServerTestHarness implements Logging {
+private String bootstrapServer;
+private String testTopicName;
+private Admin adminClient;
+@Override
+public Seq generateConfigs() {
+List brokerConfigs = new ArrayList<>();
+
brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(
+5,
+zkConnectOrNull(),
+true,
+true,
+scala.Option.empty(),
+scala.Option.empty(),
+scala.Option.empty(),
+true,
+false,
+false,
+false,
+new HashMap<>(),
+1,
+false,
+1,
+(short) 4,
+0,
+false
+)));
+List configs = new ArrayList<>();
+for (Properties props : brokerConfigs) {
+props.put(KafkaConfig.ElrEnabledProp(), "true");
+configs.add(KafkaConfig.fromProps(props));
+}
+return JavaConverters.asScalaBuffer(configs).toSeq();
+}
+
+@Override
+public Seq kraftControllerConfigs() {
+Properties properties = new Properties();
+properties.put(KafkaConfig.ElrEnabledProp(), "true");
+return new 
scala.collection.mutable.ListBuffer().addOne(properties);
+}
+
+@BeforeEach
+public void setUp(TestInfo info) {
+super.setUp(info);
+// create adminClient
+Properties props = new Properties();
+bootstrapServer = bootstrapServers(listenerName());
+props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 

Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2080249371

   @mumrah I'm sorry but could you please fix the conflicts again?





Re: [PR] [WIP]KAFKA-16614:Disallow @ClusterTemplate("") [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on code in PR #15800:
URL: https://github.com/apache/kafka/pull/15800#discussion_r1581630236


##
core/src/test/java/kafka/test/ClusterTestExtensionsUnitTest.java:
##
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.test.annotation.ClusterTemplate;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import java.lang.reflect.Method;
+import java.util.function.Consumer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ClusterTestExtensionsUnitTest {
+@Test
+void testProcessClusterTemplate() throws Exception {
+ClusterTestExtensions ext = mock(ClusterTestExtensions.class);
+ExtensionContext context = mock(ExtensionContext.class);
+Consumer testInvocations = 
mock(Consumer.class);
+ClusterTemplate annot = mock(ClusterTemplate.class);
+when(annot.value()).thenReturn("");
+
+Method method = 
ClusterTestExtensions.class.getDeclaredMethod("processClusterTemplate",

Review Comment:
   Maybe we can make `processClusterTemplate` package-private, and then just test that the method throws an exception when the value is empty.
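
   As a rough sketch of that idea (the method signature and the exception type below are assumptions for illustration, not the actual `ClusterTestExtensions` API):

   ```java
   // Hedged sketch: a direct unit test for a package-private template processor that
   // rejects an empty @ClusterTemplate value. The helper below is a hypothetical stand-in.
   import org.junit.jupiter.api.Test;
   import static org.junit.jupiter.api.Assertions.assertThrows;

   class ClusterTemplateValidationTest {

       // Stand-in for a package-private processClusterTemplate(...) that validates its input
       // before generating any test invocation contexts.
       static void processClusterTemplate(String templateValue) {
           if (templateValue == null || templateValue.isEmpty()) {
               throw new IllegalArgumentException("ClusterTemplate value must not be empty");
           }
           // ... resolve the named template method and generate invocation contexts ...
       }

       @Test
       void emptyTemplateValueIsRejected() {
           assertThrows(IllegalArgumentException.class, () -> processClusterTemplate(""));
       }
   }
   ```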






Re: [PR] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]

2024-04-26 Thread via GitHub


mjsax commented on code in PR #15819:
URL: https://github.com/apache/kafka/pull/15819#discussion_r1581626291


##
clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
##
@@ -83,6 +84,14 @@ public GroupRebalanceConfig(AbstractConfig config, 
ProtocolType protocolType) {
 }
 }
 
+public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {

Review Comment:
   Looks like a public API change?






[jira] [Resolved] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2024-04-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-6527.
---
Resolution: Fixed

It is enabled by https://github.com/apache/kafka/pull/15796

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: TaiJuWu
>Priority: Blocker
>  Labels: flakey, flaky-test
> Fix For: 3.8.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}





Re: [PR] enable test, check if continues failing in CI [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #13953:
URL: https://github.com/apache/kafka/pull/13953#issuecomment-2080246965

   close as that is resolved by #15796





Re: [PR] enable test, check if continues failing in CI [kafka]

2024-04-26 Thread via GitHub


chia7712 closed pull request #13953: enable test, check if continues failing in 
CI
URL: https://github.com/apache/kafka/pull/13953





[jira] [Assigned] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2024-04-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-6527:
-

Assignee: TaiJuWu

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: TaiJuWu
>Priority: Blocker
>  Labels: flakey, flaky-test
> Fix For: 3.8.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}





Re: [PR] KAFKA-6527:Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 merged PR #15796:
URL: https://github.com/apache/kafka/pull/15796





Re: [PR] KAFKA-6527:Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15796:
URL: https://github.com/apache/kafka/pull/15796#issuecomment-2080246076

   Let's ship it so we can run it in QA ASAP.





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2080244534

   @nizhikov I'm OK with accepting this PR as a Java rewrite, and I filed
https://issues.apache.org/jira/browse/KAFKA-16629 to add more tests in the
future. Hence, please address the comment
(https://github.com/apache/kafka/pull/15645#discussion_r1581621779) and I will
take a look later to ship it ASAP.





[jira] [Created] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest

2024-04-26 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16629:
--

 Summary: add broker-related tests to ConfigCommandIntegrationTest
 Key: KAFKA-16629
 URL: https://issues.apache.org/jira/browse/KAFKA-16629
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


[https://github.com/apache/kafka/pull/15645] will rewrite the
ConfigCommandIntegrationTest in Java with the new test infra. However, it still
lacks enough tests for broker-related configs.





Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on code in PR #15645:
URL: https://github.com/apache/kafka/pull/15645#discussion_r1581621779


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.AutoStart;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.test.junit.ZkClusterInvocationContext;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.security.PasswordEncoderConfigs;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ZK)
+@Tag("integration")
+public class ConfigCommandIntegrationTest {
+AdminZkClient adminZkClient;
+List alterOpts;
+
+private final ClusterInstance cluster;
+
+public ConfigCommandIntegrationTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTests({
+@ClusterTest(clusterType = Type.ZK),
+@ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),

Review Comment:
   That is caused by `--entity-name", "0"`: you define a nonexistent broker id 
when you only have a single broker.






Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1581608377


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.consumer.group;
+
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.GroupType.CONSUMER;
+
+class ConsumerGroupExecutor {
+
+static AutoCloseable buildConsumerGroup(String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String topic,
+String groupProtocol,
+Optional remoteAssignor,
+Map customConfigs,
+boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+groupProtocol,
+topic,
+RangeAssignor.class.getName(),
+remoteAssignor,
+customConfigs,
+syncCommit
+);
+}
+
+static AutoCloseable buildClassicGroup(String brokerAddress,
+   int numberOfConsumers,
+   String groupId,
+   String topic,
+   String assignmentStrategy,
+   Map customConfigs,
+   boolean syncCommit) {
+return buildConsumers(
+brokerAddress,
+numberOfConsumers,
+groupId,
+GroupProtocol.CLASSIC.name,
+topic,
+assignmentStrategy,
+Optional.empty(),
+customConfigs,
+syncCommit
+);
+}
+
+private ConsumerGroupExecutor() {
+}
+
+private static AutoCloseable buildConsumers(
+String brokerAddress,
+int numberOfConsumers,
+String groupId,
+String groupProtocol,
+String topic,
+String assignmentStrategy,
+Optional remoteAssignor,
+Map customConfigs,
+boolean syncCommit
+) {
+List> allConfigs = IntStream.range(0, 
numberOfConsumers)
+.mapToObj(ignored ->
+composeConfigs(
+  

Re: [PR] MINOR: Clean up TestUtils.scala [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15808:
URL: https://github.com/apache/kafka/pull/15808#issuecomment-2080226483

   @m1a2st Could you please fix conflicts?





Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2080225060

   @johnnychhsu Could you please fix conflicts?





Re: [PR] MINOR: Various cleanups in generator [kafka]

2024-04-26 Thread via GitHub


chia7712 merged PR #15807:
URL: https://github.com/apache/kafka/pull/15807





Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]

2024-04-26 Thread via GitHub


chia7712 merged PR #15802:
URL: https://github.com/apache/kafka/pull/15802





Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 merged PR #15775:
URL: https://github.com/apache/kafka/pull/15775





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1581597711


##
core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala:
##
@@ -71,11 +72,16 @@ class KafkaServerKRaftRegistrationTest {
   val readyFuture = 
kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
 
   // Enable migration configs and restart brokers
-  
zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, 
"true")
-  
zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
kraftCluster.quorumVotersConfig())
-  
zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp,
 "CONTROLLER")
-  
zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
 "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  zkCluster.rollingBrokerRestart()
+  val serverProperties = new java.util.HashMap[String, String]()

Review Comment:
   ```java
   val serverProperties = new util.HashMap[String, String](zkCluster.config().serverProperties())
   
   ```



##
core/src/test/java/kafka/test/ClusterConfigTest.java:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test;
+
+import kafka.test.annotation.Type;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+public class ClusterConfigTest {
+
+@Test
+public void testClusterConfigBuilder() throws IOException {
+File trustStoreFile = TestUtils.tempFile();
+
+ClusterConfig clusterConfig = ClusterConfig.builder()
+.setType(Type.KRAFT)
+.setBrokers(1)
+.setControllers(1)
+.setName("builder-test")
+.setAutoStart(true)
+.setSecurityProtocol(SecurityProtocol.PLAINTEXT)
+.setListenerName("EXTERNAL")
+.setTrustStoreFile(trustStoreFile)
+.setMetadataVersion(MetadataVersion.IBP_0_8_0)
+.setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
+.setConsumerProperties(Collections.singletonMap("consumer", 
"consumer_value"))
+.setProducerProperties(Collections.singletonMap("producer", 
"producer_value"))
+
.setAdminClientProperties(Collections.singletonMap("admin_client", 
"admin_client_value"))
+
.setSaslClientProperties(Collections.singletonMap("sasl_client", 
"sasl_client_value"))
+
.setSaslServerProperties(Collections.singletonMap("sasl_server", 
"sasl_server_value"))
+.setPerBrokerProperties(Collections.singletonMap(0, 
Collections.singletonMap("broker_0", "broker_0_value")))
+.build();
+
+Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment:
   Dear friend, this test can't protect us from overlooking a missed field copy,
since we don't check fields added in the future, right? Maybe we can use
reflection to get all the fields and then check them all dynamically.
   ```java
   private static Map<String, Object> fields(ClusterConfig config) {
       return Arrays.stream(config.getClass().getDeclaredFields()).collect(Collectors.toMap(Field::getName, f -> {
           f.setAccessible(true);
           return Assertions.assertDoesNotThrow(() -> f.get(config));
       }));
   }
   
   @Test
   public void testClusterConfigBuilder() throws IOException {
   File trustStoreFile = TestUtils.tempFile();
   
   ClusterConfig clusterConfig = ClusterConfig.builder()
   .setType(Type.KRAFT)
   .setBrokers(3)
   .setControllers(2)
   .setName("builder-test")
   .setAutoStart(true)
  

Re: [PR] MINOR: Change the Brokers field documentation . [kafka]

2024-04-26 Thread via GitHub


cmccabe merged PR #15809:
URL: https://github.com/apache/kafka/pull/15809





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841374#comment-17841374
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


I opened a quick example PR to showcase more clearly what I'm proposing here. 
It's definitely rather hacky, but as I said already, this functionality of not 
leaving the group when a consumer is closed was done in a hacky way to begin 
with (ie via internal consumer config introduced for use by Kafka Streams 
only). So we may as well fix this issue so the Streams closeOptions have 
correct semantics

The more I think about this the more I feel strongly that it's just silly for 
Streams users to be unable to opt-out of this "don't leave the group on close" 
behavior. It's not even possible to use the internal config since Streams 
strictly overrides it inside StreamsConfig. You can work around this by 
plugging in your own consumers via KafkaClientSupplier though that does feel a 
bit extreme. More importantly though, you'd still have to choose up front 
whether or not to leave the group on close, where you would obviously not know 
whether it makes sense to leave until you're actually calling close and know 
_why_ you're calling close (specifically, whether it's a temporary 
restart/bounce or "permanent" close)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[PR] [DO NOT MERGE] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]

2024-04-26 Thread via GitHub


ableegoldman opened a new pull request, #15819:
URL: https://github.com/apache/kafka/pull/15819

   POC for one option to fix the issue described in KAFKA-16514
   
   This is definitely a hacky approach to introduce a backdoor solely for Kafka 
Streams to implement its special close semantics. But to be fair, that is 
exactly the same thing that happens right now, except that instead of a 
backdoor internal method we have a backdoor internal ConsumerConfig (ie 
"leave.group.on.close"). This just adds a way to modify that config so Kafka 
Streams operators can decide at close time whether they want to leave the group 
or not.
   
   For example, a Streams application may want to scale in by permanently 
stopping some nodes, in which case the consumers should all immediately leave 
the group. However it may also want to simply bounce the node, in which case it 
makes sense to not leave the group when closed. Both are valid scenarios but 
having to choose one up front when the consumer is first created due to the 
immutability of configs means that in practice, users are forced to guess which 
one they will want/need and can't decide at close time.
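
   To make the idea concrete, here is a rough, hedged sketch of what such a backdoor amounts to. It mirrors the `setLeaveGroupOnClose(...)` setter visible in the review diff earlier in this digest, but the surrounding class is simplified and is not the real `GroupRebalanceConfig`.

   ```java
   // Hedged sketch only: a mutable "leave group on close" flag that callers can flip
   // right before closing, instead of fixing it once when the consumer is constructed.
   public class RebalanceCloseSettings {
       // Seeded from the existing internal "internal.leave.group.on.close" config.
       private volatile boolean leaveGroupOnClose;

       public RebalanceCloseSettings(boolean leaveGroupOnCloseDefault) {
           this.leaveGroupOnClose = leaveGroupOnCloseDefault;
       }

       // The "backdoor": Kafka Streams could call this when close(CloseOptions) is invoked,
       // so the leaveGroup flag chosen at close time actually takes effect.
       public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {
           this.leaveGroupOnClose = leaveGroupOnClose;
       }

       public boolean leaveGroupOnClose() {
           return leaveGroupOnClose;
       }
   }
   ```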
   





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-26 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841369#comment-17841369
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16514:


The immutable internal config thing is definitely a bummer.

To recap: if we want to solve this so that the current Streams API – ie the 
#close(closeOptions) API – works as intended, ie for non-static members as 
well, we'd need to change the way the consumer works. Or wait for mutable 
configs, which would be nice, but realistically that's not happening soon 
enough.

To do this "right" we'd probably need to introduce a new public consumer API of 
some sort which would mean going through a KIP which could be a bit messy.

But as a slightly-hacky alternative, would it be possible to just introduce an 
internal API that works similar to the effect of the existing internal config, 
and just have Kafka Streams use that internal API without making it a "real" 
API and having to do a KIP? I mean that's basically what the internal config is 
anyways – an internal config not exposed to/intended for use by consumer 
applications and only introduced for Kafka Streams to use. Doesn't seem that 
big a deal to just switch from this immutable config to a new internal overload 
of #close (or even an internal #leaveGroupOnClose API that can be toggled 
on/off). Thoughts?

[~mjsax] [~cadonna] maybe you can raise this with someone who works on the 
clients to see if there are any concerns/make sure no one would object to this 
approach?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581517439


##
raft/src/main/java/org/apache/kafka/raft/internals/History.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * An object that tracks values of {@code T} at different offsets.
+ */
+public interface History<T> {
+/**
+ * Add a new value at a given offset.
+ *
+ * The provided {@code offset} must be greater than or equal to 0 and must 
be greater than the
+ * offset of all previous calls to this method.
+ *
+ * @param offset the offset
+ * @param value the value to store
+ * @throws IllegalArgumentException if the offset is not greater than all previous offsets
+ */
+public void addAt(long offset, T value);
+
+/**
+ * Returns the value that has the largest offset that is less than or equal to the provided
+ * offset.
+ *
+ * @param offset the offset
+ * @return the value if it exists, otherwise {@code Optional.empty()}
+ */
+public Optional<T> valueAtOrBefore(long offset);
+
+/**
+ * Returns the value with the largest offset.
+ *
+ * @return the value if it exists, otherwise {@code Optional.empty()}
+ */
+public Optional> lastEntry();
+
+/**
+ * Removes all entries with an offset greater than or equal to {@code 
endOffset}.
+ *
+ * @param endOffset the ending offset
+ */
+public void truncateTo(long endOffset);
+
+/**
+ * Removes all entries but the largest entry that has an offset that is 
less than or equal to
+ * {@code startOffset}.
+ *
+ * This operation does not remove the entry with the largest offset that 
is less than or equal
+ * to {@code startOffset}. This is needed so that calls to {@code 
valueAtOrBefore} and
+ * {@code lastEntry} always return a non-empty value if a value was 
previously added to this
+ * object.
+ *
+ * @param startOffset the starting offset
+ */
+public void trimPrefixTo(long startOffset);

Review Comment:
   I went with `truncateOldEntries` and `truncateNewEntries`. Thanks for the 
suggestions.
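
   For readers following along, a rough sketch (not the actual Kafka implementation) of how the semantics documented above could be backed by a `TreeMap`, using the renamed methods:

   ```java
   // Hedged sketch only: a TreeMap-backed history of values keyed by offset, with
   // truncateOldEntries/truncateNewEntries matching the Javadoc semantics quoted above.
   import java.util.Map;
   import java.util.Optional;
   import java.util.TreeMap;

   public final class TreeMapHistory<T> {
       private final TreeMap<Long, T> history = new TreeMap<>();

       // Offsets must be non-negative and strictly increasing across calls.
       public void addAt(long offset, T value) {
           if (offset < 0 || (!history.isEmpty() && offset <= history.lastKey())) {
               throw new IllegalArgumentException("offset must be non-negative and increasing: " + offset);
           }
           history.put(offset, value);
       }

       public Optional<T> valueAtOrBefore(long offset) {
           return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
       }

       public Optional<Map.Entry<Long, T>> lastEntry() {
           return Optional.ofNullable(history.lastEntry());
       }

       // Removes all entries with an offset greater than or equal to endOffset.
       public void truncateNewEntries(long endOffset) {
           history.tailMap(endOffset, true).clear();
       }

       // Removes older entries but keeps the largest entry at or below startOffset,
       // so valueAtOrBefore/lastEntry still return a value after trimming.
       public void truncateOldEntries(long startOffset) {
           Map.Entry<Long, T> keep = history.floorEntry(startOffset);
           if (keep != null) {
               history.headMap(keep.getKey(), false).clear();
           }
       }
   }
   ```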






Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]

2024-04-26 Thread via GitHub


lianetm commented on code in PR #15778:
URL: https://github.com/apache/kafka/pull/15778#discussion_r1581508063


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, 
static_membership, bounce_
 self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, 
num_bounces=num_bounces)
 
 num_revokes_after_bounce = consumer.num_revokes_for_alive() - 
num_revokes_before_bounce
-
-check_condition = num_revokes_after_bounce != 0
+
 # under static membership, the live consumer shall not revoke any 
current running partitions,
 # since there is no global rebalance being triggered.
 if static_membership:
-check_condition = num_revokes_after_bounce == 0
-
-assert check_condition, \
-"Total revoked count %d does not match the expectation of having 0 
revokes as %d" % \
-(num_revokes_after_bounce, check_condition)
+assert num_revokes_after_bounce == 0, \
+"Unexpected revocation triggered when bouncing static member. 
Expecting 0 but had %d revocations" % num_revokes_after_bounce
+elif consumer.is_eager():
+assert num_revokes_after_bounce != 0, \

Review Comment:
   Done. I also created the 
[Jira](https://issues.apache.org/jira/browse/KAFKA-16628), assigned to me, to add 
a test covering all the non-eager paths that we know we don't have yet. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-16628:
--

Assignee: Lianet Magrans

> Add system test for validating static consumer bounce and assignment when not 
> eager
> ---
>
> Key: KAFKA-16628
> URL: https://issues.apache.org/jira/browse/KAFKA-16628
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>
> The existing system 
> [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
>  includes a test validating that partitions are not re-assigned when a 
> static member is bounced, but the test design and setup are intended for 
> testing this with the Eager assignment strategy only (based on the eager 
> protocol, where all dynamic members revoke their partitions when a rebalance 
> happens). 
> We should consider adding a test that ensures partitions are not 
> re-assigned when using the cooperative sticky assignor or the new consumer 
> group protocol assignments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager

2024-04-26 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16628:
--

 Summary: Add system test for validating static consumer bounce and 
assignment when not eager
 Key: KAFKA-16628
 URL: https://issues.apache.org/jira/browse/KAFKA-16628
 Project: Kafka
  Issue Type: Task
  Components: consumer, system tests
Reporter: Lianet Magrans


The existing system 
[test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209]
 includes a test validating that partitions are not re-assigned when a 
static member is bounced, but the test design and setup are intended for testing 
this with the Eager assignment strategy only (based on the eager protocol, where 
all dynamic members revoke their partitions when a rebalance happens). 
We should consider adding a test that ensures partitions are not 
re-assigned when using the cooperative sticky assignor or the new consumer 
group protocol assignments. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16528) Reset member heartbeat interval when request sent

2024-04-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans closed KAFKA-16528.
--

> Reset member heartbeat interval when request sent
> -
>
> Key: KAFKA-16528
> URL: https://issues.apache.org/jira/browse/KAFKA-16528
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The member should reset the heartbeat timer when the request is sent, rather than 
> when a response is received. This ensures that any delay in the response is not 
> added on top of the interval. With this, we better respect 
> the contract of members sending HB on the interval to remain in the group, 
> and avoid potential unwanted rebalances.
> Note that there is already logic in place to avoid sending a request if a 
> response hasn't been received. So, even with the interval being reset 
> on the send, the next HB will only be sent once the 
> response is received (it will be sent out on the next poll of the HB manager, 
> respecting the minimal backoff for sending consecutive requests). This 
> is, by the way, consistent with how the interval timing & in-flights are handled 
> for auto-commits.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581484869


##
raft/src/main/java/org/apache/kafka/raft/internals/History.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * A object tracks values of {@code T} at different offsets.
+ */
+public interface History {

Review Comment:
   Do you mind if we wait to see if we need that flexibility? I ask because if 
we really want to make it generic, we need to support both `Comparable` and 
`Comparator`.
   > The map is ordered according to the [natural 
ordering](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Comparable.html)
 of its keys, or by a 
[Comparator](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/Comparator.html)
 typically provided at sorted map creation time.
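
   As a small aside, a tiny self-contained illustration (plain JDK, no Kafka
   types) of the two creation modes the quoted javadoc describes — natural
   ordering via `Comparable` keys versus an explicit `Comparator` supplied at
   construction time:

   import java.util.Comparator;
   import java.util.TreeMap;

   public class SortedMapOrderingExample {
       public static void main(String[] args) {
           // Natural ordering: Long implements Comparable, so no Comparator is needed.
           TreeMap<Long, String> byNaturalOrder = new TreeMap<>();
           byNaturalOrder.put(5L, "five");
           byNaturalOrder.put(1L, "one");

           // Explicit Comparator supplied at sorted map creation time.
           TreeMap<Long, String> byComparator = new TreeMap<>(Comparator.reverseOrder());
           byComparator.put(5L, "five");
           byComparator.put(1L, "one");

           System.out.println(byNaturalOrder.firstKey()); // prints 1
           System.out.println(byComparator.firstKey());   // prints 5
       }
   }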



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15818:
URL: https://github.com/apache/kafka/pull/15818#issuecomment-2080046398

   @dongnuo123 Thanks for the patch. Could you also check if we have other 
cases like this one: 
https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java#L531?
 We need to ensure that we use replayRecords.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581480096


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exist, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();
+}
+
+HashMap newVoters = new HashMap<>(voters);
+newVoters.put(voter.id(), voter);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+/**
+ * Removew a voter from the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
removed.
+ *
+ * A voter can be removed from the voter set if its id and uuid match.
+ *
+ * @param voterId the voter id
+ * @param voterUuid the voter uuid
+ * @return a new voter set if the voter was remove, otherwise {@code 
Optional.empty()}
+ */
+public Optional removeVoter(int voterId, Optional 
voterUuid) {
+VoterNode oldVoter = voters.get(voterId);
+if (oldVoter != null && Objects.equals(oldVoter.uuid(), voterUuid)) {
+HashMap newVoters = new HashMap<>(voters);
+newVoters.remove(voterId);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+return Optional.empty();
+}
+
+/**
+ * Converts a voter set to a voters record for a given version.
+ *
+ * @param version the version of the voters record
+ */
+public VotersRecord toVotersRecord(short version) {
+return new VotersRecord()
+.setVersion(version)
+.setVoters(
+voters
+.values()
+.stream()
+.map(voter -> {
+Iterator endpoints = voter
+.listeners()
+.entrySet()
+.stream()
+.map(entry ->
+new VotersRecord.Endpoint()
+   

Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-26 Thread via GitHub


kirktrue commented on code in PR #15737:
URL: https://github.com/apache/kafka/pull/15737#discussion_r1581441497


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -140,22 +150,32 @@ class 
IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
 def __init__(self, node, verify_offsets, idx):
 super().__init__(node, verify_offsets, idx)
 
-def handle_partitions_revoked(self, event):
+def handle_partitions_revoked(self, event, node, logger):
 self.revoked_count += 1
 self.state = ConsumerState.Rebalancing
 self.position = {}
+revoked = []
+
 for topic_partition in event["partitions"]:
-topic = topic_partition["topic"]
-partition = topic_partition["partition"]
-self.assignment.remove(TopicPartition(topic, partition))
+tp = _create_partition_from_dict(topic_partition)
 
-def handle_partitions_assigned(self, event):
+if tp in self.assignment:
+self.assignment.remove(tp)
+revoked.append(tp)
+else:
+logger.warn("Could not remove topic partition %s from 
assignment as it was not previously assigned to %s" % (tp, 
node.account.hostname))

Review Comment:
   Filed KAFKA-16623, FYI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16609) Update parse_describe_topic to support new topic describe output

2024-04-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-16609.
---
  Reviewer: Lucas Brutschy
Resolution: Fixed

> Update parse_describe_topic to support new topic describe output
> 
>
> Key: KAFKA-16609
> URL: https://issues.apache.org/jira/browse/KAFKA-16609
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: system-test-failure
> Fix For: 3.8.0
>
>
> It appears that recent changes to the describe topic output has broken the 
> system test's ability to parse the output.
> {noformat}
> test_id:
> kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=False.reassign_from_offset_zero=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   50.333 seconds
> IndexError('list index out of range')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
>  line 175, in test_reassign_partitions
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.reassign_partitions(bounce_brokers))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 105, in run_produce_consume_validate
> core_test_action(*args)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
>  line 175, in 
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.reassign_partitions(bounce_brokers))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py",
>  line 82, in reassign_partitions
> partition_info = 
> self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py",
>  line 1400, in parse_describe_topic
> fields = list(map(lambda x: x.split(" ")[1], fields))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py",
>  line 1400, in 
> fields = list(map(lambda x: x.split(" ")[1], fields))
> IndexError: list index out of range
> {noformat} 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581410429


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exist, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();
+}
+
+HashMap newVoters = new HashMap<>(voters);
+newVoters.put(voter.id(), voter);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+/**
+ * Removew a voter from the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
removed.
+ *
+ * A voter can be removed from the voter set if its id and uuid match.
+ *
+ * @param voterId the voter id
+ * @param voterUuid the voter uuid
+ * @return a new voter set if the voter was remove, otherwise {@code 
Optional.empty()}
+ */
+public Optional removeVoter(int voterId, Optional 
voterUuid) {
+VoterNode oldVoter = voters.get(voterId);
+if (oldVoter != null && Objects.equals(oldVoter.uuid(), voterUuid)) {
+HashMap newVoters = new HashMap<>(voters);
+newVoters.remove(voterId);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+return Optional.empty();
+}
+
+/**
+ * Converts a voter set to a voters record for a given version.
+ *
+ * @param version the version of the voters record
+ */
+public VotersRecord toVotersRecord(short version) {
+return new VotersRecord()
+.setVersion(version)
+.setVoters(
+voters
+.values()
+.stream()
+.map(voter -> {
+Iterator endpoints = voter
+.listeners()

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-26 Thread via GitHub


nizhikov commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2079970499

   @chia7712 CI looks good to me. Do you have any other comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [Minor] Add replayRecords to CoordinatorResult [kafka]

2024-04-26 Thread via GitHub


dongnuo123 opened a new pull request, #15818:
URL: https://github.com/apache/kafka/pull/15818

   The patch adds a boolean attribute `replayRecords` that specifies whether 
the records should be replayed. The default value is `appendFuture == null`, so 
no change is needed for the existing code.
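
   A rough, hypothetical sketch of the shape of the change described above; the
   fields, constructor signatures and generics here are assumptions for
   illustration, not the actual `CoordinatorResult` API:

   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical simplification of CoordinatorResult; the real class has more members.
   public class CoordinatorResult<T, U> {
       private final List<U> records;
       private final T response;
       private final CompletableFuture<Void> appendFuture;
       private final boolean replayRecords;

       public CoordinatorResult(List<U> records, T response, CompletableFuture<Void> appendFuture) {
           // Default preserves existing behaviour: replay unless an append future was supplied.
           this(records, response, appendFuture, appendFuture == null);
       }

       public CoordinatorResult(List<U> records, T response, CompletableFuture<Void> appendFuture,
                                boolean replayRecords) {
           this.records = records;
           this.response = response;
           this.appendFuture = appendFuture;
           this.replayRecords = replayRecords;
       }

       public boolean replayRecords() {
           return replayRecords;
       }
   }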
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581410429


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exist, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();
+}
+
+HashMap newVoters = new HashMap<>(voters);
+newVoters.put(voter.id(), voter);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+/**
+ * Removew a voter from the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
removed.
+ *
+ * A voter can be removed from the voter set if its id and uuid match.
+ *
+ * @param voterId the voter id
+ * @param voterUuid the voter uuid
+ * @return a new voter set if the voter was remove, otherwise {@code 
Optional.empty()}
+ */
+public Optional removeVoter(int voterId, Optional 
voterUuid) {
+VoterNode oldVoter = voters.get(voterId);
+if (oldVoter != null && Objects.equals(oldVoter.uuid(), voterUuid)) {
+HashMap newVoters = new HashMap<>(voters);
+newVoters.remove(voterId);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+return Optional.empty();
+}
+
+/**
+ * Converts a voter set to a voters record for a given version.
+ *
+ * @param version the version of the voters record
+ */
+public VotersRecord toVotersRecord(short version) {
+return new VotersRecord()
+.setVersion(version)
+.setVoters(
+voters
+.values()
+.stream()
+.map(voter -> {
+Iterator endpoints = voter
+.listeners()

Review Comment:
   Fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]

2024-04-26 Thread via GitHub


infantlikesprogramming commented on PR #15696:
URL: https://github.com/apache/kafka/pull/15696#issuecomment-2079935542

   @chia7712 Thank you! The cluster that I deployed in KRaft mode (locally) consists 
of 3 controllers and 4 brokers, using the standard configurations from the 
config/kraft folder. The only modification is that log.segment.bytes on 
the 4th broker is set to 286870912. Would you like me to include 
the files for my test cluster and test file, or would you like me to add a new test 
in the test class for the function?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581398905


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exist, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();
+}
+
+HashMap newVoters = new HashMap<>(voters);
+newVoters.put(voter.id(), voter);
+
+return Optional.of(new VoterSet(newVoters));
+}
+
+/**
+ * Removew a voter from the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
removed.
+ *
+ * A voter can be removed from the voter set if its id and uuid match.
+ *
+ * @param voterId the voter id

Review Comment:
   I have this wording in the description "A voter can be removed from the 
voter set if its id and directory id match."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581392367


##
raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java:
##
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+/**
+ * A type for representing the set of voters for a topic partition.
+ *
+ * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
+ *
+ * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * from the static configuration.
+ */
+final public class VoterSet {
+private final Map voters;
+
+VoterSet(Map voters) {
+if (voters.isEmpty()) {
+throw new IllegalArgumentException("Voters cannot be empty");
+}
+
+this.voters = voters;
+}
+
+/**
+ * Returns the socket address for a given voter at a given listener.
+ *
+ * @param voter the id of the voter
+ * @param listener the name of the listener
+ * @return the socket address if it exist, otherwise {@code 
Optional.empty()}
+ */
+public Optional voterAddress(int voter, String 
listener) {
+return Optional.ofNullable(voters.get(voter))
+.flatMap(voterNode -> voterNode.address(listener));
+}
+
+/**
+ * Returns all of the voter ids.
+ */
+public Set voterIds() {
+return voters.keySet();
+}
+
+/**
+ * Adds a voter to the voter set.
+ *
+ * This object is immutable. A new voter set is returned if the voter was 
added.
+ *
+ * A new voter can be added to a voter set if its id doesn't already exist 
in the voter set.
+ *
+ * @param voter the new voter to add
+ * @return a new voter set if the voter was added, otherwise {@code 
Optional.empty()}
+ */
+public Optional addVoter(VoterNode voter) {
+if (voters.containsKey(voter.id())) {
+return Optional.empty();

Review Comment:
   We don't have to. The important invariant for `VoterSet` is that voter ids 
are unique. The kraft leader will not allow a replica to join the `VoterSet` if 
its replica id already exists in the voters set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581378400


##
raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java:
##
@@ -261,7 +260,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
  * @param snapshotId the end offset and epoch that identifies the snapshot
  * @return a writable snapshot if it doesn't already exist
  */
-Optional storeSnapshot(OffsetAndEpoch snapshotId);
+Optional createNewSnapshotUnchecked(OffsetAndEpoch 
snapshotId);

Review Comment:
   Fix the comments for this method and `createNewSnapshot`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841332#comment-17841332
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

There is an issue with creating new accounts: 
https://issues.apache.org/jira/browse/INFRA-25451 – we are waiting for the infra 
team to resolve it. Unfortunately, we cannot do anything about it.

The only thing I can offer is: if you prepare a KIP using some other tool (eg a 
Google doc or similar) and share it with me, I can copy it into the wiki for 
you.

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 6:04 PM:


if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 
restarts before the other 1 messages are produced,
then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up 
with just two checkpoints, at 1 and 2.

but the problem here is that if the consumer never fully catches up once, we 
will never have a checkpoint.

If the OffsetSyncStore.offsetSyncs contained a distribution of OffsetSync rather than 
just multiple copies of the last OffsetSync, Checkpoints would be computed before.


was (Author: ecomar):
if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 
restarts before the other 1 messages are produced,
then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up 
with just two checkpoints, at 1 and 2.

 

but the problem here is that if the consumer never fully cathces up once, we 
will never have a checkpoint

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50 records/poll, pausing 1 sec 
> between polls, and committing after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).
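
For anyone trying to reproduce this, a minimal sketch of the slow consumer
described above; the topic name, group id and bootstrap address are assumptions
matching the single-node setup, not values from the report:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.printf("fetched %d records%n", records.count());
                consumer.commitSync();   // commit after each poll
                Thread.sleep(1000L);     // pause 1 sec between polls
            }
        }
    }
}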



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581343739


##
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##
@@ -324,15 +323,11 @@ class DumpLogSegmentsTest {
 val lastContainedLogTimestamp = 1
 
 TestUtils.resource(
-  RecordsSnapshotWriter.createWithHeader(
-() => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
-1024,
-MemoryPool.NONE,
-new MockTime,
-lastContainedLogTimestamp,
-CompressionType.NONE,
-MetadataRecordSerde.INSTANCE,
-  ).get()
+  new RecordsSnapshotWriter.Builder()

Review Comment:
   Yes. It found a small bug in the dump log tool.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581332843


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1341,29 +1340,25 @@ class KafkaConfigTest {
   }
 
   @Test
-  def testValidQuorumVotersConfig(): Unit = {
-val expected = new util.HashMap[Integer, AddressSpec]()
+  def testValidQuorumVotersParsing(): Unit = {
+val expected = new util.HashMap[Integer, InetSocketAddress]()

Review Comment:
   Done. I split this into 3 tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581322854


##
core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala:
##
@@ -87,9 +85,12 @@ class RaftClusterSnapshotTest {
 
   // Check that we can read the entire snapshot
   while (snapshot.hasNext) {
-val batch = snapshot.next()
+val batch = snapshot.next
 assertTrue(batch.sizeInBytes > 0)
-assertNotEquals(Collections.emptyList(), batch.records())
+assertTrue(
+  batch.records.isEmpty != batch.controlRecords.isEmpty,

Review Comment:
   Yeah. I thought I was being creative when I discovered that invariant. I 
wrote a comment and changed it to use `assertNotEquals`. Let me know if that 
helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581315479


##
clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java:
##
@@ -45,4 +45,58 @@ public void testParseUnknownVersion() {
 assertEquals(ControlRecordType.ABORT, type);
 }
 
+@Test
+public void testLeaderChange() {
+ByteBuffer buffer = ByteBuffer.allocate(32);
+buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+buffer.putShort((short) 2);
+buffer.flip();
+
+ControlRecordType type = ControlRecordType.parse(buffer);
+assertEquals(ControlRecordType.LEADER_CHANGE, type);
+}
+
+@Test
+public void testSnapshotHeader() {
+ByteBuffer buffer = ByteBuffer.allocate(32);
+buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION);
+buffer.putShort((short) 3);
+buffer.flip();
+
+ControlRecordType type = ControlRecordType.parse(buffer);
+assertEquals(ControlRecordType.SNAPSHOT_HEADER, type);
+}
+
+@Test
+public void testSnapshotFooter() {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]

2024-04-26 Thread via GitHub


lucasbru commented on code in PR #15778:
URL: https://github.com/apache/kafka/pull/15778#discussion_r1581306529


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, 
static_membership, bounce_
 self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, 
num_bounces=num_bounces)
 
 num_revokes_after_bounce = consumer.num_revokes_for_alive() - 
num_revokes_before_bounce
-
-check_condition = num_revokes_after_bounce != 0
+
 # under static membership, the live consumer shall not revoke any 
current running partitions,
 # since there is no global rebalance being triggered.
 if static_membership:
-check_condition = num_revokes_after_bounce == 0
-
-assert check_condition, \
-"Total revoked count %d does not match the expectation of having 0 
revokes as %d" % \
-(num_revokes_after_bounce, check_condition)
+assert num_revokes_after_bounce == 0, \
+"Unexpected revocation triggered when bouncing static member. 
Expecting 0 but had %d revocations" % num_revokes_after_bounce
+elif consumer.is_eager():
+assert num_revokes_after_bounce != 0, \

Review Comment:
   Yes, I think that's a good plan. Can you update this PR to rename the test 
and create a separate JIRA for the new test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16511: Fix the leaking tiered segments during segment deletions [kafka]

2024-04-26 Thread via GitHub


kamalcph opened a new pull request, #15817:
URL: https://github.com/apache/kafka/pull/15817

   When there are overlapping segments in remote storage, deletion 
may fail to remove the segments due to the `isRemoteSegmentWithinLeaderEpochs` 
check. Once deletion starts to fail for a partition, its segments won't be 
eligible for cleanup. The one workaround that we have is to move the 
log-start-offset forward using the kafka-delete-records script.
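
   A hedged sketch of the workaround mentioned above using the Admin client's
   deleteRecords API, which is what the kafka-delete-records script calls; the
   topic name, partition and target offset are placeholders, not values from
   this issue:

   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.RecordsToDelete;
   import org.apache.kafka.common.TopicPartition;

   public class AdvanceLogStartOffset {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // Moving the log-start-offset past the overlapping segments makes them
               // eligible for cleanup again.
               Map<TopicPartition, RecordsToDelete> request = Collections.singletonMap(
                   new TopicPartition("my-topic", 0), RecordsToDelete.beforeOffset(1000L));
               admin.deleteRecords(request).all().get();
           }
       }
   }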
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15342) Considering upgrading to Mockito 5.4.1 or later

2024-04-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841294#comment-17841294
 ] 

Chia-Ping Tsai commented on KAFKA-15342:


I'm not sure of the benefit of having separate unit test and integration test 
categories. Our CI always runs all tests regardless of fast failure. The difference 
in JVM settings is `maxHeapSize` (2g vs. 2560m), but I don't think that is 
particularly important. Also, we often neglect the tag in testing. 

Maybe we should just get rid of the category.

 

> Considering upgrading to Mockito 5.4.1 or later
> ---
>
> Key: KAFKA-15342
> URL: https://issues.apache.org/jira/browse/KAFKA-15342
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Blocker
> Fix For: 4.0.0
>
>
> We're currently stuck on Mockito 4.x.y because the 5.x.y line requires Java 
> 11 and, until we begin to work on Kafka 4.0.0, we continue to support Java 8.
> Either directly before, or after releasing Kafka 4.0.0, we should try to 
> upgrade to a version of Mockito on the 5.x.y line.
> If we're able to use a version that includes 
> [https://github.com/mockito/mockito/pull/3078|https://github.com/mockito/mockito/pull/3078,]
>  (which should be included in either a 5.4.1 or 5.5.0 release), we should 
> also revert the change made for 
> https://issues.apache.org/jira/browse/KAFKA-14682, which is just a temporary 
> workaround. Care should be taken that, after reverting that change, unused 
> stubbings are still correctly reported during our CI builds.
> If the effort required to upgrade our Mockito version is too high, we can 
> either downgrade the severity of this ticket, or split it out into separate 
> subtasks for each to-be-upgraded module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [Minor] Fix the flaky testConsumerGroupHeartbeatWithStableClassicGroup by sorting the topic partition list [kafka]

2024-04-26 Thread via GitHub


dongnuo123 opened a new pull request, #15816:
URL: https://github.com/apache/kafka/pull/15816

   We are seeing a flaky test in 
`testConsumerGroupHeartbeatWithStableClassicGroup` where the failure is caused by 
a different ordering of the expected and actual values. The patch sorts the 
topic partition list in the records to fix the issue.
   
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15698/11/tests/
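
   A small, hypothetical illustration of the idea behind the fix — normalizing
   the order of topic partitions with a deterministic comparator so that
   expected/actual comparisons don't depend on iteration order:

   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Comparator;
   import java.util.List;
   import org.apache.kafka.common.TopicPartition;

   public class SortedPartitionsExample {
       public static void main(String[] args) {
           List<TopicPartition> partitions = new ArrayList<>(Arrays.asList(
               new TopicPartition("foo", 1), new TopicPartition("bar", 0), new TopicPartition("foo", 0)));

           // Sorting by topic, then partition makes the comparison order-independent.
           partitions.sort(Comparator.comparing(TopicPartition::topic)
               .thenComparingInt(TopicPartition::partition));

           System.out.println(partitions); // [bar-0, foo-0, foo-1]
       }
   }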
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1581234346


##
clients/src/main/resources/common/message/VotersRecord.json:
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "VotersRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the voters record" },
+{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
+  { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+"about": "The replica id of the voter in the topic partition" },
+  { "name": "VoterUuid", "type": "uuid", "versions": "0+",
+"about": "The directory id of the voter in the topic partition" },
+  { "name": "Endpoints", "type": "[]Endpoint", "versions": "0+",
+"about": "The endpoint that can be used to communicate with the 
voter", "fields": [
+{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+  "about": "The name of the endpoint" },
+{ "name": "Host", "type": "string", "versions": "0+",
+  "about": "The hostname" },
+{ "name": "Port", "type": "uint16", "versions": "0+",

Review Comment:
   Yeah. @junrao and I had a discussion about this on the mailing list. The 
original design had the security protocol. I agreed to remove it because it is 
not strictly required, since the local replica can look up the security protocol 
from its local configuration.
   
   As you point out, it would be useful to have it so we can catch misconfiguration 
between the replicas. What do you think, @junrao, should we add the security protocol 
to all of the endpoint descriptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]

2024-04-26 Thread via GitHub


jsancio commented on code in PR #15671:
URL: https://github.com/apache/kafka/pull/15671#discussion_r1580159998


##
clients/src/main/resources/common/message/VotersRecord.json:
##
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "VotersRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Version", "type": "int16", "versions": "0+",
+  "about": "The version of the voters record" },
+{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [
+  { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+"about": "The replica id of the voter in the topic partition" },
+  { "name": "VoterUuid", "type": "uuid", "versions": "0+",

Review Comment:
   Sounds good to me. I made these changes to the KIP: 
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=217391519=94=93



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]

2024-04-26 Thread via GitHub


lianetm commented on code in PR #15778:
URL: https://github.com/apache/kafka/pull/15778#discussion_r1581203195


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, 
static_membership, bounce_
 self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, 
num_bounces=num_bounces)
 
 num_revokes_after_bounce = consumer.num_revokes_for_alive() - 
num_revokes_before_bounce
-
-check_condition = num_revokes_after_bounce != 0
+
 # under static membership, the live consumer shall not revoke any 
current running partitions,
 # since there is no global rebalance being triggered.
 if static_membership:
-check_condition = num_revokes_after_bounce == 0
-
-assert check_condition, \
-"Total revoked count %d does not match the expectation of having 0 
revokes as %d" % \
-(num_revokes_after_bounce, check_condition)
+assert num_revokes_after_bounce == 0, \
+"Unexpected revocation triggered when bouncing static member. 
Expecting 0 but had %d revocations" % num_revokes_after_bounce
+elif consumer.is_eager():
+assert num_revokes_after_bounce != 0, \

Review Comment:
   > So maybe we should instead try to get the set of partitions and check that 
it didn't change?
   
   Doable, but that wouldn't truly ensure the static membership behaviour 
either I guess, because the test is intentionally leaving 1 member alive that 
could be the leader or not. So the assignment would remain the same regardless 
of the static membership under CONSUMER (or Cooperative) if the single 
partition belongs to the live member (single partition that I guess was 
intentionally decided to be able to easily check the delivery semantics after 
the bounces) 
   
   So as I see it this test is specifically crafted to validate the stickiness 
that static membership brings into the `RangeAssignor` ( nicely explained in 
[this 
section](https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L58-L78)
 of the `RangeAssignor` class doc btw). We're trying to apply it to the 
CONSUMER protocol but finding not much value given the purpose of the test and 
the shape it has (bounce n-1 nodes, check at least 1 revocation if dynamic, 
none if static, regardless of partition ownership). Listening to myself telling 
you this makes me reconsider if we should just not run this test for the new 
protocol, as it was truly never intended or run for CooperativeAssignor? (I 
would probably rename it to something like 
`test_eager_stickiness_on_static_consumer_bounce` , make the use of 
RangeAssignor explicit, and then it looks clearer that we don't want to run 
such a test on CooperativeAssignor or CONSUMER protocol. 
   
   We do have other tests that ensure that static membership behaves as 
expected for the new protocol (ex. `test_fencing_static_consumer`), but agreed 
that the "owned partition not re-assigned for a static member that is bounced" 
is not covered in sys tests (not for CONSUMER, not for Cooperative either). We 
could think of a new test to cover that. The shape would be different I expect, 
because it would either have to rely on bouncing a member with assignment while 
having others with none, or ensuring that all members have at least 1 
partition. It would also need a different, more complex delivery semantics 
validation, if any. I would just suggest a different Jira/PR for a new test, to 
be able to finalize migrating the current sys tests that apply to the new 
protocol. Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581140616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -966,6 +987,23 @@ private static void maybeUpdateSubscribedTopicNames(
 }
 }
 
+/**
+ * Updates the subscription type, iff necessary.
+ *
+ * If all members are subscribed to the same set of topics, the type is 
homogeneous.
+ * Otherwise, it is heterogeneous.
+ */
+private void maybeUpdateGroupSubscriptionType() {

Review Comment:
   hmm theoretically yes, but what if it stays homo/hetero even after the 
update? maybe it doesn't matter cause the variable is reassigned
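   
   For reference, a minimal stand-alone sketch of the homogeneity rule being 
discussed (names are hypothetical, not the actual ConsumerGroup fields): the 
group is homogeneous only when every member's subscribed topic set is identical.
   
   ```java
   import java.util.*;
   
   public class SubscriptionTypeDemo {
       enum SubscriptionType { HOMOGENEOUS, HETEROGENEOUS }
   
       // Homogeneous iff there is at most one distinct subscribed topic set.
       static SubscriptionType subscriptionType(Map<String, Set<String>> subscribedTopicsByMember) {
           long distinct = subscribedTopicsByMember.values().stream().distinct().count();
           return distinct <= 1 ? SubscriptionType.HOMOGENEOUS : SubscriptionType.HETEROGENEOUS;
       }
   
       public static void main(String[] args) {
           Map<String, Set<String>> members = new HashMap<>();
           members.put("m1", Set.of("orders", "payments"));
           members.put("m2", Set.of("orders", "payments"));
           System.out.println(subscriptionType(members)); // HOMOGENEOUS
           members.put("m3", Set.of("orders"));
           System.out.println(subscriptionType(members)); // HETEROGENEOUS
       }
   }
   ```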



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581134610


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -620,9 +637,10 @@ public Map 
computeSubscriptionMetadata(
 TopicsImage topicsImage,
 ClusterImage clusterImage
 ) {
-// Copy and update the current subscriptions.
+// Copy and update the current subscription information.
 Map subscribedTopicNames = new 
HashMap<>(this.subscribedTopicNames);
 maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+maybeUpdateGroupSubscriptionType();

Review Comment:
   o that makes sense, I wasn't sure what the reasoning was, okie I'll 
remove it, thanks for explaining



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-26 Thread via GitHub


rreddy-22 commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1581132398


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -204,7 +198,7 @@ private void createAssignmentSpec() {
 }
 }
 
-this.assignmentSpec = new AssignmentSpec(members);
+this.assignmentSpec = new AssignmentSpec(members, subscriptionType);

Review Comment:
   hmm this is weird lol, I could've sworn I removed em all



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9800: Exponential backoff for Kafka clients - KIP-580 [kafka]

2024-04-26 Thread via GitHub


msn-tldr commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1581106940


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -384,6 +389,32 @@ int attempts() {
 return attempts.get();
 }
 
+/*
+ * Returns whether the leader Node has changed since the last attempt.
+ * @param node The Node currently thought of as the leader, which might be 
null.
+ * @return true if the leader has changed, otherwise false
+ */
+boolean hasLeaderChanged(Node latestLeader) {
+boolean leaderChanged = false;
+if (latestLeader != null) {
+// If we don't know the leader yet, we have not yet attempted to 
send to the leader
+if (currentLeader == null) {
+currentLeader = latestLeader;
+} else {
+// If the leader's node id has changed, this counts as a 
leader change
+if (currentLeader.id() != latestLeader.id()) {

Review Comment:
   The change was done along with KIP-951 changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module

2024-04-26 Thread Giuseppe Calabrese (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841254#comment-17841254
 ] 

Giuseppe Calabrese commented on KAFKA-14892:


Hi, I'm new to the ASF.

I wonder if this ticket could be assigned to me.

Please let me know. Thanks

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 3.8.0
>
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16589: Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close [kafka]

2024-04-26 Thread via GitHub


chia7712 commented on PR #15815:
URL: https://github.com/apache/kafka/pull/15815#issuecomment-2079444105

   We are doing the migration, so can this PR stay pending until we see more use 
cases?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16611) Consider adding test name to "client.id" of Admin in testing

2024-04-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16611:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Consider adding test name to "client.id" of Admin in testing
> 
>
> Key: KAFKA-16611
> URL: https://issues.apache.org/jira/browse/KAFKA-16611
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> I observed following errors many times.
> {quote}
> org.opentest4j.AssertionFailedError: Found 16 unexpected threads during 
> @BeforeAll: `kafka-admin-client-thread | 
> adminclient-287,kafka-admin-client-thread | 
> adminclient-276,kafka-admin-client-thread | 
> adminclient-271,kafka-admin-client-thread | 
> adminclient-293,kafka-admin-client-thread | 
> adminclient-281,kafka-admin-client-thread | 
> adminclient-302,kafka-admin-client-thread | 
> adminclient-334,kafka-admin-client-thread | 
> adminclient-323,kafka-admin-client-thread | 
> adminclient-257,kafka-admin-client-thread | 
> adminclient-336,kafka-admin-client-thread | 
> adminclient-308,kafka-admin-client-thread | 
> adminclient-263,kafka-admin-client-thread | 
> adminclient-273,kafka-admin-client-thread | 
> adminclient-278,kafka-admin-client-thread | 
> adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: 
>  but was: 
> {quote}
> That could be caused by exceptional shutdown. Or we do have resource leaks in 
> some failed tests. Adding the test name to "client.id" can give hints about 
> that
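For illustration, a minimal sketch of the idea (the class and helper names are 
hypothetical): embed the JUnit test name in the admin client's client.id so a 
leaked "kafka-admin-client-thread" can be traced back to the test that created it.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.TestInfo;

public class NamedAdminSketch {
    static Admin createAdmin(String bootstrapServers, TestInfo testInfo) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Tag the client with the test name for easier leak hunting.
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-" + testInfo.getDisplayName());
        return Admin.create(props);
    }
}
{code}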



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15838) [Connect] ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields

2024-04-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15838:
---
Component/s: connect

> [Connect] ExtractField and InsertField NULL Values are replaced by default 
> value even in NULLABLE fields
> 
>
> Key: KAFKA-15838
> URL: https://issues.apache.org/jira/browse/KAFKA-15838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Eric Pangiawan
>Assignee: Mario Fiore Vitale
>Priority: Major
>
> ExtractField: Line 116-119
> [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java#L61-L68]
> InsertField: Line 163 - 195
> [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L163-L195]
> h1. Expect:
> Value `null` is valid for an optional field, even though the field has a 
> default value.
> Only when field is required, the class return default value fallback when 
> value is `null`.
> h1. Actual:
> Always return default value if `null` was given.
> h1. Example:
> PostgreSQL DDL:
> {code:java}
> CREATE TABLE products(
>     id varchar(255),
>     color varchar(255),
>     quantity float8
> );
> -- Set Default
> ALTER TABLE products ALTER COLUMN quantity SET  DEFAULT 1.0; {code}
> Insert A Record:
> {code:java}
> INSERT INTO public.products VALUES('1', 'Blue', null); {code}
> Table Select *:
> {code:java}
>  id | color | quantity
> +---+--
>  1  | Blue  | {code}
> Debezium Behavior when using ExtractField and InsertField class (in the event 
> flattening SMT):
> {code:java}
> {
>     "id":"1",
>     "color":"Blue",
>     "quantity":1.0,
>     "__op":"c",
>     "__ts_ms":1698127432079,
>     "__source_ts_ms":1698127431679,
>     "__db":"testing_db",
>     "__schema":"public",
>     "__table":"products",
>     "__lsn":24470112,
>     "__txId":957,
>     "__snapshot":null,
>     "__deleted":"false"
> } {code}
> The debezium code can be found 
> [here|https://github.com/debezium/debezium/blob/2.4/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java#L116-L119]
> h1. Expected Output:
> {code:java}
> {
>     "id":"1",
>     "color":"Blue",
>     "quantity":null,
>     "__op":"c",
>     "__ts_ms":1698127432079,
>     "__source_ts_ms":1698127431679,
>     "__db":"testing_db",
>     "__schema":"public",
>     "__table":"products",
>     "__lsn":24470112,
>     "__txId":957,
>     "__snapshot":null,
>     "__deleted":"false"
> }{code}
> h1. Temporary Solution:
> use getWithoutDefault() in ExtractField and InsertField instead of get()
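For illustration, a minimal stand-alone sketch of the difference between get() 
and getWithoutDefault() on an optional field with a default; the demo class is 
hypothetical, the Struct/SchemaBuilder calls are the Connect API.

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class NullVsDefaultDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct()
            .field("quantity", SchemaBuilder.float64().optional().defaultValue(1.0d).build())
            .build();
        Struct record = new Struct(schema); // "quantity" left unset, i.e. null

        // get() falls back to the schema default, masking the explicit null.
        System.out.println(record.get("quantity"));               // 1.0
        // getWithoutDefault() preserves the null for the optional field.
        System.out.println(record.getWithoutDefault("quantity")); // null
    }
}
{code}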



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16317: Add event process rate in GroupCoordinatorRuntimeMetrics [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15448:
URL: https://github.com/apache/kafka/pull/15448#issuecomment-2079403659

   I think that we won't add this one in the end. Closing it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15286:
URL: https://github.com/apache/kafka/pull/15286#issuecomment-2079402203

   @rreddy-22 Do we still need this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16317: Add event process rate in GroupCoordinatorRuntimeMetrics [kafka]

2024-04-26 Thread via GitHub


dajac closed pull request #15448: KAFKA-16317: Add event process rate in 
GroupCoordinatorRuntimeMetrics
URL: https://github.com/apache/kafka/pull/15448


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP848- Add JMH Benchmarks for Client And Server Side Assignors [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15329:
URL: https://github.com/apache/kafka/pull/15329#issuecomment-2079402641

   I think that we don't need this anymore. Closing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP848- Add JMH Benchmarks for Client And Server Side Assignors [kafka]

2024-04-26 Thread via GitHub


dajac closed pull request #15329: KIP848- Add JMH Benchmarks for Client And 
Server Side Assignors
URL: https://github.com/apache/kafka/pull/15329


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-04-26 Thread via GitHub


dajac closed pull request #15268: [Draft] Join, Sync, Heartbeat during Migration
URL: https://github.com/apache/kafka/pull/15268


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]

2024-04-26 Thread via GitHub


dajac commented on PR #15268:
URL: https://github.com/apache/kafka/pull/15268#issuecomment-2079401308

   I think that we don't need this anymore. Closing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactor SSL/SASL admin integration tests to not use a custom authorizer [kafka]

2024-04-26 Thread via GitHub


tinaselenge commented on code in PR #15377:
URL: https://github.com/apache/kafka/pull/15377#discussion_r1581029496


##
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##
@@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
 assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
 metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {

Review Comment:
   So I initially thought the `createConfig()` in BaseAdminIntegrationTest was 
creating different security properties by calling 
`TestUtils.adminClientSecurityConfigs()` than the security properties created 
as part of the setup in IntegrationTestHarness. However, following all the 
properties `adminClientSecurityConfigs()` sets, they are exactly the same except 
the `certAlias`. So basically BaseAdminIntegrationTest is mostly just setting 
security properties twice for no good reason. This seems to upset the SSL admin 
test because of the mismatch in the certAlias. 
   
   Removing the unnecessary call to `adminClientSecurityConfigs()` in 
BaseAdminIntegrationTest worked for all of the tests. 
   
   Thanks for pointing this out! 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactor SSL/SASL admin integration tests to not use a custom authorizer [kafka]

2024-04-26 Thread via GitHub


tinaselenge commented on code in PR #15377:
URL: https://github.com/apache/kafka/pull/15377#discussion_r1581029496


##
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##
@@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
 assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
 metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {

Review Comment:
   So I initially thought the `createConfig()` in BaseAdminIntegrationTest was 
creating different security properties by calling 
`TestUtils.adminClientSecurityConfigs()` than the security properties created 
as part of the setup in IntegrationTestHarness. However, following all the 
properties `adminClientSecurityConfigs()` sets, they are exactly the same except 
the `certAlias`. So basically BaseAdminIntegrationTest is mostly just setting 
security properties twice for no good reason. This seems to upset the SSL admin 
test because of the mismatch in the certAlias. 
   
   Removing the unnecessary call to `adminClientSecurityConfigs()` in 
BaseAdminIntegrationTest worked for all of the tests. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in raft [kafka]

2024-04-26 Thread via GitHub


mimaison merged PR #15805:
URL: https://github.com/apache/kafka/pull/15805


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dongnuo123 commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1581026465


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:

Review Comment:
   Ah yeah you're right. Let's keep it as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1413,6 +1506,243 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handle a JoinGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The group to join.
+ * @param contextThe request context.
+ * @param requestThe actual JoinGroup request.
+ * @param responseFuture The join group response future.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+private CoordinatorResult classicGroupJoinToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) throws ApiException {
+final long currentTimeMs = time.milliseconds();
+final List records = new ArrayList<>();
+final String groupId = request.groupId();
+String memberId = request.memberId();
+final String instanceId = request.groupInstanceId();
+final JoinGroupRequestProtocolCollection protocols = 
request.protocols();
+final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+
+throwIfConsumerGroupIsFull(group, memberId);
+throwIfClassicProtocolIsNotSupported(group, memberId, 
request.protocolType(), protocols);
+// TODO: need to throw an exception if group is dead?
+
+// Get or create the member.
+if (isUnknownMember) memberId = Uuid.randomUuid().toString();
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+boolean staticMemberReplaced = false;
+boolean newMemberCreated = false;
+if (instanceId == null) {
+// A dynamic member (re-)joins.
+throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, 
context);
+newMemberCreated = !group.hasMember(memberId);
+member = group.getOrMaybeCreateMember(memberId, true);
+log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);
+// A new static member joins or the existing static member rejoins.
+if (isUnknownMember) {
+newMemberCreated = true;
+if (member == null) {
+// New static member.
+member = group.getOrMaybeCreateMember(memberId, true);
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+} else {
+// Replace the current static member.
+staticMemberReplaced = true;
+updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+.setAssignedPartitions(member.assignedPartitions());
+removeMember(records, groupId, member.memberId());
+log.info("[GroupId {}] Static member with unknown member 
id and instance id {} re-joins the consumer group. " +
+"Created a new member {} to replace the existing 
member {}.", groupId, instanceId, memberId, member.memberId());
+}
+} else {
+// Rejoining static member. Fence the static group with 
unmatched member id.
+throwIfStaticMemberIsUnknown(member, instanceId);
+throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+log.info("[GroupId {}] Static member {} with instance id {} 
re-joins the consumer group.", groupId, memberId, instanceId);
+}
+}
+
+int groupEpoch = group.groupEpoch();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
+final ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(protocols);
+final List 
ownedTopicPartitions =
+validateGenerationIdAndGetOwnedPartition(member, subscription);
+
+// 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
+// record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
+// changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
+// record to the __consumer_offsets partition. Finally, the group 
epoch is 

[PR] KAFKA-16589: Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close [kafka]

2024-04-26 Thread via GitHub


FrankYang0529 opened a new pull request, #15815:
URL: https://github.com/apache/kafka/pull/15815

   Sometimes we close the admin created by `createAdminClient`, and sometimes 
we don't. That is not a true problem since the `ClusterInstance` will call 
`close` when stopping.
   
   However, that causes a lot of inconsistent code, and in fact it does not save 
much time since creating an Admin is not hard work. We can get 
`bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
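   
   For illustration, a minimal sketch of what a caller can do instead (assuming 
the test can obtain the bootstrap servers from `ClusterInstance`): build the 
Admin locally and let try-with-resources close it.
   
   ```java
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   
   public class AdminPerTestSketch {
       static void withAdmin(String bootstrapServers) throws ExecutionException, InterruptedException {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
           // try-with-resources closes the Admin (and its threads) when the block exits.
           try (Admin admin = Admin.create(props)) {
               admin.listTopics().names().get();
           }
       }
   }
   ```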
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16611) Consider adding test name to "client.id" of Admin in testing

2024-04-26 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841193#comment-17841193
 ] 

PoAn Yang commented on KAFKA-16611:
---

Hi [~chia7712], I'm interested in this issue. May I take it? Thank you.

> Consider adding test name to "client.id" of Admin in testing
> 
>
> Key: KAFKA-16611
> URL: https://issues.apache.org/jira/browse/KAFKA-16611
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> I observed following errors many times.
> {quote}
> org.opentest4j.AssertionFailedError: Found 16 unexpected threads during 
> @BeforeAll: `kafka-admin-client-thread | 
> adminclient-287,kafka-admin-client-thread | 
> adminclient-276,kafka-admin-client-thread | 
> adminclient-271,kafka-admin-client-thread | 
> adminclient-293,kafka-admin-client-thread | 
> adminclient-281,kafka-admin-client-thread | 
> adminclient-302,kafka-admin-client-thread | 
> adminclient-334,kafka-admin-client-thread | 
> adminclient-323,kafka-admin-client-thread | 
> adminclient-257,kafka-admin-client-thread | 
> adminclient-336,kafka-admin-client-thread | 
> adminclient-308,kafka-admin-client-thread | 
> adminclient-263,kafka-admin-client-thread | 
> adminclient-273,kafka-admin-client-thread | 
> adminclient-278,kafka-admin-client-thread | 
> adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: 
>  but was: 
> {quote}
> That could be caused by exceptional shutdown. Or we do have resource leaks in 
> some failed tests. Adding the test name to "client.id" can give hints about 
> that



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]

2024-04-26 Thread via GitHub


dajac commented on code in PR #15798:
URL: https://github.com/apache/kafka/pull/15798#discussion_r1580920772


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1288,25 +1396,10 @@ private 
CoordinatorResult consumerGr
 
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
 .setClientId(clientId)
 .setClientHost(clientHost)
+.setClassicMemberMetadata(null)
 .build();
 
-boolean bumpGroupEpoch = false;
-if (!updatedMember.equals(member)) {
-records.add(newMemberSubscriptionRecord(groupId, updatedMember));
-
-if 
(!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
-log.info("[GroupId {}] Member {} updated its subscribed topics 
to: {}.",
-groupId, memberId, updatedMember.subscribedTopicNames());
-bumpGroupEpoch = true;
-}
-
-if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
-log.info("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
-groupId, memberId, updatedMember.subscribedTopicRegex());
-bumpGroupEpoch = true;
-}
-}
-
+boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, 
member, updatedMember, records);

Review Comment:
   nit: The name does not really represent the intent here. How about 
`hasMemberChanged` or something along those lines?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1122,4 +1122,27 @@ private  OUT handleOperationException(
 return handler.apply(apiError.error(), apiError.message());
 }
 }
+
+/**
+ * Creates the JoinGroupResponseData according to the error type.
+ *
+ * @param memberId  The member id.
+ * @param error The error.
+ * @return The JoinGroupResponseData.
+ */
+private static JoinGroupResponseData createJoinGroupResponseData(
+String memberId,
+Errors error
+) {
+switch (error) {
+case MEMBER_ID_REQUIRED:
+case INVALID_SESSION_TIMEOUT:
+return new JoinGroupResponseData()
+.setMemberId(memberId)

Review Comment:
   I actually wonder why we set the member id in the `INVALID_SESSION_TIMEOUT` 
case. Looking at the client code, we don't use it in the java client.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1092,6 +1097,86 @@ private void 
throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
 }
 }
 
+/**
+ * Validates if the received classic member protocols are supported by the 
group.
+ *
+ * @param group The ConsumerGroup.
+ * @param memberId  The joining member id.
+ * @param protocolType  The joining member protocol type.
+ * @param protocols The joining member protocol collection.
+ */
+private void throwIfClassicProtocolIsNotSupported(
+ConsumerGroup group,
+String memberId,
+String protocolType,
+JoinGroupRequestProtocolCollection protocols
+) {
+if (!group.supportsClassicProtocols(protocolType, 
ClassicGroupMember.plainProtocolSet(protocols))) {
+throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + 
memberId + "'s protocols are not supported.");
+}
+}
+
+/**
+ * Deserialize the subscription in JoinGroupRequestProtocolCollection.
+ * All the protocols have the same subscription, so the method picks a 
random one.
+ *
+ * @param protocols The JoinGroupRequestProtocolCollection.
+ * @return The Subscription.
+ */
+private static ConsumerPartitionAssignor.Subscription 
deserializeSubscription(
+JoinGroupRequestProtocolCollection protocols
+) {
+try {
+return ConsumerProtocol.deserializeSubscription(
+ByteBuffer.wrap(protocols.stream().findAny().get().metadata())
+);
+} catch (SchemaException e) {
+throw new IllegalStateException("Malformed embedded consumer 
protocol.");
+}
+}
+
+/**
+ * Validates the generation id and returns the owned partitions in the 
JoinGroupRequest to a consumer group.
+ *
+ * @param memberThe joining member.
+ * @param subscription  The Subscription.
+ * @return The owned partitions if valid, otherwise return null.
+ */
+private List 
validateGenerationIdAndGetOwnedPartition(
+ConsumerGroupMember member,
+ConsumerPartitionAssignor.Subscription subscription
+) {
+List 
ownedPartitions =
+toTopicPartitions(subscription.ownedPartitions(), 

Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]

2024-04-26 Thread via GitHub


bachmanity1 commented on PR #15475:
URL: https://github.com/apache/kafka/pull/15475#issuecomment-2079216075

   Hi @mimaison, would you mind taking a look at the KIP and sharing your vote 
on it? Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]

2024-04-26 Thread via GitHub


cadonna commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1580785461


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1169,8 +1129,7 @@ private Map 
beginningOrEndOffset(Collection 
offsetAndTimestampMap;
 offsetAndTimestampMap = applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
+listOffsetsEvent);

Review Comment:
   nit: 
   Could you please move this parameter to the previous line? 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -998,7 +958,7 @@ public List partitionsFor(String topic, 
Duration timeout) {
 wakeupTrigger.setActiveTask(topicMetadataEvent.future());
 try {
 Map> topicMetadata =
-applicationEventHandler.addAndGet(topicMetadataEvent, 
timer);
+applicationEventHandler.addAndGet(topicMetadataEvent);

Review Comment:
   The timer is only used in the event.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -946,8 +907,7 @@ public Map 
committed(final Set 
offsetsForTimes(Map 
beginningOrEndOffset(Collection 
offsetAndTimestampMap;
 offsetAndTimestampMap = applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
+listOffsetsEvent);

Review Comment:
   I think you do actually not need the timer in this method at all. You could 
pass a deadline to the event.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1026,7 +986,7 @@ public Map> 
listTopics(Duration timeout) {
 final AllTopicsMetadataEvent topicMetadataEvent = new 
AllTopicsMetadataEvent(timer);

Review Comment:
   Also here the timer is only used in the event. Using a deadline would be 
simpler.
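   
   For illustration, a minimal stand-alone sketch of the deadline suggestion (the 
event class below is hypothetical, not the actual application event types): 
compute an absolute deadline once from the caller's timeout and let the event 
carry it, so no Timer needs to be threaded through `addAndGet`.
   
   ```java
   import java.time.Duration;
   
   public class DeadlineSketch {
       static final class Event {
           final long deadlineMs;
           Event(long deadlineMs) { this.deadlineMs = deadlineMs; }
           boolean expired(long nowMs) { return nowMs >= deadlineMs; }
       }
   
       public static void main(String[] args) {
           Duration timeout = Duration.ofSeconds(30);
           Event event = new Event(System.currentTimeMillis() + timeout.toMillis());
           System.out.println(event.expired(System.currentTimeMillis())); // false
       }
   }
   ```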



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-26 Thread via GitHub


appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580504115


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
 if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
 partitionsWithUpdatedLeaderInfo.put(partition, new 
Metadata.LeaderIdAndEpoch(
 
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch(;
+} else {
+requestMetadataUpdate(metadata, subscriptions, 
partition);
+subscriptions.awaitUpdate(partition);

Review Comment:
   As another alternative, it could change the status to `AWAIT_UPDATE` in 
`FetchCollector.handleInitializeErrors()` only when the response doesn't include 
leader info.
   Upon further thought, it seems possible to differentiate based on the 
following condition.
   ```
   completedFetch.partitionData.currentLeader().leaderId() != -1 && 
completedFetch.partitionData.currentLeader().leaderEpoch() != -1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:40 AM:
-

if after the consumer reaches 10000 and the 1st checkpoint is emitted, MM2 
restarts before the other 10000 messages are produced,
then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up 
with just two checkpoints, at 10000 and 20000.

 

but the problem here is that if the consumer never fully catches up once, we 
will never have a checkpoint


was (Author: ecomar):
if after the consumer reaches 10000 and the 1st checkpoint is emitted, MM2 
restarts before the other 10000 messages are produced,
then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up 
with just two checkpoints, at 10000 and 20000.


> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 10000 records)
> start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).
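For illustration, a minimal sketch of the slow, committing consumer described 
above (the bootstrap address and deserializers are assumptions; the group id and 
topic follow the scenario).

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowConsumerRepro {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup1");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("mytopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                System.out.println("polled " + records.count() + " records");
                consumer.commitSync();   // commit after every poll
                Thread.sleep(1000L);     // stay behind the producer
            }
        }
    }
}
{code}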



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137
 ] 

Edoardo Comar commented on KAFKA-16622:
---

if after the consumer reaches 10000 and the 1st checkpoint is emitted, MM2 
restarts before the other 10000 messages are produced,
then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up 
with just two checkpoints, at 10000 and 20000.


> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 10000 records)
> start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:18 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 10000.
Then other 10000 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=20000, downstreamOffset=20000, metadata=}

[^connect.log.2024-04-26-10.zip]


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = 

[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:17 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 10000.
Then other 10000 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=10000, downstreamOffset=10000, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=20000, downstreamOffset=20000, metadata=}

[^connect.log.2024-04-26-10.zip]


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));

[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:17 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

...
while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

 
 
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]
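
One way to see this delay from the downstream side is to poll RemoteClusterUtils.translateOffsets against the target cluster; it reads the checkpoints topic and returns nothing for the group until the first Checkpoint record has been emitted. A rough sketch, assuming the target cluster listens on localhost:9093 (the "source" alias is taken from the replicated topic name above; everything else is illustrative):

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.mirror.RemoteClusterUtils;

    public class CheckpointProbe {
        public static void main(String[] args) throws Exception {
            Map<String, Object> targetProps = new HashMap<>();
            targetProps.put("bootstrap.servers", "localhost:9093"); // assumption

            TopicPartition replicated = new TopicPartition("source.mytopic", 0);
            while (true) {
                // Offsets translated from the latest checkpoints for the group,
                // or an empty map while no checkpoint has been emitted yet.
                Map<TopicPartition, OffsetAndMetadata> translated =
                        RemoteClusterUtils.translateOffsets(targetProps, "source", "mygroup1",
                                Duration.ofSeconds(10));
                OffsetAndMetadata offset = translated.get(replicated);
                System.out.println(offset == null ? "no checkpoint yet" : "translated offset " + offset.offset());
                if (offset != null) {
                    break;
                }
                Thread.sleep(5000L);
            }
        }
    }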


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1,

[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:16 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (System.currentTimeMillis() - now < 1200*1000L) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    polledCount = polledCount + print(crs1, consumer);
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

 
 
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have


Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700,
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:15 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
 - polls 100 messages
 - commitSync
 - waits 1 sec
e.g.

while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have


Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.

while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

 [^connect.log.2024-04-26-10.zip] 

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the 

[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:13 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.

while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

 [^connect.log.2024-04-26-10.zip] 


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.
while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
 [^connect.log.2024-04-26-10.zip] 

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh 

[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-04-26 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134
 ] 

Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:12 AM:
-

Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.
while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
 [^connect.log.2024-04-26-10.zip] 


was (Author: ecomar):
Hi [~gharris1727]
The task did not restart. Here are the trace logs generated in the following 
scenario.
clean start with the properties file posted above.
create 'mytopic' 1 topic with 1 partition
produce 10,000 messages to source cluster
start a single consumer in source cluster that in a loop
- polls 100 messages
- commitSync
- waits 1 sec
e.g.
while (...) {
    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));
    // print(crs1, consumer); print last record of polled
    consumer.commitSync(Duration.ofSeconds(10));
    Thread.sleep(1000L);
}

the first checkpoint is only emitted when the consumer catches up fully at 
1.
Then other 1 messages are produced quickly and the consumer advances, and 
some checkpoints are emitted
so that overall we have
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
 [^connect.log.2024-04-26-10.zip] 

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters(mirromaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh 
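
For reference, one way to watch the checkpoint topic on the target cluster is the console consumer, roughly as below; the bootstrap address is an assumption and the formatter class name is quoted from memory, so it should be checked against the Kafka version in use:

    # assumption: target cluster on localhost:9093, source alias "source"
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 \
        --topic source.checkpoints.internal --from-beginning \
        --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter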
