Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2080374723

Relies on https://github.com/apache/kafka/pull/15766.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 opened a new pull request, #15821:
URL: https://github.com/apache/kafka/pull/15821

By using `ClusterTestExtensions`, `ListConsumerGroupTest` can get away from the `KafkaServerTestHarness` dependency.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-14892) Harmonize package names in storage module
[ https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841398#comment-17841398 ]

Giuseppe Calabrese edited comment on KAFKA-14892 at 4/27/24 5:16 AM:
--------------------------------------------------------------------

[~abhijeetkumar] [~chia7712] any ideas about it?

was (Author: JIRAUSER305174): [~abhijeetkumar] [~chia7712] any ideas about it?

> Harmonize package names in storage module
> -----------------------------------------
>
>             Key: KAFKA-14892
>             URL: https://issues.apache.org/jira/browse/KAFKA-14892
>         Project: Kafka
>      Issue Type: Improvement
>        Reporter: Ismael Juma
>        Priority: Major
>         Fix For: 3.8.0
>
>
> We currently have:
> # org.apache.kafka.server.log.remote.storage: public api in storage-api module
> # org.apache.kafka.server.log.remote: private api in storage module
> # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
> # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api in storage-api module
> # org.apache.kafka.storage.internals.log.remote: private api in storage module
> # org.apache.kafka.storage.internals.log: private api in storage module (stays the same)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[PR] [WIP] KAFKA-15434: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 opened a new pull request, #15820:
URL: https://github.com/apache/kafka/pull/15820

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module
[ https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841398#comment-17841398 ]

Giuseppe Calabrese commented on KAFKA-14892:
--------------------------------------------

[~abhijeetkumar] [~chia7712] any ideas about it?

> Harmonize package names in storage module
> -----------------------------------------
>
>             Key: KAFKA-14892
>             URL: https://issues.apache.org/jira/browse/KAFKA-14892
>         Project: Kafka
>      Issue Type: Improvement
>        Reporter: Ismael Juma
>        Priority: Major
>         Fix For: 3.8.0
>
>
> We currently have:
> # org.apache.kafka.server.log.remote.storage: public api in storage-api module
> # org.apache.kafka.server.log.remote: private api in storage module
> # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
> # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api in storage-api module
> # org.apache.kafka.storage.internals.log.remote: private api in storage module
> # org.apache.kafka.storage.internals.log: private api in storage module (stays the same)
Re: [PR] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]
ableegoldman commented on code in PR #15819:
URL: https://github.com/apache/kafka/pull/15819#discussion_r1581729152

## clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
## @@ -83,6 +84,14 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) { } } +public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {

Review Comment: Damn. Didn't realize this class was public. I'll refactor to avoid changing APIs here, thanks for pointing it out.
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581724166 ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) 
+.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment: Sigh... But we may still overlook adding fields to `equals()` and `hashCode()`. I'll address the test as you suggested in this comment. Thanks.
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581723855 ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) 
+.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType());

Review Comment: Sorry, I missed the important part in this test. I'm going to override `equals()` and `hashCode()` in ClusterConfig instead of using reflection.
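The `equals()`/`hashCode()` override discussed in this thread can be sketched roughly as follows. This is a minimal, hypothetical illustration (invented class name, small invented subset of fields), not the actual `ClusterConfig` implementation:

```java
import java.util.Map;
import java.util.Objects;

// Minimal sketch of the value-semantics approach discussed above.
// The fields (type, brokers, serverProperties) are a hypothetical
// subset chosen for illustration only.
final class ClusterConfigSketch {
    private final String type;
    private final int brokers;
    private final Map<String, String> serverProperties;

    ClusterConfigSketch(String type, int brokers, Map<String, String> serverProperties) {
        this.type = type;
        this.brokers = brokers;
        this.serverProperties = serverProperties;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClusterConfigSketch)) return false;
        ClusterConfigSketch that = (ClusterConfigSketch) o;
        return brokers == that.brokers
            && Objects.equals(type, that.type)
            && Objects.equals(serverProperties, that.serverProperties);
    }

    @Override
    public int hashCode() {
        // Every field compared in equals() must also feed hashCode(),
        // otherwise equal objects could produce different hash codes.
        return Objects.hash(type, brokers, serverProperties);
    }
}
```

As the review thread notes, a newly added field can still be overlooked in both methods, so a test that builds two identically configured instances and asserts their equality remains useful.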
[jira] [Assigned] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-16629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16629:
--------------------------------------

Assignee: 黃竣陽 (was: Chia-Ping Tsai)

> add broker-related tests to ConfigCommandIntegrationTest
> --------------------------------------------------------
>
>             Key: KAFKA-16629
>             URL: https://issues.apache.org/jira/browse/KAFKA-16629
>         Project: Kafka
>      Issue Type: Improvement
>        Reporter: Chia-Ping Tsai
>        Assignee: 黃竣陽
>        Priority: Major
>
> [https://github.com/apache/kafka/pull/15645] will rewrite the ConfigCommandIntegrationTest in Java with the new test infra. However, it still lacks enough tests for broker-related configs.
[jira] [Commented] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-16629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841391#comment-17841391 ]

黃竣陽 commented on KAFKA-16629:
-----------------------------

I will handle this issue.

> add broker-related tests to ConfigCommandIntegrationTest
> --------------------------------------------------------
>
>             Key: KAFKA-16629
>             URL: https://issues.apache.org/jira/browse/KAFKA-16629
>         Project: Kafka
>      Issue Type: Improvement
>        Reporter: Chia-Ping Tsai
>        Assignee: Chia-Ping Tsai
>        Priority: Major
>
> [https://github.com/apache/kafka/pull/15645] will rewrite the ConfigCommandIntegrationTest in Java with the new test infra. However, it still lacks enough tests for broker-related configs.
Re: [PR] Add Integration tests for the the KIP-966 [kafka]
artemlivshits commented on code in PR #15759: URL: https://github.com/apache/kafka/pull/15759#discussion_r1581642961 ## core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java: ## @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server.integration; + +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.Logging; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.mutable.HashMap; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import 
java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class EligibleLeaderReplicasIntegrationTest extends kafka.integration.KafkaServerTestHarness implements Logging { +private String bootstrapServer; +private String testTopicName; +private Admin adminClient; +@Override +public Seq<KafkaConfig> generateConfigs() { +List<Properties> brokerConfigs = new ArrayList<>(); +brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( +5, +zkConnectOrNull(), +true, +true, +scala.Option.empty(), +scala.Option.empty(), +scala.Option.empty(), +true, +false, +false, +false, +new HashMap<>(), +1, +false, +1, +(short) 4, +0, +false +))); +List<KafkaConfig> configs = new ArrayList<>(); +for (Properties props : brokerConfigs) { +props.put(KafkaConfig.ElrEnabledProp(), "true"); +configs.add(KafkaConfig.fromProps(props)); +} +return JavaConverters.asScalaBuffer(configs).toSeq(); +} + +@Override +public Seq<Properties> kraftControllerConfigs() { +Properties properties = new Properties(); +properties.put(KafkaConfig.ElrEnabledProp(), "true"); +return new scala.collection.mutable.ListBuffer<Properties>().addOne(properties); +} + +@BeforeEach +public void setUp(TestInfo info) { +super.setUp(info); +// create adminClient +Properties props = new Properties(); +bootstrapServer = bootstrapServers(listenerName()); +props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]
chia7712 commented on PR #15744:
URL: https://github.com/apache/kafka/pull/15744#issuecomment-2080249371

@mumrah I'm sorry, but could you please fix the conflicts again?
Re: [PR] [WIP] KAFKA-16614: Disallow @ClusterTemplate("") [kafka]
chia7712 commented on code in PR #15800: URL: https://github.com/apache/kafka/pull/15800#discussion_r1581630236 ## core/src/test/java/kafka/test/ClusterTestExtensionsUnitTest.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.test; + +import kafka.test.junit.ClusterTestExtensions; +import kafka.test.annotation.ClusterTemplate; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestTemplateInvocationContext; +import java.lang.reflect.Method; +import java.util.function.Consumer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ClusterTestExtensionsUnitTest { +@Test +void testProcessClusterTemplate() throws Exception { +ClusterTestExtensions ext = mock(ClusterTestExtensions.class); +ExtensionContext context = mock(ExtensionContext.class); +Consumer<TestTemplateInvocationContext> testInvocations = mock(Consumer.class); +ClusterTemplate annot = mock(ClusterTemplate.class); +when(annot.value()).thenReturn(""); + +Method method = ClusterTestExtensions.class.getDeclaredMethod("processClusterTemplate",

Review Comment: Maybe we can make `processClusterTemplate` package-private, and then simply test that the method throws an exception when the value is empty.
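The suggestion above can be sketched in isolation. Everything below is hypothetical (invented class name, assumed exception type); it only illustrates the shape of a package-private method that fails fast on an empty template value, so a unit test can call it directly instead of going through mocks and reflection:

```java
// Hypothetical standalone sketch of the review suggestion: a
// package-private method that rejects an empty @ClusterTemplate value.
// The class name and exception type are assumptions for illustration,
// not Kafka's actual ClusterTestExtensions code.
class ClusterTemplateProcessorSketch {

    // Package-private: visible to tests in the same package, but not public API.
    static void processClusterTemplate(String templateValue) {
        if (templateValue == null || templateValue.isEmpty()) {
            throw new IllegalStateException("ClusterTemplate value can't be empty string.");
        }
        // ... locate and invoke the named template method here ...
    }
}
```

With this shape, the unit test reduces to a plain assertion that an empty value throws, with no `mock(...)` or `getDeclaredMethod(...)` involved.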
Re: [PR] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]
mjsax commented on code in PR #15819:
URL: https://github.com/apache/kafka/pull/15819#discussion_r1581626291

## clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java:
## @@ -83,6 +84,14 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) { } } +public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {

Review Comment: Looks like a public API change?
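For context on why this line drew a review comment: adding a public setter to a public class widens the supported API surface. One conventional way to avoid that, sketched below with invented names (this is not the actual `GroupRebalanceConfig` code), is to fix the flag at construction time instead of adding a mutator:

```java
// Hypothetical sketch: carry the flag as a final field set at construction,
// so no public mutator is added to the class. Names are illustrative only.
final class RebalanceConfigSketch {
    private final boolean leaveGroupOnClose;

    RebalanceConfigSketch(boolean leaveGroupOnClose) {
        this.leaveGroupOnClose = leaveGroupOnClose;
    }

    // Read-only accessor; callers cannot flip the flag after construction.
    boolean leaveGroupOnClose() {
        return leaveGroupOnClose;
    }
}
```

This keeps the object immutable and leaves the decision of how the flag is sourced (e.g. from an internal config key) entirely inside the constructor's caller.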
[jira] [Resolved] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-6527.
-----------------------------------

Resolution: Fixed

It is enabled by https://github.com/apache/kafka/pull/15796

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> ----------------------------------------------------------------------------
>
>             Key: KAFKA-6527
>             URL: https://issues.apache.org/jira/browse/KAFKA-6527
>         Project: Kafka
>      Issue Type: Bug
>        Reporter: Jason Gustafson
>        Assignee: TaiJuWu
>        Priority: Blocker
>          Labels: flakey, flaky-test
>         Fix For: 3.8.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>     at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>     at kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}
Re: [PR] enable test, check if continues failing in CI [kafka]
chia7712 commented on PR #13953:
URL: https://github.com/apache/kafka/pull/13953#issuecomment-2080246965

Closing, as this is resolved by #15796.
Re: [PR] enable test, check if continues failing in CI [kafka]
chia7712 closed pull request #13953: enable test, check if continues failing in CI
URL: https://github.com/apache/kafka/pull/13953
[jira] [Assigned] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-6527:
-------------------------------------

Assignee: TaiJuWu

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> ----------------------------------------------------------------------------
>
>             Key: KAFKA-6527
>             URL: https://issues.apache.org/jira/browse/KAFKA-6527
>         Project: Kafka
>      Issue Type: Bug
>        Reporter: Jason Gustafson
>        Assignee: TaiJuWu
>        Priority: Blocker
>          Labels: flakey, flaky-test
>         Fix For: 3.8.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>     at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>     at kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}
Re: [PR] KAFKA-6527: Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]
chia7712 merged PR #15796:
URL: https://github.com/apache/kafka/pull/15796
Re: [PR] KAFKA-6527: Enable DynamicBrokerReconfigurationTest.testDefaultTopicConfig [kafka]
chia7712 commented on PR #15796:
URL: https://github.com/apache/kafka/pull/15796#issuecomment-2080246076

Let's ship it so it can run in QA ASAP.
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 commented on PR #15645:
URL: https://github.com/apache/kafka/pull/15645#issuecomment-2080244534

@nizhikov I'm OK with accepting this PR as a Java rewrite, and I filed https://issues.apache.org/jira/browse/KAFKA-16629 to add more tests in the future. Hence, please address the comment (https://github.com/apache/kafka/pull/15645#discussion_r1581621779) and I will take another look later to ship it ASAP.
[jira] [Created] (KAFKA-16629) add broker-related tests to ConfigCommandIntegrationTest
Chia-Ping Tsai created KAFKA-16629:
--------------------------------------

             Summary: add broker-related tests to ConfigCommandIntegrationTest
                 Key: KAFKA-16629
                 URL: https://issues.apache.org/jira/browse/KAFKA-16629
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai

[https://github.com/apache/kafka/pull/15645] will rewrite the ConfigCommandIntegrationTest in Java with the new test infra. However, it still lacks enough tests for broker-related configs.
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1581621779 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin; + +import kafka.cluster.Broker; +import kafka.cluster.EndPoint; +import kafka.test.ClusterInstance; +import kafka.test.annotation.AutoStart; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.ClusterTests; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import kafka.test.junit.ZkClusterInvocationContext; +import kafka.zk.AdminZkClient; +import kafka.zk.BrokerInfo; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.security.PasswordEncoder; +import org.apache.kafka.security.PasswordEncoderConfigs; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZooKeeperInternals; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ZK) +@Tag("integration") +public class ConfigCommandIntegrationTest { +AdminZkClient adminZkClient; +List<String> alterOpts; + +private final ClusterInstance cluster; + +public ConfigCommandIntegrationTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTests({ +@ClusterTest(clusterType = Type.ZK), +@ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 1),

Review Comment: That is caused by `"--entity-name", "0"`: it refers to a nonexistent broker id when you only have a single broker.
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1581608377 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupExecutor.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; + +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.common.GroupType.CONSUMER; + +class ConsumerGroupExecutor { + +static AutoCloseable buildConsumerGroup(String brokerAddress, +int numberOfConsumers, +String groupId, +String topic, +String groupProtocol, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit) { +return buildConsumers( +brokerAddress, +numberOfConsumers, +groupId, 
+groupProtocol, +topic, +RangeAssignor.class.getName(), +remoteAssignor, +customConfigs, +syncCommit +); +} + +static AutoCloseable buildClassicGroup(String brokerAddress, + int numberOfConsumers, + String groupId, + String topic, + String assignmentStrategy, + Map customConfigs, + boolean syncCommit) { +return buildConsumers( +brokerAddress, +numberOfConsumers, +groupId, +GroupProtocol.CLASSIC.name, +topic, +assignmentStrategy, +Optional.empty(), +customConfigs, +syncCommit +); +} + +private ConsumerGroupExecutor() { +} + +private static AutoCloseable buildConsumers( +String brokerAddress, +int numberOfConsumers, +String groupId, +String groupProtocol, +String topic, +String assignmentStrategy, +Optional remoteAssignor, +Map customConfigs, +boolean syncCommit +) { +List> allConfigs = IntStream.range(0, numberOfConsumers) +.mapToObj(ignored -> +composeConfigs( +
Re: [PR] MINOR: Clean up TestUtils.scala [kafka]
chia7712 commented on PR #15808: URL: https://github.com/apache/kafka/pull/15808#issuecomment-2080226483 @m1a2st Could you please fix conflicts?
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2080225060 @johnnychhsu Could you please fix conflicts?
Re: [PR] MINOR: Various cleanups in generator [kafka]
chia7712 merged PR #15807: URL: https://github.com/apache/kafka/pull/15807
Re: [PR] MINOR: refine DeleteOffsetsConsumerGroupCommandIntegrationTest#produceRecord [kafka]
chia7712 merged PR #15802: URL: https://github.com/apache/kafka/pull/15802
Re: [PR] KAFKA-15853: Move KRAFT configs out of KafkaConfig [kafka]
chia7712 merged PR #15775: URL: https://github.com/apache/kafka/pull/15775
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1581597711 ## core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala: ## @@ -71,11 +72,16 @@ class KafkaServerKRaftRegistrationTest { val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3) // Enable migration configs and restart brokers - zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true") - zkCluster.config().serverProperties().put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig()) - zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") - zkCluster.config().serverProperties().put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT") - zkCluster.rollingBrokerRestart() + val serverProperties = new java.util.HashMap[String, String]() Review Comment: ```java val serverProperties = new util.HashMap[String, String](zkCluster.config().serverProperties()) ``` ## core/src/test/java/kafka/test/ClusterConfigTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import kafka.test.annotation.Type; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +public class ClusterConfigTest { + +@Test +public void testClusterConfigBuilder() throws IOException { +File trustStoreFile = TestUtils.tempFile(); + +ClusterConfig clusterConfig = ClusterConfig.builder() +.setType(Type.KRAFT) +.setBrokers(1) +.setControllers(1) +.setName("builder-test") +.setAutoStart(true) +.setSecurityProtocol(SecurityProtocol.PLAINTEXT) +.setListenerName("EXTERNAL") +.setTrustStoreFile(trustStoreFile) +.setMetadataVersion(MetadataVersion.IBP_0_8_0) +.setServerProperties(Collections.singletonMap("broker", "broker_value")) +.setConsumerProperties(Collections.singletonMap("consumer", "consumer_value")) +.setProducerProperties(Collections.singletonMap("producer", "producer_value")) + .setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value")) + .setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value")) + .setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value")) +.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value"))) +.build(); + +Assertions.assertEquals(Type.KRAFT, clusterConfig.clusterType()); Review Comment: Dear friend, this test can't protect us from overlooking the field copy since we don't check the field added in the future, right? Maybe we can use reflection to get all fields and then check all dynamically. 
```java private static Map fields(ClusterConfig config) { return Arrays.stream(config.getClass().getDeclaredFields()).collect(Collectors.toMap(Field::getName, f -> { f.setAccessible(true); return Assertions.assertDoesNotThrow(() -> f.get(config)); })); } @Test public void testClusterConfigBuilder() throws IOException { File trustStoreFile = TestUtils.tempFile(); ClusterConfig clusterConfig = ClusterConfig.builder() .setType(Type.KRAFT) .setBrokers(3) .setControllers(2) .setName("builder-test") .setAutoStart(true)
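The reflection-based field check suggested in that review can be sketched as a self-contained example. Note that `FakeClusterConfig` and `FieldCheck` below are hypothetical stand-ins invented for this illustration, not classes from the Kafka test framework; the point is only the technique of snapshotting every declared field by name so that a field added later is compared automatically:

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

// Stand-in for ClusterConfig: a tiny immutable config with a copy method.
// (Hypothetical class, used only to demonstrate the reflection check.)
final class FakeClusterConfig {
    private final int brokers;
    private final String name;

    FakeClusterConfig(int brokers, String name) {
        this.brokers = brokers;
        this.name = name;
    }

    // A copy() that forgot to carry over `name` would be caught by the check.
    FakeClusterConfig copy() {
        return new FakeClusterConfig(brokers, name);
    }
}

public class FieldCheck {
    // Snapshot every declared field by name via reflection, so a future field
    // is included in the comparison without updating the test by hand.
    static Map<String, Object> fields(Object config) {
        return Arrays.stream(config.getClass().getDeclaredFields())
            .collect(Collectors.toMap(Field::getName, f -> {
                f.setAccessible(true);
                try {
                    return f.get(config);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }));
    }

    public static void main(String[] args) {
        FakeClusterConfig original = new FakeClusterConfig(3, "builder-test");
        FakeClusterConfig copied = original.copy();
        if (!Objects.equals(fields(original), fields(copied))) {
            throw new AssertionError("copy dropped a field");
        }
        System.out.println("all fields copied"); // prints: all fields copied
    }
}
```

The dynamic map comparison is what protects against the "overlooked field copy" problem the reviewer describes: the test fails as soon as a newly added field is not carried over, with no test change required.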
Re: [PR] MINOR: Change the Brokers field documentation . [kafka]
cmccabe merged PR #15809: URL: https://github.com/apache/kafka/pull/15809
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841374#comment-17841374 ] A. Sophie Blee-Goldman commented on KAFKA-16514: I opened a quick example PR to showcase more clearly what I'm proposing here. It's definitely rather hacky, but as I said already, this functionality of not leaving the group when a consumer is closed was done in a hacky way to begin with (ie via internal consumer config introduced for use by Kafka Streams only). So we may as well fix this issue so the Streams closeOptions have correct semantics The more I think about this the more I feel strongly that it's just silly for Streams users to be unable to opt-out of this "don't leave the group on close" behavior. It's not even possible to use the internal config since Streams strictly overrides it inside StreamsConfig. You can work around this by plugging in your own consumers via KafkaClientSupplier though that does feel a bit extreme. More importantly though, you'd still have to choose up front whether or not to leave the group on close, where you would obviously not know whether it makes sense to leave until you're actually calling close and know _why_ you're calling close (specifically, whether it's a temporary restart/bounce or "permanent" close) > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [DO NOT MERGE] KAFKA-16514: backdoor for modifying internal leave.group.on.close config [kafka]
ableegoldman opened a new pull request, #15819: URL: https://github.com/apache/kafka/pull/15819 POC for one option to fix the issue described in KAFKA-16514 This is definitely a hacky approach to introduce a backdoor solely for Kafka Streams to implement its special close semantics. But to be fair, that is exactly the same thing that happens right now, except that instead of a backdoor internal method we have a backdoor internal ConsumerConfig (ie "leave.group.on.close"). This just adds a way to modify that config so Kafka Streams operators can decide at close time whether they want to leave the group or not. For example, a Streams application may want to scale in by permanently stopping some nodes, in which case the consumers should all immediately leave the group. However it may also want to simply bounce the node, in which case it makes sense to not leave the group when closed. Both are valid scenarios but having to choose one up front when the consumer is first created due to the immutability of configs means that in practice, users are forced to guess which one they will want/need and can't decide at close time.
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841369#comment-17841369 ] A. Sophie Blee-Goldman commented on KAFKA-16514: The immutable internal config thing is definitely a bummer. To recap: if we want to solve this so that the current Streams API – ie the #close(closeOptions) API – works as intended, ie for non-static members as well, we'd need to change the way the consumer works. Or wait for mutable configs, which would be nice, but realistically that's not happening soon enough. To do this "right" we'd probably need to introduce a new public consumer API of some sort which would mean going through a KIP which could be a bit messy. But as a slightly-hacky alternative, would it be possible to just introduce an internal API that works similar to the effect of the existing internal config, and just have Kafka Streams use that internal API without making it a "real" API and having to do a KIP? I mean that's basically what the internal config is anyways – an internal config not exposed to/intended for use by consumer applications and only introduced for Kafka Streams to use. Doesn't seem that big a deal to just switch from this immutable config to a new internal overload of #close (or even an internal #leaveGroupOnClose API that can be toggled on/off). Thoughts? [~mjsax] [~cadonna] maybe you can raise this with someone who works on the clients to see if there are any concerns/make sure no one would object to this approach?
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581517439 ## raft/src/main/java/org/apache/kafka/raft/internals/History.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Objects; +import java.util.Optional; + +/** + * A object tracks values of {@code T} at different offsets. + */ +public interface History { +/** + * Add a new value at a given offset. + * + * The provided {@code offset} must be greater than or equal to 0 and must be greater than the + * offset of all previous calls to this method. + * + * @param offset the offset + * @param value the value to store + * @throws IllegalArgumentException if the offset is not greater that all previous offsets + */ +public void addAt(long offset, T value); + +/** + * Returns the value that has the largest offset that is less than or equals to the provided + * offset. + * + * @param offset the offset + * @return the value if it exist, otherwise {@code Optional.empty()} + */ +public Optional valueAtOrBefore(long offset); + +/** + * Returns the value with the largest offset. 
+ * + * @return the value if it exist, otherwise {@code Optional.empty()} + */ +public Optional> lastEntry(); + +/** + * Removes all entries with an offset greater than or equal to {@code endOffset}. + * + * @param endOffset the ending offset + */ +public void truncateTo(long endOffset); + +/** + * Removes all entries but the largest entry that has an offset that is less than or equal to + * {@code startOffset}. + * + * This operation does not remove the entry with the largest offset that is less than or equal + * to {@code startOffset}. This is needed so that calls to {@code valueAtOrBefore} and + * {@code lastEntry} always return a non-empty value if a value was previously added to this + * object. + * + * @param startOffset the starting offset + */ +public void trimPrefixTo(long startOffset); Review Comment: I went with `truncateOldEntries` and `truncateNewEntries`. Thanks for the suggestions.
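As a rough illustration of the `History` contract under review (using the renamed `truncateNewEntries`/`truncateOldEntries` the reviewer settled on), a `TreeMap`-backed sketch might look like the following. `TreeMapHistory` is a name invented for this example under those assumptions, not the actual Kafka implementation:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Minimal sketch of the History contract: values of T tracked at offsets,
// with log-truncation (new entries) and snapshot-trimming (old entries).
final class TreeMapHistory<T> {
    private final TreeMap<Long, T> history = new TreeMap<>();

    // Offsets must be non-negative and strictly increasing.
    void addAt(long offset, T value) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be >= 0");
        }
        Map.Entry<Long, T> last = history.lastEntry();
        if (last != null && offset <= last.getKey()) {
            throw new IllegalArgumentException("offset must exceed all previous offsets");
        }
        history.put(offset, value);
    }

    // Value with the largest offset <= the given offset, if any.
    Optional<T> valueAtOrBefore(long offset) {
        return Optional.ofNullable(history.floorEntry(offset)).map(Map.Entry::getValue);
    }

    Optional<Map.Entry<Long, T>> lastEntry() {
        return Optional.ofNullable(history.lastEntry());
    }

    // Remove entries with offset >= endOffset (log truncation).
    void truncateNewEntries(long endOffset) {
        history.tailMap(endOffset, true).clear();
    }

    // Remove old entries, but keep the one entry at or before startOffset so
    // valueAtOrBefore/lastEntry stay non-empty after trimming.
    void truncateOldEntries(long startOffset) {
        Map.Entry<Long, T> keep = history.floorEntry(startOffset);
        if (keep != null) {
            history.headMap(keep.getKey(), false).clear();
        }
    }
}
```

A `TreeMap` fits here because `floorEntry` gives the "largest offset at or before" lookup directly, and the `headMap`/`tailMap` views let both truncation directions be expressed as a range `clear()`.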
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1581508063 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: Done. I also created the [Jira](https://issues.apache.org/jira/browse/KAFKA-16628) assigned to me to add a test to cover all the non-eager paths that we know we don't have yet.
[jira] [Assigned] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager
[ https://issues.apache.org/jira/browse/KAFKA-16628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-16628: -- Assignee: Lianet Magrans > Add system test for validating static consumer bounce and assignment when not > eager > --- > > Key: KAFKA-16628 > URL: https://issues.apache.org/jira/browse/KAFKA-16628 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > > Existing system > [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209] > includes a test for validating that partitions are not re-assigned when a > static member is bounced, but the test design and setup is intended for > testing this for the Eager assignment strategy only (based on the eager > protocol where all dynamic members revoke their partitions when a rebalance > happens). > We should consider adding a test that would ensure that partitions are not > re-assigned when using the cooperative sticky assignor or the new consumer > group protocol assignments.
[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager
Lianet Magrans created KAFKA-16628: -- Summary: Add system test for validating static consumer bounce and assignment when not eager Key: KAFKA-16628 URL: https://issues.apache.org/jira/browse/KAFKA-16628 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Lianet Magrans Existing system [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209] includes a test for validating that partitions are not re-assigned when a static member is bounced, but the test design and setup is intended for testing this for the Eager assignment strategy only (based on the eager protocol where all dynamic members revoke their partitions when a rebalance happens). We should consider adding a test that would ensure that partitions are not re-assigned when using the cooperative sticky assignor or the new consumer group protocol assignments.
[jira] [Closed] (KAFKA-16528) Reset member heartbeat interval when request sent
[ https://issues.apache.org/jira/browse/KAFKA-16528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans closed KAFKA-16528. -- > Reset member heartbeat interval when request sent > - > > Key: KAFKA-16528 > URL: https://issues.apache.org/jira/browse/KAFKA-16528 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > Member should reset the heartbeat timer when the request is sent, rather than > when a response is received. This aims to ensure that we don't add to the > interval any delay there might be in a response. With this, we better respect > the contract of members sending HB on the interval to remain in the group, > and avoid potential unwanted rebalances. > Note that there is already logic in place to avoid sending a request if a > response hasn't been received. So even with the reset of the interval on the > send, the next HB will only be sent once the response is received. (It will be > sent on the next poll of the HB manager, respecting the minimal backoff for > sending consecutive requests.) This will btw be consistent with how the > interval timing & in-flights are handled for auto-commits.
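The timing change described in the issue can be sketched in isolation. The class below is a hypothetical illustration of the intent (reset the interval on send, plus an in-flight guard so requests never stack up), not the actual consumer heartbeat manager code:

```java
// Minimal sketch of the KAFKA-16528 behavior: the interval timer restarts
// when a heartbeat is *sent*, and an in-flight flag prevents a second send
// before the response arrives. Names are invented for this example.
final class HeartbeatScheduler {
    private final long intervalMs;
    private long lastSentMs;
    private boolean inFlight = false;

    HeartbeatScheduler(long intervalMs) {
        this.intervalMs = intervalMs;
        // Allow an immediate first heartbeat.
        this.lastSentMs = -intervalMs;
    }

    boolean shouldSend(long nowMs) {
        // Never stack requests: wait for the outstanding response first.
        return !inFlight && nowMs - lastSentMs >= intervalMs;
    }

    void onSend(long nowMs) {
        // Resetting here (instead of in onResponse) keeps the send cadence on
        // the interval even when responses are delayed.
        lastSentMs = nowMs;
        inFlight = true;
    }

    void onResponse() {
        inFlight = false;
    }
}
```

Resetting in `onResponse` instead would add each response's latency to the effective interval, which is exactly the drift the issue sets out to remove.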
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581484869 ## raft/src/main/java/org/apache/kafka/raft/internals/History.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.util.Objects; +import java.util.Optional; + +/** + * A object tracks values of {@code T} at different offsets. + */ +public interface History { Review Comment: Do you mind if we wait to see if we need that flexibility? I ask because if we really want to make it generic, we need to support both `Comparable` and `Comparator`. > The map is ordered according to the [natural ordering](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Comparable.html) of its keys, or by a [Comparator](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/Comparator.html) typically provided at sorted map creation time.
Re: [PR] MINOR: Add replayRecords to CoordinatorResult [kafka]
dajac commented on PR #15818: URL: https://github.com/apache/kafka/pull/15818#issuecomment-2080046398 @dongnuo123 Thanks for the patch. Could you also check if we have other cases like this one: https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java#L531? We need to ensure that we use replayRecords.
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581480096 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.feature.SupportedVersionRange; + +/** + * A type for representing the set of voters for a topic partition. + * + * It encapsulates static information like a voter's endpoint and their supported kraft.version. + * + * It provides functionality for converting to and from {@code VotersRecord} and for converting + * from the static configuration. 
 + */ +final public class VoterSet { +private final Map<Integer, VoterNode> voters; + +VoterSet(Map<Integer, VoterNode> voters) { +if (voters.isEmpty()) { +throw new IllegalArgumentException("Voters cannot be empty"); +} + +this.voters = voters; +} + +/** + * Returns the socket address for a given voter at a given listener. + * + * @param voter the id of the voter + * @param listener the name of the listener + * @return the socket address if it exists, otherwise {@code Optional.empty()} + */ +public Optional<InetSocketAddress> voterAddress(int voter, String listener) { +return Optional.ofNullable(voters.get(voter)) +.flatMap(voterNode -> voterNode.address(listener)); +} + +/** + * Returns all of the voter ids. + */ +public Set<Integer> voterIds() { +return voters.keySet(); +} + +/** + * Adds a voter to the voter set. + * + * This object is immutable. A new voter set is returned if the voter was added. + * + * A new voter can be added to a voter set if its id doesn't already exist in the voter set. + * + * @param voter the new voter to add + * @return a new voter set if the voter was added, otherwise {@code Optional.empty()} + */ +public Optional<VoterSet> addVoter(VoterNode voter) { +if (voters.containsKey(voter.id())) { +return Optional.empty(); +} + +HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters); +newVoters.put(voter.id(), voter); + +return Optional.of(new VoterSet(newVoters)); +} + +/** + * Removes a voter from the voter set. + * + * This object is immutable. A new voter set is returned if the voter was removed. + * + * A voter can be removed from the voter set if its id and uuid match. 
 + * + * @param voterId the voter id + * @param voterUuid the voter uuid + * @return a new voter set if the voter was removed, otherwise {@code Optional.empty()} + */ +public Optional<VoterSet> removeVoter(int voterId, Optional<Uuid> voterUuid) { +VoterNode oldVoter = voters.get(voterId); +if (oldVoter != null && Objects.equals(oldVoter.uuid(), voterUuid)) { +HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters); +newVoters.remove(voterId); + +return Optional.of(new VoterSet(newVoters)); +} + +return Optional.empty(); +} + +/** + * Converts a voter set to a voters record for a given version. + * + * @param version the version of the voters record + */ +public VotersRecord toVotersRecord(short version) { +return new VotersRecord() +.setVersion(version) +.setVoters( +voters +.values() +.stream() +.map(voter -> { +Iterator<VotersRecord.Endpoint> endpoints = voter +.listeners() +.entrySet() +.stream() +.map(entry -> +new VotersRecord.Endpoint() +
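The `VoterSet` code quoted above follows a copy-on-write pattern: the object is immutable, every mutation returns a fresh set wrapped in `Optional` (empty when the precondition fails), and removal requires both the id and the uuid to match. A minimal sketch of those semantics, written here in Python for brevity; `VoterSetSketch` and the string-valued uuids are illustrative stand-ins, not the actual Kafka classes:

```python
from typing import Dict, Optional

class VoterSetSketch:
    """Toy model of the copy-on-write VoterSet semantics: every mutation
    returns a new set (or None when the precondition fails); the original
    instance is never modified."""

    def __init__(self, voters: Dict[int, str]):
        if not voters:
            raise ValueError("Voters cannot be empty")
        self._voters = dict(voters)  # defensive copy

    def voter_ids(self):
        return set(self._voters)

    def add_voter(self, voter_id: int, uuid: str) -> Optional["VoterSetSketch"]:
        # Mirrors addVoter: reject a duplicate id, otherwise copy-and-extend.
        if voter_id in self._voters:
            return None
        new_voters = dict(self._voters)
        new_voters[voter_id] = uuid
        return VoterSetSketch(new_voters)

    def remove_voter(self, voter_id: int, uuid: str) -> Optional["VoterSetSketch"]:
        # Mirrors removeVoter: both the id and the uuid must match.
        old = self._voters.get(voter_id)
        if old is not None and old == uuid:
            new_voters = dict(self._voters)
            del new_voters[voter_id]
            return VoterSetSketch(new_voters)
        return None

base = VoterSetSketch({1: "uuid-a", 2: "uuid-b"})
grown = base.add_voter(3, "uuid-c")
assert grown is not None and grown.voter_ids() == {1, 2, 3}
assert base.voter_ids() == {1, 2}                   # original untouched
assert base.add_voter(1, "uuid-x") is None          # duplicate id rejected
assert base.remove_voter(2, "wrong-uuid") is None   # uuid must match too
```

Returning a new set rather than mutating in place is what lets callers treat a `None`/`Optional.empty()` result as "nothing changed" without any rollback logic.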
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1581441497 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) -def handle_partitions_assigned(self, event): +if tp in self.assignment: +self.assignment.remove(tp) +revoked.append(tp) +else: +logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname)) Review Comment: Filed KAFKA-16623, FYI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
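The diff above changes the revoke handler so a partition is only removed from the local assignment when it was actually recorded as assigned, logging a warning otherwise. A self-contained sketch of that guard pattern, with simplified names (plain tuples instead of the kafkatest `TopicPartition` class):

```python
import logging

def revoke_partitions(assignment: set, revoked_event: list, logger: logging.Logger) -> list:
    """Remove each partition in the event from the local assignment,
    skipping (and logging) any partition that was never recorded as
    assigned, instead of raising KeyError on set.remove()."""
    revoked = []
    for tp in revoked_event:
        if tp in assignment:
            assignment.remove(tp)
            revoked.append(tp)
        else:
            logger.warning("Could not remove %s from assignment: not previously assigned", tp)
    return revoked

assignment = {("topic-a", 0), ("topic-a", 1)}
revoked = revoke_partitions(assignment, [("topic-a", 0), ("topic-b", 5)], logging.getLogger(__name__))
assert revoked == [("topic-a", 0)]
assert assignment == {("topic-a", 1)}
```

The unconditional `self.assignment.remove(tp)` in the old code is exactly what threw when the broker reported a revocation the handler had no record of.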
[jira] [Resolved] (KAFKA-16609) Update parse_describe_topic to support new topic describe output
[ https://issues.apache.org/jira/browse/KAFKA-16609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True resolved KAFKA-16609. --- Reviewer: Lucas Brutschy Resolution: Fixed > Update parse_describe_topic to support new topic describe output > > > Key: KAFKA-16609 > URL: https://issues.apache.org/jira/browse/KAFKA-16609 > Project: Kafka > Issue Type: Bug > Components: admin, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: system-test-failure > Fix For: 3.8.0 > > > It appears that recent changes to the describe topic output has broken the > system test's ability to parse the output. > {noformat} > test_id: > kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=False.reassign_from_offset_zero=True.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 50.333 seconds > IndexError('list index out of range') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", > line 175, in test_reassign_partitions > self.run_produce_consume_validate(core_test_action=lambda: > self.reassign_partitions(bounce_brokers)) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/produce_consume_validate.py", > line 105, in run_produce_consume_validate > core_test_action(*args) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", > line 175, in > self.run_produce_consume_validate(core_test_action=lambda: > self.reassign_partitions(bounce_brokers)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/core/reassign_partitions_test.py", > line 82, in reassign_partitions > partition_info = > self.kafka.parse_describe_topic(self.kafka.describe_topic(self.topic)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", > line 1400, in parse_describe_topic > fields = list(map(lambda x: x.split(" ")[1], fields)) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/kafka/kafka.py", > line 1400, in > fields = list(map(lambda x: x.split(" ")[1], fields)) > IndexError: list index out of range > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
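The traceback shows `x.split(" ")[1]` raising `IndexError` when a describe-topic field contains no token after the space (e.g. a bare `Isr:` with no value in the new output format). A defensive variant of that parsing step, hedged as an illustration only; the real fix lives in `kafkatest/services/kafka/kafka.py` and may differ:

```python
def parse_field_values(line: str) -> list:
    """Split a describe-topic line into 'Key: value' fields and keep only
    the value token; a field with no value token (the source of the
    IndexError in the traceback above) is skipped instead of crashing."""
    values = []
    for field in line.split("\t"):
        tokens = field.strip().split(" ")
        if len(tokens) >= 2:  # guard against bare keys like "Isr:"
            values.append(tokens[1])
    return values

line = "Topic: test\tPartition: 0\tLeader: 1\tReplicas: 1,2\tIsr:"
assert parse_field_values(line) == ["test", "0", "1", "1,2"]
```

Indexing into the result of `split` is safe only when the output format guarantees a value for every key, which the new describe output evidently no longer does.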
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581410429 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## (quoting the same VoterSet.java hunk as above) +public VotersRecord toVotersRecord(short version) { +return new VotersRecord() +.setVersion(version) +.setVoters( +voters +.values() +.stream() +.map(voter -> { +Iterator<VotersRecord.Endpoint> endpoints = voter +.listeners() Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on PR #15645: URL: https://github.com/apache/kafka/pull/15645#issuecomment-2079970499 @chia7712 CI looks good to me. Do you have any other comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [Minor] Add replayRecords to CoordinatorResult [kafka]
dongnuo123 opened a new pull request, #15818: URL: https://github.com/apache/kafka/pull/15818 The patch adds a boolean attribute `replayRecords` that specifies whether the records should be replayed. The default value is `appendFuture == null` so no change is needed for the existing code. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
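The patch description above derives the default of `replayRecords` from `appendFuture == null` so existing call sites keep their behavior. A toy model of that defaulting pattern (the field names follow the PR description; the class itself is not Kafka's `CoordinatorResult`):

```python
class CoordinatorResultSketch:
    """Toy model: records are replayed by default only when no append
    future is attached, matching the `appendFuture == null` default
    described in the PR, while still allowing an explicit override."""

    def __init__(self, records, append_future=None, replay_records=None):
        self.records = records
        self.append_future = append_future
        # None means "not specified": fall back to the derived default.
        self.replay_records = (append_future is None) if replay_records is None else replay_records

assert CoordinatorResultSketch(records=[]).replay_records is True
assert CoordinatorResultSketch(records=[], append_future=object()).replay_records is False
assert CoordinatorResultSketch(records=[], append_future=object(), replay_records=True).replay_records is True
```

Using a sentinel for "not specified" rather than a plain boolean default is what makes the override distinguishable from the derived value.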
Re: [PR] KAFKA-16211: Inconsistent config values in CreateTopicsResult and DescribeConfigsResult [kafka]
infantlikesprogramming commented on PR #15696: URL: https://github.com/apache/kafka/pull/15696#issuecomment-2079935542 @chia7712 Thank you! The cluster that I deployed in KRaft (locally) consists of 3 controllers and 4 brokers, using standard configurations from the config/kraft folder. There is a slight modification to the value of log.segment.bytes of the 4th broker to 286870912. Would you like me to include the files for my test cluster and test file, or would you like me to add a new test in the test class for the function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581398905 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## (quoting the same VoterSet.java hunk as above) +/** + * Removes a voter from the voter set. + * + * This object is immutable. A new voter set is returned if the voter was removed. + * + * A voter can be removed from the voter set if its id and uuid match. + * + * @param voterId the voter id Review Comment: I have this wording in the description "A voter can be removed from the voter set if its id and directory id match." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581392367 ## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ## (quoting the same VoterSet.java hunk as above) +public Optional<VoterSet> addVoter(VoterNode voter) { +if (voters.containsKey(voter.id())) { +return Optional.empty(); Review Comment: We don't have to. The important invariant for `VoterSet` is that voter ids are unique. The kraft leader will not allow a replica to join the `VoterSet` if its replica id already exists in the voters set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581378400 ## raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java: ## @@ -261,7 +260,7 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * @param snapshotId the end offset and epoch that identifies the snapshot * @return a writable snapshot if it doesn't already exist */ -Optional storeSnapshot(OffsetAndEpoch snapshotId); +Optional createNewSnapshotUnchecked(OffsetAndEpoch snapshotId); Review Comment: Fix the comments for this method and `createNewSnapshot`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841332#comment-17841332 ] Matthias J. Sax commented on KAFKA-16584: - There is an issue with creating a new account: https://issues.apache.org/jira/browse/INFRA-25451 – we are waiting for the infra team to resolve it. Unfortunately, we cannot do anything about it. The only thing I can offer is, if you prepare a KIP using some other tool (e.g. a Google Doc or similar) and share it with me, I can copy it into the wiki for you. > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Assignee: dujian >Priority: Major > Labels: needs-kip, newbie > > Currently *every two minutes for every stream thread* statistics will be > logged on INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either a DEBUG level log or > * configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 6:04 PM: if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint. If the OffsetSyncStore.offsetSyncs contained a distribution of OffsetSync rather than just multiple copies of the last OffsetSync, Checkpoints would be computed before was (Author: ecomar): if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. 
> In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node Kafka clusters (mirrormaker config attached) with quick refresh > intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, committing after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581343739 ## core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala: ## @@ -324,15 +323,11 @@ class DumpLogSegmentsTest { val lastContainedLogTimestamp = 1 TestUtils.resource( - RecordsSnapshotWriter.createWithHeader( -() => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)), -1024, -MemoryPool.NONE, -new MockTime, -lastContainedLogTimestamp, -CompressionType.NONE, -MetadataRecordSerde.INSTANCE, - ).get() + new RecordsSnapshotWriter.Builder() Review Comment: Yes. It found a small bug in the dump log tool. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581332843 ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1341,29 +1340,25 @@ class KafkaConfigTest { } @Test - def testValidQuorumVotersConfig(): Unit = { -val expected = new util.HashMap[Integer, AddressSpec]() + def testValidQuorumVotersParsing(): Unit = { +val expected = new util.HashMap[Integer, InetSocketAddress]() Review Comment: Done. I split this into 3 tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581322854 ## core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala: ## @@ -87,9 +85,12 @@ class RaftClusterSnapshotTest { // Check that we can read the entire snapshot while (snapshot.hasNext) { -val batch = snapshot.next() +val batch = snapshot.next assertTrue(batch.sizeInBytes > 0) -assertNotEquals(Collections.emptyList(), batch.records()) +assertTrue( + batch.records.isEmpty != batch.controlRecords.isEmpty, Review Comment: Yeah. I thought I was being creative when I discovered that invariant. I wrote a comment and changed it to use `assertNotEquals`. Let me know if that helps. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581315479 ## clients/src/test/java/org/apache/kafka/common/record/ControlRecordTypeTest.java: ## @@ -45,4 +45,58 @@ public void testParseUnknownVersion() { assertEquals(ControlRecordType.ABORT, type); } +@Test +public void testLeaderChange() { +ByteBuffer buffer = ByteBuffer.allocate(32); +buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION); +buffer.putShort((short) 2); +buffer.flip(); + +ControlRecordType type = ControlRecordType.parse(buffer); +assertEquals(ControlRecordType.LEADER_CHANGE, type); +} + +@Test +public void testSnapshotHeader() { +ByteBuffer buffer = ByteBuffer.allocate(32); +buffer.putShort(ControlRecordType.CURRENT_CONTROL_RECORD_KEY_VERSION); +buffer.putShort((short) 3); +buffer.flip(); + +ControlRecordType type = ControlRecordType.parse(buffer); +assertEquals(ControlRecordType.SNAPSHOT_HEADER, type); +} + +@Test +public void testSnapshotFooter() { Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
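The key layout exercised by the tests above can be sketched in Python (illustrative; the type ids 2 and 3 and the two-short layout are taken directly from the quoted tests — anything beyond that is an assumption):

```python
import struct

# A control-record key as the quoted tests build it: a big-endian int16 key
# version followed by an int16 type id. Only the two type ids asserted in
# the tests are mapped here.
TYPES = {2: "LEADER_CHANGE", 3: "SNAPSHOT_HEADER"}

def parse_control_record_key(buf: bytes) -> str:
    _version, type_id = struct.unpack(">hh", buf[:4])
    return TYPES.get(type_id, "UNKNOWN")

key = struct.pack(">hh", 0, 2)  # key version 0, type id 2
print(parse_control_record_key(key))  # LEADER_CHANGE
```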
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lucasbru commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1581306529 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: Yes, I think that's a good plan. Can you update this PR to rename the test and create a separate JIRA for the new test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16511: Fix the leaking tiered segments during segment deletions [kafka]
kamalcph opened a new pull request, #15817: URL: https://github.com/apache/kafka/pull/15817 When there are overlapping segments in the remote storage, then the deletion may fail to remove the segments due to `isRemoteSegmentWithinLeaderEpochs` check. Once the deletion starts to fail for a partition, then segments won't be eligible for cleanup. The one workaround that we have is to move the log-start-offset using the kafka-delete-records script. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
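The stall described above can be illustrated with a toy model (Python, assumption-laden; Kafka's actual `isRemoteSegmentWithinLeaderEpochs` logic is more involved): if eligibility requires a segment's offset range to lie cleanly within the leader epoch chain, a single overlapping segment fails the check, and every segment behind it leaks.

```python
# Illustrative model only: a segment is "within leader epochs" here if its
# offset range fits inside a single epoch's range.
def within_leader_epochs(segment, epoch_chain):
    start, end = segment
    return any(cs <= start and end <= ce for cs, ce in epoch_chain)

epoch_chain = [(0, 99), (100, 199)]
# The second segment straddles the epoch boundary (an "overlapping" segment).
segments = [(0, 49), (30, 120), (50, 99)]

deletable = []
for seg in segments:
    if not within_leader_epochs(seg, epoch_chain):
        break  # deletion stops at the first failure; later segments leak
    deletable.append(seg)
print(deletable)  # [(0, 49)] -- (50, 99) is stranded behind the bad segment
```

This is the failure mode the PR targets: once one segment fails the check, otherwise-valid segments behind it are never cleaned up, short of moving the log-start-offset manually.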
[jira] [Commented] (KAFKA-15342) Considering upgrading to Mockito 5.4.1 or later
[ https://issues.apache.org/jira/browse/KAFKA-15342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841294#comment-17841294 ] Chia-Ping Tsai commented on KAFKA-15342: I'm not sure of the benefit of having separate unit and integration tests. Our CI always runs all tests regardless of fast failure. The difference in JVM settings is `maxHeapSize` (2g vs. 2560m), but I don't think that is something important. Also, we often neglect the tag in testing. Maybe we should just get rid of the category. > Considering upgrading to Mockito 5.4.1 or later > --- > > Key: KAFKA-15342 > URL: https://issues.apache.org/jira/browse/KAFKA-15342 > Project: Kafka > Issue Type: Task > Components: unit tests >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Fix For: 4.0.0 > > > We're currently stuck on Mockito 4.x.y because the 5.x.y line requires Java > 11 and, until we begin to work on Kafka 4.0.0, we continue to support Java 8. > Either directly before, or after releasing Kafka 4.0.0, we should try to > upgrade to a version of Mockito on the 5.x.y line. > If we're able to use a version that includes > https://github.com/mockito/mockito/pull/3078 > (which should be included in either a 5.4.1 or 5.5.0 release), we should > also revert the change made for > https://issues.apache.org/jira/browse/KAFKA-14682, which is just a temporary > workaround. Care should be taken that, after reverting that change, unused > stubbings are still correctly reported during our CI builds. > If the effort required to upgrade our Mockito version is too high, we can > either downgrade the severity of this ticket, or split it out into separate > subtasks for each to-be-upgraded module. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [Minor] Fix the flaky testConsumerGroupHeartbeatWithStableClassicGroup by sorting the topic partition list [kafka]
dongnuo123 opened a new pull request, #15816: URL: https://github.com/apache/kafka/pull/15816 We are seeing a flaky test in `testConsumerGroupHeartbeatWithStableClassicGroup` where the error is caused by different ordering of the expected and actual values. The patch sorts the topic partition list in the records to fix the issue. https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15698/11/tests/ ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
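The flakiness pattern the PR fixes, in miniature (a generic Python illustration, not the coordinator's actual record types): records carrying a partition list compare unequal when the list order differs even though the contents match, so sorting before comparing removes the nondeterminism.

```python
# Two logically identical records that differ only in partition ordering.
actual = {"topic": "foo", "partitions": [2, 0, 1]}
expected = {"topic": "foo", "partitions": [0, 1, 2]}

# Order-sensitive equality is flaky: it depends on iteration order upstream.
assert actual != expected

def normalize(record):
    # Sort the partition list so comparisons are deterministic.
    return {**record, "partitions": sorted(record["partitions"])}

assert normalize(actual) == normalize(expected)
```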
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1581234346 ## clients/src/main/resources/common/message/VotersRecord.json: ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "VotersRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the voters record" }, +{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [ + { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId", +"about": "The replica id of the voter in the topic partition" }, + { "name": "VoterUuid", "type": "uuid", "versions": "0+", +"about": "The directory id of the voter in the topic partition" }, + { "name": "Endpoints", "type": "[]Endpoint", "versions": "0+", +"about": "The endpoint that can be used to communicate with the voter", "fields": [ +{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, + "about": "The name of the endpoint" }, +{ "name": "Host", "type": "string", "versions": "0+", + "about": "The hostname" }, +{ "name": "Port", "type": "uint16", "versions": "0+", Review Comment: Yeah. 
@junrao and I had a discussion about this in the mailing list. The original design had the security protocol. I agreed to remove it because it is not strictly required since the local replica can look up the security protocol from its local configuration. As you point out it would be useful to have this to catch misconfiguration between the replicas. What do you think @junrao should we add security protocol to all of the endpoint descriptions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1580159998 ## clients/src/main/resources/common/message/VotersRecord.json: ## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "VotersRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "Version", "type": "int16", "versions": "0+", + "about": "The version of the voters record" }, +{ "name": "Voters", "type": "[]Voter", "versions": "0+", "fields": [ + { "name": "VoterId", "type": "int32", "versions": "0+", "entityType": "brokerId", +"about": "The replica id of the voter in the topic partition" }, + { "name": "VoterUuid", "type": "uuid", "versions": "0+", Review Comment: Sounds good to me. I made these changes to the KIP: https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=217391519=94=93 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lianetm commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1581203195 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: > So maybe we should instead try to get the set of partitions and check that it didn't change? Doable, but that wouldn't truly ensure the static membership behaviour either I guess, because the test is intentionally leaving 1 member alive that could be the leader or not. 
So the assignment would remain the same regardless of static membership under CONSUMER (or Cooperative) if the single partition belongs to the live member (a single partition that I guess was intentionally chosen to be able to easily check the delivery semantics after the bounces). So as I see it this test is specifically crafted to validate the stickiness that static membership brings into the `RangeAssignor` (nicely explained in [this section](https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L58-L78) of the `RangeAssignor` class doc btw). We're trying to apply it to the CONSUMER protocol but finding not much value given the purpose of the test and the shape it has (bounce n-1 nodes, check at least 1 revocation if dynamic, none if static, regardless of partition ownership). Listening to myself telling you this makes me reconsider whether we should just not run this test for the new protocol, as it was truly never intended or run for CooperativeAssignor. (I would probably rename it to something like `test_eager_stickiness_on_static_consumer_bounce`, make the use of RangeAssignor explicit, and then it looks clearer that we don't want to run such a test on CooperativeAssignor or the CONSUMER protocol.) We do have other tests that ensure that static membership behaves as expected for the new protocol (ex. `test_fencing_static_consumer`), but agreed that the "owned partition not re-assigned for a static member that is bounced" case is not covered in sys tests (not for CONSUMER, not for Cooperative either). We could think of a new test to cover that. The shape would be different I expect, because it would either have to rely on bouncing a member with assignment while having others with none, or ensuring that all members have at least 1 partition. It would also need a different, more complex delivery semantics validation, if any. 
I would just suggest a different Jira/PR for a new test, to be able to finalize migrating the current sys tests that apply to the new protocol. Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
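The stickiness argument above can be made concrete with a toy model (Python, purely illustrative; Kafka's `RangeAssignor` is Java and considerably more involved): a range-style assignment keyed by member id is only stable across a bounce when the id itself is stable, which is exactly what `group.instance.id` provides under static membership.

```python
# Toy range-style assignment: sort members by id, hand out contiguous
# partition ranges. Not Kafka's assignor code.
def range_assign(member_ids, num_partitions):
    members = sorted(member_ids)
    per, extra = divmod(num_partitions, len(members))
    assignment, start = {}, 0
    for i, m in enumerate(members):
        count = per + (1 if i < extra else 0)
        assignment[m] = list(range(start, start + count))
        start += count
    return assignment

# Dynamic membership: a restart yields a fresh member id (ids here are
# made up), so the assignment shifts and revocations occur.
before = range_assign(["m-111", "m-222"], 4)
after = range_assign(["m-333", "m-222"], 4)  # m-111 restarted with a new id
print(before["m-222"], after["m-222"])  # [2, 3] [0, 1] -- partitions moved

# Static membership: group.instance.id is stable across the bounce, so the
# assignment is identical and nothing is revoked.
assert range_assign(["inst-1", "inst-2"], 4) == range_assign(["inst-1", "inst-2"], 4)
```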
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1581140616 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -966,6 +987,23 @@ private static void maybeUpdateSubscribedTopicNames( } } +/** + * Updates the subscription type, iff necessary. + * + * If all members are subscribed to the same set of topics, the type is homogeneous. + * Otherwise, it is heterogeneous. + */ +private void maybeUpdateGroupSubscriptionType() { Review Comment: hmm theoretically yes, but what if it stays homo/hetero even after the update? maybe it doesn't matter cause the variable is reassigned -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
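The homogeneous/heterogeneous classification being discussed can be sketched as follows (a simplified Python model; the real logic lives in `ConsumerGroup.java`): the group is homogeneous iff every member subscribes to the same set of topics.

```python
# Classify a group's subscription type from its members' topic subscriptions.
def subscription_type(member_subscriptions):
    # Collect the distinct topic sets; one distinct set means homogeneous.
    topic_sets = {frozenset(topics) for topics in member_subscriptions.values()}
    return "HOMOGENEOUS" if len(topic_sets) <= 1 else "HETEROGENEOUS"

print(subscription_type({"m1": ["a", "b"], "m2": ["b", "a"]}))  # HOMOGENEOUS
print(subscription_type({"m1": ["a"], "m2": ["a", "b"]}))       # HETEROGENEOUS
```

Note the recomputation concern raised in the comment is benign in this formulation: re-running the classification when nothing changed simply reassigns the same value.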
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1581134610 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -620,9 +637,10 @@ public Map computeSubscriptionMetadata( TopicsImage topicsImage, ClusterImage clusterImage ) { -// Copy and update the current subscriptions. +// Copy and update the current subscription information. Map subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); +maybeUpdateGroupSubscriptionType(); Review Comment: o that makes sense, I wasn't sure what the reasoning was, okie I'll remove it, thanks for explaining -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1581132398 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -204,7 +198,7 @@ private void createAssignmentSpec() { } } -this.assignmentSpec = new AssignmentSpec(members); +this.assignmentSpec = new AssignmentSpec(members, subscriptionType); Review Comment: hmm this is weird lol, I could've sworn I removed em all -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-9800: Exponential backoff for Kafka clients - KIP-580 [kafka]
msn-tldr commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1581106940 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java: ## @@ -384,6 +389,32 @@ int attempts() { return attempts.get(); } +/* + * Returns whether the leader Node has changed since the last attempt. + * @param node The Node currently thought of as the leader, which might be null. + * @return true if the leader has changed, otherwise false + */ +boolean hasLeaderChanged(Node latestLeader) { +boolean leaderChanged = false; +if (latestLeader != null) { +// If we don't know the leader yet, we have not yet attempted to send to the leader +if (currentLeader == null) { +currentLeader = latestLeader; +} else { +// If the leader's node id has changed, this counts as a leader change +if (currentLeader.id() != latestLeader.id()) { Review Comment: The change was done along with KIP-951 changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module
[ https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841254#comment-17841254 ] Giuseppe Calabrese commented on KAFKA-14892: Hi, I'm new in ASF. I wonder if this ticket could be assigned to me. Please let me know. Thanks > Harmonize package names in storage module > - > > Key: KAFKA-14892 > URL: https://issues.apache.org/jira/browse/KAFKA-14892 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Priority: Major > Fix For: 3.8.0 > > > We currently have: > # org.apache.kafka.server.log.remote.storage: public api in storage-api > module > # org.apache.kafka.server.log.remote: private api in storage module > # org.apache.kafka.storage.internals.log: private api in storage module > A way to make this consistent could be: > # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api > in storage-api module > # org.apache.kafka.storage.internals.log.remote: private api in storage > module > # org.apache.kafka.storage.internals.log: private api in storage module > (stays the same) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16589: Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close [kafka]
chia7712 commented on PR #15815: URL: https://github.com/apache/kafka/pull/15815#issuecomment-2079444105 We are doing the migration, so could this PR stay pending until we see more use cases? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16611) Consider adding test name to "client.id" of Admin in testing
[ https://issues.apache.org/jira/browse/KAFKA-16611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16611: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Consider adding test name to "client.id" of Admin in testing > > > Key: KAFKA-16611 > URL: https://issues.apache.org/jira/browse/KAFKA-16611 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > I observed following errors many times. > {quote} > org.opentest4j.AssertionFailedError: Found 16 unexpected threads during > @BeforeAll: `kafka-admin-client-thread | > adminclient-287,kafka-admin-client-thread | > adminclient-276,kafka-admin-client-thread | > adminclient-271,kafka-admin-client-thread | > adminclient-293,kafka-admin-client-thread | > adminclient-281,kafka-admin-client-thread | > adminclient-302,kafka-admin-client-thread | > adminclient-334,kafka-admin-client-thread | > adminclient-323,kafka-admin-client-thread | > adminclient-257,kafka-admin-client-thread | > adminclient-336,kafka-admin-client-thread | > adminclient-308,kafka-admin-client-thread | > adminclient-263,kafka-admin-client-thread | > adminclient-273,kafka-admin-client-thread | > adminclient-278,kafka-admin-client-thread | > adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: > but was: > {quote} > That could be caused by exceptional shutdown. Or we do have resource leaks in > some failed tests. Adding the test name to "client.id" can give hints about > that -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15838) [Connect] ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields
[ https://issues.apache.org/jira/browse/KAFKA-15838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15838: --- Component/s: connect > [Connect] ExtractField and InsertField NULL Values are replaced by default > value even in NULLABLE fields > > > Key: KAFKA-15838 > URL: https://issues.apache.org/jira/browse/KAFKA-15838 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Eric Pangiawan >Assignee: Mario Fiore Vitale >Priority: Major > > ExtractField: Line 116-119 > [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java#L61-L68] > InsertField: Line 163 - 195 > [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L163-L195] > h1. Expect: > Value `null` is valid for an optional field, even though the field has a > default value. > Only when the field is required should the class fall back to the default > value when the value is `null`. > h1. Actual: > Always returns the default value if `null` was given. > h1. 
Example: > PostgreSQL DDL: > {code:java} > CREATE TABLE products( > id varchar(255), > color varchar(255), > quantity float8 > ); > -- Set Default > ALTER TABLE products ALTER COLUMN quantity SET DEFAULT 1.0; {code} > Insert A Record: > {code:java} > INSERT INTO public.products VALUES('1', 'Blue', null); {code} > Table Select *: > {code:java} > id | color | quantity > +---+-- > 1 | Blue | {code} > Debezium Behavior when using ExtractField and InsertField class (in the event > flattening SMT): > {code:java} > { > "id":"1", > "color":"Blue", > "quantity":1.0, > "__op":"c", > "__ts_ms":1698127432079, > "__source_ts_ms":1698127431679, > "__db":"testing_db", > "__schema":"public", > "__table":"products", > "__lsn":24470112, > "__txId":957, > "__snapshot":null, > "__deleted":"false" > } {code} > The debezium code can be found > [here|https://github.com/debezium/debezium/blob/2.4/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java#L116-L119] > h1. Expected Output: > {code:java} > { > "id":"1", > "color":"Blue", > "quantity":null, > "__op":"c", > "__ts_ms":1698127432079, > "__source_ts_ms":1698127431679, > "__db":"testing_db", > "__schema":"public", > "__table":"products", > "__lsn":24470112, > "__txId":957, > "__snapshot":null, > "__deleted":"false" > }{code} > h1. Temporary Solution: > use getWithoutDefault() into ExtractField and InsertField instead of get() -- This message was sent by Atlassian Jira (v8.20.10#820010)
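The `get()`/`getWithoutDefault()` distinction at the heart of this bug can be modeled in a few lines (Python, illustrative only; Connect's `Struct` is Java): `get()` substitutes the schema default when the stored value is null, which is wrong for an *optional* field that was explicitly set to null.

```python
# Toy stand-in for a Connect Struct with per-field schema defaults.
class Struct:
    def __init__(self, schema_defaults, values):
        self.defaults = schema_defaults
        self.values = values

    def get(self, field):
        # Falls back to the schema default when the value is None --
        # the behavior the issue reports.
        v = self.values.get(field)
        return self.defaults.get(field) if v is None else v

    def get_without_default(self, field):
        # Returns the stored value as-is, preserving an explicit null.
        return self.values.get(field)


# Mirrors the products row from the example: quantity was explicitly null.
row = Struct({"quantity": 1.0}, {"id": "1", "color": "Blue", "quantity": None})
print(row.get("quantity"))                  # 1.0  -- reported (buggy) behavior
print(row.get_without_default("quantity"))  # None -- expected behavior
```

This is why the suggested workaround swaps `get()` for `getWithoutDefault()` in ExtractField and InsertField: only required fields should ever see the default substituted.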
Re: [PR] KAFKA-16317: Add event process rate in GroupCoordinatorRuntimeMetrics [kafka]
dajac commented on PR #15448: URL: https://github.com/apache/kafka/pull/15448#issuecomment-2079403659 I think that we won't add this one in the end. Closing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]
dajac commented on PR #15286: URL: https://github.com/apache/kafka/pull/15286#issuecomment-2079402203 @rreddy-22 Do we still need this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16317: Add event process rate in GroupCoordinatorRuntimeMetrics [kafka]
dajac closed pull request #15448: KAFKA-16317: Add event process rate in GroupCoordinatorRuntimeMetrics URL: https://github.com/apache/kafka/pull/15448 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP848- Add JMH Benchmarks for Client And Server Side Assignors [kafka]
dajac commented on PR #15329: URL: https://github.com/apache/kafka/pull/15329#issuecomment-2079402641 I think that we don't need this anymore. Closing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP848- Add JMH Benchmarks for Client And Server Side Assignors [kafka]
dajac closed pull request #15329: KIP848- Add JMH Benchmarks for Client And Server Side Assignors URL: https://github.com/apache/kafka/pull/15329 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dajac closed pull request #15268: [Draft] Join, Sync, Heartbeat during Migration URL: https://github.com/apache/kafka/pull/15268
Re: [PR] [Draft] Join, Sync, Heartbeat during Migration [kafka]
dajac commented on PR #15268: URL: https://github.com/apache/kafka/pull/15268#issuecomment-2079401308 I think that we don't need this anymore. Closing.
Re: [PR] Refactor SSL/SASL admin integration tests to not use a custom authorizer [kafka]
tinaselenge commented on code in PR #15377: URL: https://github.com/apache/kafka/pull/15377#discussion_r1581029496 ## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ## @@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") metrics.map(_.asInstanceOf[Gauge[Int]].value).sum } + + override def createAdminClient: Admin = { Review Comment: I initially thought that `createConfig()` in BaseAdminIntegrationTest was creating different security properties, via `TestUtils.adminClientSecurityConfigs()`, than the ones created during setup in IntegrationTestHarness. However, after following all the properties that `adminClientSecurityConfigs()` sets, they are exactly the same except for the `certAlias`. So BaseAdminIntegrationTest was mostly just setting security properties twice for no good reason, which upset the SSL admin test because of the mismatch in `certAlias`. Removing the unnecessary call to `adminClientSecurityConfigs()` in BaseAdminIntegrationTest made all of the tests pass. Thanks for pointing this out!
Re: [PR] MINOR: Various cleanups in raft [kafka]
mimaison merged PR #15805: URL: https://github.com/apache/kafka/pull/15805
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dongnuo123 commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1581026465 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1122,4 +1122,27 @@ private OUT handleOperationException( return handler.apply(apiError.error(), apiError.message()); } } + +/** + * Creates the JoinGroupResponseData according to the error type. + * + * @param memberId The member id. + * @param error The error. + * @return The JoinGroupResponseData. + */ +private static JoinGroupResponseData createJoinGroupResponseData( +String memberId, +Errors error +) { +switch (error) { +case MEMBER_ID_REQUIRED: Review Comment: Ah yeah you're right. Let's keep it as is.
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580929744 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1413,6 +1506,243 @@ private CoordinatorResult consumerGr return new CoordinatorResult<>(records, response); } +/** + * Handle a JoinGroupRequest to a ConsumerGroup. + * + * @param group The group to join. + * @param contextThe request context. + * @param requestThe actual JoinGroup request. + * @param responseFuture The join group response future. + * + * @return The result that contains records to append if the join group phase completes. + */ +private CoordinatorResult classicGroupJoinToConsumerGroup( +ConsumerGroup group, +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) throws ApiException { +final long currentTimeMs = time.milliseconds(); +final List records = new ArrayList<>(); +final String groupId = request.groupId(); +String memberId = request.memberId(); +final String instanceId = request.groupInstanceId(); +final JoinGroupRequestProtocolCollection protocols = request.protocols(); +final boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); + +throwIfConsumerGroupIsFull(group, memberId); +throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols); +// TODO: need to throw an exception if group is dead? + +// Get or create the member. +if (isUnknownMember) memberId = Uuid.randomUuid().toString(); +ConsumerGroupMember member; +ConsumerGroupMember.Builder updatedMemberBuilder; +boolean staticMemberReplaced = false; +boolean newMemberCreated = false; +if (instanceId == null) { +// A dynamic member (re-)joins. 
+throwIfRequiresKnownMemberId(groupId, memberId, isUnknownMember, context); +newMemberCreated = !group.hasMember(memberId); +member = group.getOrMaybeCreateMember(memberId, true); +log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +} else { +member = group.staticMember(instanceId); +// A new static member joins or the existing static member rejoins. +if (isUnknownMember) { +newMemberCreated = true; +if (member == null) { +// New static member. +member = group.getOrMaybeCreateMember(memberId, true); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group.", groupId, memberId, instanceId); +} else { +// Replace the current static member. +staticMemberReplaced = true; +updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId) +.setAssignedPartitions(member.assignedPartitions()); +removeMember(records, groupId, member.memberId()); +log.info("[GroupId {}] Static member with unknown member id and instance id {} re-joins the consumer group. " + +"Created a new member {} to replace the existing member {}.", groupId, instanceId, memberId, member.memberId()); +} +} else { +// Rejoining static member. Fence the static group with unmatched member id. +throwIfStaticMemberIsUnknown(member, instanceId); +throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); +updatedMemberBuilder = new ConsumerGroupMember.Builder(member); +log.info("[GroupId {}] Static member {} with instance id {} re-joins the consumer group.", groupId, memberId, instanceId); +} +} + +int groupEpoch = group.groupEpoch(); +Map subscriptionMetadata = group.subscriptionMetadata(); +final ConsumerPartitionAssignor.Subscription subscription = deserializeSubscription(protocols); +final List ownedTopicPartitions = +validateGenerationIdAndGetOwnedPartition(member, subscription); + +// 1. 
Create or update the member. If the member is new or has changed, a ConsumerGroupMemberMetadataValue +// record is written to the __consumer_offsets partition to persist the change. If the subscriptions have +// changed, the subscription metadata is updated and persisted by writing a ConsumerGroupPartitionMetadataValue +// record to the __consumer_offsets partition. Finally, the group epoch is
[PR] KAFKA-16589: Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close [kafka]
FrankYang0529 opened a new pull request, #15815: URL: https://github.com/apache/kafka/pull/15815 Sometimes we close the admin created by `createAdminClient`, and sometimes we don't. That is not a real problem, since the `ClusterInstance` will call `close` when stopping. However, it causes a lot of inconsistent code, and in fact it does not save much time, since creating an Admin is not expensive. We can easily get `bootstrapServers` and `bootstrapControllers` from `ClusterInstance`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
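The ownership pattern the PR argues for can be sketched with plain try-with-resources. This is a minimal, self-contained illustration, not the real Kafka API: `FakeAdmin` stands in for `org.apache.kafka.clients.admin.Admin` (which is `AutoCloseable` in the real client library), and the `runWithOwnedAdmin` helper is hypothetical.

```java
// Sketch: each test builds its own admin client from the cluster's
// bootstrap servers and closes it deterministically, instead of relying
// on ClusterInstance to close shared clients at cluster shutdown.
// FakeAdmin is a stand-in for org.apache.kafka.clients.admin.Admin.
class FakeAdmin implements AutoCloseable {
    final String bootstrapServers;
    boolean closed = false;

    FakeAdmin(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    @Override
    public void close() {
        closed = true;
    }
}

public class AdminOwnershipExample {
    static boolean runWithOwnedAdmin(String bootstrapServers) {
        FakeAdmin handle;
        // try-with-resources guarantees close() runs whether the test
        // body succeeds or throws, so nothing leaks to cluster shutdown.
        try (FakeAdmin admin = new FakeAdmin(bootstrapServers)) {
            handle = admin;
            // ... the test body would use the admin here ...
        }
        return handle.closed;
    }

    public static void main(String[] args) {
        System.out.println(runWithOwnedAdmin("localhost:9092")); // prints true
    }
}
```

The point of the sketch is that closure becomes local and deterministic, which is exactly the inconsistency the PR description calls out.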
[jira] [Commented] (KAFKA-16611) Consider adding test name to "client.id" of Admin in testing
[ https://issues.apache.org/jira/browse/KAFKA-16611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841193#comment-17841193 ] PoAn Yang commented on KAFKA-16611: --- Hi [~chia7712], I'm interested in this issue. May I take it? Thank you. > Consider adding test name to "client.id" of Admin in testing > > > Key: KAFKA-16611 > URL: https://issues.apache.org/jira/browse/KAFKA-16611 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > I observed following errors many times. > {quote} > org.opentest4j.AssertionFailedError: Found 16 unexpected threads during > @BeforeAll: `kafka-admin-client-thread | > adminclient-287,kafka-admin-client-thread | > adminclient-276,kafka-admin-client-thread | > adminclient-271,kafka-admin-client-thread | > adminclient-293,kafka-admin-client-thread | > adminclient-281,kafka-admin-client-thread | > adminclient-302,kafka-admin-client-thread | > adminclient-334,kafka-admin-client-thread | > adminclient-323,kafka-admin-client-thread | > adminclient-257,kafka-admin-client-thread | > adminclient-336,kafka-admin-client-thread | > adminclient-308,kafka-admin-client-thread | > adminclient-263,kafka-admin-client-thread | > adminclient-273,kafka-admin-client-thread | > adminclient-278,kafka-admin-client-thread | > adminclient-283,kafka-admin-client-thread | adminclient-317` ==> expected: > but was: > {quote} > That could be caused by exceptional shutdown. Or we do have resource leaks in > some failed tests. Adding the test name to "client.id" can give hints about > that -- This message was sent by Atlassian Jira (v8.20.10#820010)
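The suggestion in KAFKA-16611 amounts to tagging each test's admin client config with the test name so leaked `kafka-admin-client-thread | adminclient-N` threads can be traced back to the test that created them. A minimal sketch using only `java.util.Properties` follows; the `adminConfig` helper and the naming scheme are hypothetical, while `client.id` and `bootstrap.servers` are standard Kafka client config keys.

```java
import java.util.Properties;

public class AdminClientIdExample {
    // Hypothetical helper: derive a per-test client.id so that any
    // leftover admin-client thread names identify the offending test.
    static Properties adminConfig(String bootstrapServers, String testName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Standard Admin config key; the test-name suffix is the
        // improvement proposed in KAFKA-16611.
        props.put("client.id", "adminclient-" + testName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = adminConfig("localhost:9092", "testListConsumerGroups");
        System.out.println(props.getProperty("client.id"));
        // prints adminclient-testListConsumerGroups
    }
}
```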
Re: [PR] KAFKA-16615: JoinGroup API for upgrading ConsumerGroup [kafka]
dajac commented on code in PR #15798: URL: https://github.com/apache/kafka/pull/15798#discussion_r1580920772 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1288,25 +1396,10 @@ private CoordinatorResult consumerGr .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)) .setClientId(clientId) .setClientHost(clientHost) +.setClassicMemberMetadata(null) .build(); -boolean bumpGroupEpoch = false; -if (!updatedMember.equals(member)) { -records.add(newMemberSubscriptionRecord(groupId, updatedMember)); - -if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { -log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", -groupId, memberId, updatedMember.subscribedTopicNames()); -bumpGroupEpoch = true; -} - -if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { -log.info("[GroupId {}] Member {} updated its subscribed regex to: {}.", -groupId, memberId, updatedMember.subscribedTopicRegex()); -bumpGroupEpoch = true; -} -} - +boolean bumpGroupEpoch = updateMemberSubscription(groupId, memberId, member, updatedMember, records); Review Comment: nit: The name does not really represent the intent here. How about `hasMemberChanged` or something along those lines? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1122,4 +1122,27 @@ private OUT handleOperationException( return handler.apply(apiError.error(), apiError.message()); } } + +/** + * Creates the JoinGroupResponseData according to the error type. + * + * @param memberId The member id. + * @param error The error. + * @return The JoinGroupResponseData. 
+ */ +private static JoinGroupResponseData createJoinGroupResponseData( +String memberId, +Errors error +) { +switch (error) { +case MEMBER_ID_REQUIRED: +case INVALID_SESSION_TIMEOUT: +return new JoinGroupResponseData() +.setMemberId(memberId) Review Comment: I actually wonder why we set the member id in the `INVALID_SESSION_TIMEOUT` case. Looking at the client code, we don't use it in the java client. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1092,6 +1097,86 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri } } +/** + * Validates if the received classic member protocols are supported by the group. + * + * @param group The ConsumerGroup. + * @param memberId The joining member id. + * @param protocolType The joining member protocol type. + * @param protocols The joining member protocol collection. + */ +private void throwIfClassicProtocolIsNotSupported( +ConsumerGroup group, +String memberId, +String protocolType, +JoinGroupRequestProtocolCollection protocols +) { +if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) { +throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported."); +} +} + +/** + * Deserialize the subscription in JoinGroupRequestProtocolCollection. + * All the protocols have the same subscription, so the method picks a random one. + * + * @param protocols The JoinGroupRequestProtocolCollection. + * @return The Subscription. 
+ */ +private static ConsumerPartitionAssignor.Subscription deserializeSubscription( +JoinGroupRequestProtocolCollection protocols +) { +try { +return ConsumerProtocol.deserializeSubscription( +ByteBuffer.wrap(protocols.stream().findAny().get().metadata()) +); +} catch (SchemaException e) { +throw new IllegalStateException("Malformed embedded consumer protocol."); +} +} + +/** + * Validates the generation id and returns the owned partitions in the JoinGroupRequest to a consumer group. + * + * @param memberThe joining member. + * @param subscription The Subscription. + * @return The owned partitions if valid, otherwise return null. + */ +private List validateGenerationIdAndGetOwnedPartition( +ConsumerGroupMember member, +ConsumerPartitionAssignor.Subscription subscription +) { +List ownedPartitions = +toTopicPartitions(subscription.ownedPartitions(),
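For readers following the review thread, the error-to-response mapping being discussed can be illustrated with a self-contained sketch. `Errors` and `JoinGroupResponse` below are simplified stand-ins for the real coordinator types; only the control flow (which errors echo the member id back) mirrors the code under review.

```java
// Simplified stand-in for org.apache.kafka.common.protocol.Errors.
enum Errors { MEMBER_ID_REQUIRED, INVALID_SESSION_TIMEOUT, UNKNOWN_MEMBER_ID }

// Simplified stand-in for JoinGroupResponseData.
class JoinGroupResponse {
    final String memberId;
    final Errors error;

    JoinGroupResponse(String memberId, Errors error) {
        this.memberId = memberId;
        this.error = error;
    }
}

public class JoinGroupResponseExample {
    static JoinGroupResponse createJoinGroupResponseData(String memberId, Errors error) {
        switch (error) {
            case MEMBER_ID_REQUIRED:
            case INVALID_SESSION_TIMEOUT:
                // These cases carry the (possibly newly generated) member id
                // back to the client; the open review question is whether
                // INVALID_SESSION_TIMEOUT actually needs it.
                return new JoinGroupResponse(memberId, error);
            default:
                // Other errors return an empty member id.
                return new JoinGroupResponse("", error);
        }
    }

    public static void main(String[] args) {
        System.out.println(createJoinGroupResponseData("m-1", Errors.MEMBER_ID_REQUIRED).memberId);
        System.out.println(createJoinGroupResponseData("m-1", Errors.UNKNOWN_MEMBER_ID).memberId.isEmpty());
    }
}
```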
Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]
bachmanity1 commented on PR #15475: URL: https://github.com/apache/kafka/pull/15475#issuecomment-2079216075 Hi @mimaison, would you mind taking a look at the KIP and sharing your vote on it? Thank you!
Re: [PR] KAFKA-15974: Enforce that event processing respects user-provided timeout [kafka]
cadonna commented on code in PR #15640: URL: https://github.com/apache/kafka/pull/15640#discussion_r1580785461 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1169,8 +1129,7 @@ private Map beginningOrEndOffset(Collection offsetAndTimestampMap; offsetAndTimestampMap = applicationEventHandler.addAndGet( -listOffsetsEvent, -timer); +listOffsetsEvent); Review Comment: nit: Could you please move this parameter to the previous line? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -998,7 +958,7 @@ public List partitionsFor(String topic, Duration timeout) { wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { Map> topicMetadata = -applicationEventHandler.addAndGet(topicMetadataEvent, timer); +applicationEventHandler.addAndGet(topicMetadataEvent); Review Comment: The timer is only used in the event. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -946,8 +907,7 @@ public Map committed(final Set offsetsForTimes(Map beginningOrEndOffset(Collection offsetAndTimestampMap; offsetAndTimestampMap = applicationEventHandler.addAndGet( -listOffsetsEvent, -timer); +listOffsetsEvent); Review Comment: I think you do actually not need the timer in this method at all. You could pass a deadline to the event. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1026,7 +986,7 @@ public Map> listTopics(Duration timeout) { final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timer); Review Comment: Also here the timer is only used in the event. Using a deadline would be simpler. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]
appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580504115 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); +} else { +requestMetadataUpdate(metadata, subscriptions, partition); +subscriptions.awaitUpdate(partition); Review Comment: As another alternative, it could change the status to `AWAIT_UPDATE` in `FetchCollector.handleInitializeErrors()` only when the response doesn't include leader info. On further thought, it seems possible to differentiate based on the following condition: ``` completedFetch.partitionData.currentLeader().leaderId() != -1 && completedFetch.partitionData.currentLeader().leaderEpoch() != -1 ```
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:40 AM: - if, after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. But the problem here is that if the consumer never fully catches up once, we will never have a checkpoint. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.7.0, 3.6.2, 3.8.0 > Reporter: Edoardo Comar > Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node kafka clusters (mirrormaker config attached) with quick refresh > intervals (eg 5 sec) and a small offset.lag.max (eg 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ > --from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (i.e. its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841137#comment-17841137 ] Edoardo Comar commented on KAFKA-16622: --- if after the consumer reaches 1 and the 1st checkpoint is emitted, MM2 restarts before the other 1 messages are produced, then bug https://issues.apache.org/jira/browse/KAFKA-15905 hits and we end up with just two checkpoints, at 1 and 2. > Mirromaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Priority: Major > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup : > Tested with a standalone single process MirrorMaker2 mirroring between two > single-node kafka clusters(mirromaker config attached) with quick refresh > intervals (eg 5 sec) and a small offset.lag.max (eg 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50records/poll and pausing 1 sec > between polls which commits after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ >--from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (ie its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841134#comment-17841134 ] Edoardo Comar edited comment on KAFKA-16622 at 4/26/24 10:18 AM: - Hi [~gharris1727] The task did not restart. Here are the trace logs generated in the following scenario: clean start with the properties file posted above; create the 'mytopic' topic with 1 partition; produce 10,000 messages to the source cluster; start a single consumer in the source cluster that in a loop polls 100 messages, commitSyncs, and waits 1 sec, e.g.

{{clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");}}
{{... while (System.currentTimeMillis() - now < 1200*1000L) {}}
{{    ConsumerRecords<String, String> crs1 = consumer.poll(Duration.ofMillis(1000L));}}
{{    polledCount = polledCount + print(crs1, consumer);}}
{{    consumer.commitSync(Duration.ofSeconds(10));}}
{{    Thread.sleep(1000L);}}
{{} }}

The first checkpoint is only emitted when the consumer catches up fully at 1. Then other 1 messages are produced quickly and the consumer advances, and some checkpoints are emitted so that overall we have

Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16700, downstreamOffset=16501, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18200, downstreamOffset=18096, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19200, downstreamOffset=18965, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19700, downstreamOffset=19636, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}

[^connect.log.2024-04-26-10.zip]
[jira] [Comment Edited] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
> Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.7.0, 3.6.2, 3.8.0
> Reporter: Edoardo Comar
> Priority: Major
> Attachments: connect.log.2024-04-26-10.zip, edo-connect-mirror-maker-sourcetarget.properties
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. This does not seem reasonable.
> In a very simple setup, tested with a standalone single-process MirrorMaker2 mirroring between two single-node Kafka clusters (MirrorMaker config attached) with quick refresh intervals (e.g. 5 sec) and a small offset.lag.max (e.g. 10):
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec between polls, which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh
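For readers reproducing this, a configuration of the kind described above (quick refresh intervals, small offset.lag.max) might look roughly like the following sketch. This is not the attached edo-connect-mirror-maker-sourcetarget.properties file; the property names come from the MirrorMaker 2 documentation, and all values and cluster addresses here are illustrative assumptions.

```properties
# Hypothetical sketch, not the attached properties file.
clusters = source, target
source.bootstrap.servers = localhost:9092
target.bootstrap.servers = localhost:9093

# Mirror 'mytopic' from source to target.
source->target.enabled = true
source->target.topics = mytopic

# Quick refresh/emission intervals (e.g. 5 sec), as in the report.
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 5
refresh.groups.interval.seconds = 5
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5

# Small max lag (e.g. 10) so offset syncs are emitted frequently.
offset.lag.max = 10
```

With settings like these, checkpoints would be expected every few seconds once the group's committed offsets are translatable, which is what makes the observed delay until full catch-up stand out.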