Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
chia7712 commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1568266095 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin; + +import kafka.cluster.Broker; +import kafka.cluster.EndPoint; +import kafka.server.KafkaConfig; +import kafka.server.QuorumTestHarness; +import kafka.zk.AdminZkClient; +import kafka.zk.BrokerInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.security.PasswordEncoder; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZooKeeperInternals; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class ConfigCommandIntegrationTest extends QuorumTestHarness { Review Comment: I guess that is why you opened #15729 to fix the `-parameters` issue. Is it possible to use `ClusterTestExtensions` to rewrite this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568266346 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future"); Review Comment: nit: The `future` string can be replaced with a variable. The regex can also become a variable, to make it more readable.
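The refactor suggested in the review comment could look roughly like the standalone sketch below; the class, field, and method names are hypothetical illustrations, not Kafka's actual `LogSegment` code:

```java
import java.util.regex.Pattern;

// Standalone illustration of the suggested refactor: pull the "future" suffix
// and the regex out into named constants. All names here are hypothetical.
public class FutureDirPatternSketch {
    static final String FUTURE_DIR_SUFFIX = "future";
    static final Pattern FUTURE_DIR_PATTERN =
            Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-" + FUTURE_DIR_SUFFIX);

    // Does this directory name carry the -future suffix used during alter log dir?
    static boolean isFutureDir(String dirName) {
        return FUTURE_DIR_PATTERN.matcher(dirName).matches();
    }

    public static void main(String[] args) {
        // Future replica dirs look like <topic>-<partition>.<uuid>-future
        System.out.println(isFutureDir("my-topic-0.0123456789abcdef-future")); // prints "true"
        System.out.println(isFutureDir("my-topic-0")); // prints "false"
    }
}
```

Naming the suffix and pre-compiling the pattern as a constant also avoids recompiling the regex on every delete attempt.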
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
soarez commented on PR #15733: URL: https://github.com/apache/kafka/pull/15733#issuecomment-2060431133 Thanks @chia7712, I'll have another look at this later today.
[jira] [Commented] (KAFKA-16572) allow defining number of disks per broker in ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-16572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837974#comment-17837974 ] PoAn Yang commented on KAFKA-16572: --- Hi [~chia7712], I'm interested in this issue. May I take it? Thank you. > allow defining number of disks per broker in ClusterTest > > > Key: KAFKA-16572 > URL: https://issues.apache.org/jira/browse/KAFKA-16572 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > This is a follow-up of KAFKA-16559 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
chia7712 commented on PR #15733: URL: https://github.com/apache/kafka/pull/15733#issuecomment-2060429559 @soarez it would be great to get your reviews before merging it :)
[jira] [Assigned] (KAFKA-16572) allow defining number of disks per broker in ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-16572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16572: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > allow defining number of disks per broker in ClusterTest > > > Key: KAFKA-16572 > URL: https://issues.apache.org/jira/browse/KAFKA-16572 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > This is a follow-up of KAFKA-16559
[jira] [Resolved] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes
[ https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16559. Fix Version/s: 3.8.0 Resolution: Fixed > allow defining number of disks per broker in TestKitNodes > - > > Key: KAFKA-16559 > URL: https://issues.apache.org/jira/browse/KAFKA-16559 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.8.0 > > > from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409 > That allow us to run the reassignment tests. > Also, we should enhance setNumBrokerNodes > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81) > to accept extra argument to define the number of folders (by > setLogDirectories)
[jira] [Created] (KAFKA-16572) allow defining number of disks per broker in ClusterTest
Chia-Ping Tsai created KAFKA-16572: -- Summary: allow defining number of disks per broker in ClusterTest Key: KAFKA-16572 URL: https://issues.apache.org/jira/browse/KAFKA-16572 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai This is a follow-up of KAFKA-16559
Re: [PR] KAFKA-16559: allow defining number of disks per broker in TestKitNodes [kafka]
chia7712 merged PR #15730: URL: https://github.com/apache/kafka/pull/15730
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568241856 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"); Review Comment: > We can use topicPartition, but we use similar regular expression in other place. Do we also need to update it? Thanks for pointing these out to me. I was wrong. I tried matching topic names containing `.` or `-`; both match correctly. That means we can use this regex directly.
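The check showuon describes (topic names containing `.` or `-` still match the pattern) can be reproduced with a small standalone sketch; the class name and the directory names below are made-up examples, not taken from a real log dir:

```java
import java.util.regex.Pattern;

// Quick standalone check of the regex quoted in the diff above.
public class DirSuffixRegexCheck {
    static final Pattern DIR_PATTERN =
            Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

    static boolean matchesSuffix(String dirName) {
        return DIR_PATTERN.matcher(dirName).matches();
    }

    public static void main(String[] args) {
        // a topic name containing both '.' and '-' still matches,
        // because \S+ happily spans dots and dashes
        System.out.println(matchesSuffix("my.topic-name-3.0123456789abcdef-delete")); // prints "true"
        System.out.println(matchesSuffix("plain-topic-0.abc-future")); // prints "true"
        System.out.println(matchesSuffix("plain-topic-0")); // prints "false"
    }
}
```

Note that `matches()` anchors at both ends, so the trailing `-delete`/`-future` must sit at the very end of the directory name.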
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on PR #15631: URL: https://github.com/apache/kafka/pull/15631#issuecomment-2060333550 @junrao Gentle reminder. PTAL.
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
LQXshane commented on code in PR #14446: URL: https://github.com/apache/kafka/pull/14446#discussion_r1568199337 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. + * + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * + * + * + * This method will overwrite a default behavior as described below. + * + * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation, + * however, it is not always required when input stream is partitioned as intended. As an example, + * if a input stream is partitioned by a String key1, calling the below function will trigger a repartition: + * {@code + * KStream inputStream = builder.stream("topic"); + * stream + * .selectKey( ... => (key1, metric)) + * .groupByKey() + * .aggregate() + * } + * + * + * + * You can then overwrite the default behavior by calling this method: + * {@code + * stream + * .selectKey( ... => (key1, metric)) + * .markAsPartitioned() + * .groupByKey() + * .aggregate() + * } + * + * + * @return a new {@code KStream} instance that will not repartition in subsequent operations: {@link KStream#selectKey(KeyValueMapper)}, {@link KStream#map(KeyValueMapper)}, {@link KStream#flatTransform(TransformerSupplier, String...)}. 
Review Comment: Removed for simplicity ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. + * + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * + * + * + * This method will overwrite a default behavior as described below. + * + * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation, + * however, it is not always required when input stream is partitioned as intended. As an example, + * if a input stream is partitioned by a String key1, calling the below function will trigger a repartition: + * {@code + * KStream inputStream = builder.stream("topic"); + * stream + * .selectKey( ... => (key1, metric)) + * .groupByKey() + * .aggregate() + * } + * + * + * + * You can then overwrite the default behavior by calling this method: + * {@code + * stream + * .selectKey( ... => (key1, metric)) + * .markAsPartitioned() + * .groupByKey() + * .aggregate() + * } + * + * + * @return a new {@code KStream} instance that will not repartition in subsequent operations: {@link KStream#selectKey(KeyValueMapper)}, {@link KStream#map(KeyValueMapper)}, {@link KStream#flatTransform(TransformerSupplier, String...)}. 
Review Comment: Removed for simplicity in the new PR
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
LQXshane commented on code in PR #14446: URL: https://github.com/apache/kafka/pull/14446#discussion_r1568199214 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. + * + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * + * + * + * This method will overwrite a default behavior as described below. + * + * By default, Kafka Streams always automatically repartition the records to prepare for a stateful operation, + * however, it is not always required when input stream is partitioned as intended. As an example, + * if a input stream is partitioned by a String key1, calling the below function will trigger a repartition: + * {@code + * KStream inputStream = builder.stream("topic"); Review Comment: This HTML tag helps the rendering from what I can tell; without it, the code indentation and whitespace are not preserved. Here is an image of what I see in my IntelliJ with the `` tag: https://github.com/apache/kafka/assets/16904776/718a2658-4c54-4a7a-acde-dd0c77140c20
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
LQXshane commented on code in PR #14446: URL: https://github.com/apache/kafka/pull/14446#discussion_r1568198123 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. Review Comment: Resolved in the new PR. ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. + * + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * Review Comment: Resolved in the new PR.
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
LQXshane commented on PR #14446: URL: https://github.com/apache/kafka/pull/14446#issuecomment-2060317790 @mjsax It's Shay. Taking this one up from my personal account. My personal account doesn't have permission to push to the other account; is it okay to create a new PR here: https://github.com/apache/kafka/pull/15740?
[PR] KIP-759 Mark as Partitioned [kafka]
LQXshane opened a new pull request, #15740: URL: https://github.com/apache/kafka/pull/15740 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KIP-759: new DSL operation on KStreams interface [kafka]
LQXshane commented on code in PR #14446: URL: https://github.com/apache/kafka/pull/14446#discussion_r1568173605 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,47 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning in downstream key changing operations. + * + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. + * Review Comment: wasn't sure as I saw the other javadocs doing it
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm closed pull request #15738: KAFKA-16566: Fix consumer static membership system test with new protocol URL: https://github.com/apache/kafka/pull/15738
Re: [PR] KAFKA-15466: Add KIP-919 support for some admin APIs [kafka]
bachmanity1 commented on PR #14399: URL: https://github.com/apache/kafka/pull/14399#issuecomment-2060259657 Hi @cmccabe Given that this PR is merged now, is it possible to dynamically update controller configs using `kafka-configs.sh`? I've tested this locally but couldn't make it work. ![image](https://github.com/apache/kafka/assets/81428651/4cd86184-bd7f-4b4f-8ad7-e57423c3e7d7)
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060248860 FYI, I've been getting successful runs with this change.

running: `TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_fencing_static_consumer" bash tests/docker/run_tests.sh`

results:
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.11.4
> session_id: 2024-04-16--016
> run time: 27 minutes 15.117 seconds
> tests run: 16
> passed: 16
> flaky: 0
> failed: 0
> ignored: 0

But I still got occasional, less frequent failures after trying it several times, so I'm afraid there may be something else (maybe related to sessions remaining open between test executions happening too close to each other?). If that's the case, I'll probably be adding another change here. I'll dig into it and update here.
[jira] [Commented] (KAFKA-16467) Add README to docs folder
[ https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837928#comment-17837928 ] ASF GitHub Bot commented on KAFKA-16467: showuon commented on code in PR #596: URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568111065 ## README.md: ## @@ -10,4 +10,32 @@ You can run it with the following command, note that it requires docker: Then you can open [localhost:8080](http://localhost:8080) on your browser and browse the documentation. -To kill the process, just type ctrl + c \ No newline at end of file +To kill the process, just type ctrl + c. + +## How to preview the latest documentation changes in Kafka repository? + +1. Generating document from kafka repository: + +```shell +# change directory into kafka repository +cd KAFKA_REPO +./gradlew clean siteDocTar +# supposing built with scala 2.13 +tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep version: | awk '{print $NF}' | head -n 1)-site-docs.tgz +``` + +2. Copying the generated documents from Kafka repository into kafka-site, and preview them (note that it requires docker): + +```shell +# change directory into kafka-site repository +cd KAFKA_SITE_REPO +# copy the generated documents into dev folder +rm -rf dev +mkdir dev +# change directory into kafka repository +cp -r KAFKA_REPO/site-docs/* dev Review Comment: I don't think this comment is correct. > Add README to docs folder > - > > Key: KAFKA-16467 > URL: https://issues.apache.org/jira/browse/KAFKA-16467 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > We don't have a guide in project root folder or docs folder to show how to > run local website. It's good to provide a way to run document with kafka-site > repository. 
> > Option 1: Add links to wiki page > [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] > and > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. > Option 2: Show how to run the document within container. For example: moving > `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`.
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1568044239 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) { void handleUncleanBrokerShutdown(int brokerId, List records) { replicationControl.handleBrokerUncleanShutdown(brokerId, records); } + +void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws InterruptedException, ExecutionException { +appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), +() -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); +} Review Comment: hmm, I think a better way to think about it is that we want to append the min ISR config update atomically with the partition change records. Appending the partition change records once the config change is replayed is difficult to reason about and possibly incorrect. Thinking a bit more about it, triggering a write event from the `replay()` for the config change record means that every time we reload the metadata log, we would replay the config change record and generate new partition change records. Perhaps one example to look at is `ReplicationControlManager.handleBrokerFenced`. When a broker is fenced, we generate a broker registration change record along with the leaderAndIsr partition change records. I assume we want to follow a similar model with the topic configuration change events.
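The model splett2 describes — generate the config record and the derived partition change records together in one batch, rather than appending a second write event when the config record is replayed — can be illustrated with a toy sketch. Every type and name below is a hypothetical stand-in, not the actual `QuorumController`/`ReplicationControlManager` API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of "append the config update atomically with the
// partition change records"; all names here are hypothetical stand-ins.
public class AtomicRecordBatchSketch {
    static List<String> alterMinIsr(String topic, int newMinIsr) {
        List<String> batch = new ArrayList<>();
        // the config change itself...
        batch.add("ConfigRecord: " + topic + " min.insync.replicas=" + newMinIsr);
        // ...plus the derived partition changes, computed up front so both are
        // committed in one batch; replaying the log does not regenerate them
        batch.add("PartitionChangeRecord: " + topic + "-0");
        return batch;
    }

    public static void main(String[] args) {
        // both records come from one call, mirroring one atomic append
        AtomicRecordBatchSketch.alterMinIsr("payments", 2).forEach(System.out::println);
    }
}
```

The point of the design is that `replay()` stays a pure state update: reloading the metadata log never produces new records.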
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1568084297 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -2117,6 +2117,20 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } +ControllerResult getPartitionElrUpdatesForConfigChanges(Optional topicName) { +if (!isElrEnabled()) return ControllerResult.of(Collections.emptyList(), null); + +List records = new ArrayList<>(); +if (topicName.isPresent()) { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, + brokersToElrs.partitionsWithElr(topicsByName.get(topicName.get()))); +} else { +generateLeaderAndIsrUpdates("handleMinIsrUpdate", NO_LEADER, NO_LEADER, NO_LEADER, records, +brokersToElrs.partitionsWithElr()); +} Review Comment: I haven't been following along too closely. Is my understanding correct that we would only expect to generate partition change records that clear the ELR when the min ISR config decreases? When the configured topic ISR increases, it would not be safe to include more replicas in the ELR, since they may not have the HWM. If my understanding is correct, should we have tests to verify that:
1. When the configured topic ISR decreases, we generate the expected partition change record events.
2. When the configured topic ISR increases, we do not generate any partition change record events.
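The expectation stated in this comment can be written down as a tiny decision sketch. It encodes only the reviewer's stated understanding, hedged; it is not Kafka's actual `ReplicationControlManager` logic, and `shouldGeneratePartitionChange` is a hypothetical name:

```java
// Encodes the reviewer's expectation: a lower min ISR may allow clearing the
// ELR, but a higher min ISR must not add replicas to the ELR, since those
// replicas may not have the high watermark (HWM).
public class ElrUpdateSketch {
    static boolean shouldGeneratePartitionChange(int oldMinIsr, int newMinIsr, int elrSize) {
        // only a decrease of min ISR with a non-empty ELR triggers records
        return newMinIsr < oldMinIsr && elrSize > 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldGeneratePartitionChange(3, 2, 1)); // decrease: prints "true"
        System.out.println(shouldGeneratePartitionChange(2, 3, 1)); // increase: prints "false"
    }
}
```

The two test cases the reviewer asks for map directly onto the two branches of this predicate.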
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r1568061403 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me consumer.start() self.await_members(consumer, len(consumer.nodes)) +num_rebalances = consumer.num_rebalances() conflict_consumer.start() -self.await_members(conflict_consumer, num_conflict_consumers) -self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) +if group_protocol == consumer_group.classic_group_protocol: +# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail. +self.await_members(conflict_consumer, num_conflict_consumers) +self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) -wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, +wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, timeout_sec=10, err_msg="Timed out waiting for the fenced consumers to stop") +else: +# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join. +self.await_consumed_messages(consumer) +assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance" +assert len(consumer.joined_nodes()) == len(consumer.nodes) +assert len(conflict_consumer.joined_nodes()) == 0 Review Comment: There should be no timing issues as I see it. For `consumer.joined_nodes` there is a previous `self.await_members` that ensures we wait as long as needed for all the nodes to join. As for `conflict_consumer.joined_nodes()`, it's for nodes that never joined; we're just asserting that after the non-conflicting members remained active without a rebalance and kept consuming (ensuring activity), the conflicting ones did not join. Makes sense?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
splett2 commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1568050802 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -108,6 +103,43 @@ object StorageTool extends Logging { } } + def metadataVersionValidation(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { Review Comment: nit: `validateMetadataVersion` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
showuon commented on code in PR #15719: URL: https://github.com/apache/kafka/pull/15719#discussion_r1568045423 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -413,7 +413,7 @@ class LogManagerTest { assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.") // this cleanup shouldn't find any expired segments but should delete some to reduce size -time.sleep(logManager.InitialTaskDelayMs) +time.sleep(logManager.initialTaskDelayMs) assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments") time.sleep(log.config.fileDeleteDelayMs + 1) Review Comment: > Huge thanks ! I'm a little confused about the comment above, the test in LogManagerTest itself verify that tasks like log cleanup, flush logs are triggered after sleeping initialTaskDelayMs. Yes, and currently, all of them are setting `initialTaskDelayMs=LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS`. I'm thinking we could have a test and set `initialTaskDelayMs` to, let's say, 1000, and we can verify the change is adopted by verifying if we sleep only 1000ms, the log cleanup will be triggered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
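showuon's suggested test — configure a non-default `initialTaskDelayMs` and confirm that cleanup fires after exactly that delay rather than the default — can be sketched with a mock clock. The scheduler and clock below are illustrative stand-ins, not Kafka's `MockTime`/`KafkaScheduler`:

```python
class MockTime:
    """Illustrative manual clock (stand-in for Kafka's MockTime)."""
    def __init__(self): self.ms = 0
    def sleep(self, ms): self.ms += ms

class MockScheduler:
    """Stand-in delayed-task scheduler: runs a task once its delay has elapsed."""
    def __init__(self, time): self.time, self.tasks = time, []
    def schedule(self, delay_ms, fn): self.tasks.append((self.time.ms + delay_ms, fn))
    def tick(self):
        for due, fn in [t for t in self.tasks if t[0] <= self.time.ms]:
            fn(); self.tasks.remove((due, fn))

clock = MockTime()
scheduler = MockScheduler(clock)
cleaned = []
initial_task_delay_ms = 1000  # overriding the default via the new internal config
scheduler.schedule(initial_task_delay_ms, lambda: cleaned.append(True))

clock.sleep(999); scheduler.tick()
assert not cleaned   # not yet due: the override, not the default, governs
clock.sleep(1); scheduler.tick()
assert cleaned       # cleanup triggered after sleeping only 1000 ms
```

The second assertion is exactly the check showuon describes: if the config were not adopted, sleeping 1000 ms would not be enough to trigger the cleanup task.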
Re: [PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]
jolshan commented on PR #15739: URL: https://github.com/apache/kafka/pull/15739#issuecomment-2060079351 Hmmm -- I ran the test and it is not working. I will debug -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16571: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition [kafka]
jolshan opened a new pull request, #15739: URL: https://github.com/apache/kafka/pull/15739 Added the check before the reassignment occurs and we start bouncing brokers. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]
kirktrue commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { Review Comment: nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives add nonzero cognitive overhead. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration } } +private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean disableWakeup) { +if (lastPendingAsyncCommit == null) { +return; +} + +try { +final CompletableFuture<Void> futureToAwait = new CompletableFuture<>(); +// We don't want the wake-up trigger to complete our pending async commit future, +// so create new future here. Any errors in the pending async commit will be handled +// by the async commit future / the commit callback - here, we just want to wait for it to complete. +lastPendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null)); +if (!disableWakeup) { +wakeupTrigger.setActiveTask(futureToAwait); +} +ConsumerUtils.getResult(futureToAwait, timer); Review Comment: Is it true that the underlying `lastPendingAsyncCommit` `Future` could already be completed by this point?
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); Review Comment: Can this be replaced with a call to `testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I glanced back and forth a couple of times and didn't see too much difference: ```suggestion final TopicPartition tp = new TopicPartition("foo", 0); testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp); ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -616,6 +620,80 @@ public void testCommitSyncTriggersFencedExceptionFromCommitAsync() { assertEquals("Get fenced exception for group.instance.id groupInstanceId1", e.getMessage()); } +@Test +public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() { +time = new MockTime(1); +consumer = newConsumer(); + +// Commit async (incomplete) + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); +final TopicPartition tp = new TopicPartition("foo", 0); +consumer.assign(Collections.singleton(tp)); +consumer.seek(tp, 20); +consumer.commitAsync(); + +// Commit async is not completed yet, so commit sync should wait for it to complete (time out) 
+assertThrows(TimeoutException.class, () -> consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100))); + +// Complete async commit event +final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class); +verify(applicationEventHandler).add(commitEventCaptor.capture()); Review Comment: This use of JUnit is just about over my head... For my own understanding, at which line in this test does the `AsyncCommitEvent` get created and enqueued? I would assume at line 634, right? It looks like you're able to add the `ArgumentCaptor` _after_ the object pointed at by the argument was created. Is that correct? 🤔 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() { assertFalse(capturedEvent.get().future().isCompletedExceptionally()); } +@Test +public
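The future-wrapping trick in the diff under review — await a fresh future chained off the pending commit, so the wake-up trigger can only ever complete the wrapper and never the commit itself — can be modeled with Python's `concurrent.futures`. This is a hedged sketch of the pattern, not the `AsyncKafkaConsumer` code:

```python
from concurrent.futures import Future

pending_commit = Future()   # stands in for lastPendingAsyncCommit

# Fresh future handed to the wakeup trigger: a wakeup can complete THIS
# future exceptionally without ever touching the pending commit.
future_to_await = Future()
pending_commit.add_done_callback(lambda f: future_to_await.set_result(None))

# Even if the commit fails, the wrapper only observes "done"; the error
# stays on the commit future for its own commit callback to handle.
pending_commit.set_exception(RuntimeError("commit failed"))
assert future_to_await.result(timeout=1) is None
assert isinstance(pending_commit.exception(), RuntimeError)
```

Note the callback fires immediately if the commit future is already completed when the wrapper is attached, so an already-finished async commit simply makes the wait return at once.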
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1568011957 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +COOPERATIVE_STICKY(new CooperativeStickyAssignor()); + +private final ConsumerPartitionAssignor assignor; + +AssignorType(ConsumerPartitionAssignor assignor) { +this.assignor = assignor; +} + +public ConsumerPartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. 
+ * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"1000", "1"}) +private int memberCount; + +@Param({"10", "50"}) +private int partitionsPerTopicCount; + +@Param({"100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "COOPERATIVE_STICKY"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private Map subscriptions = new HashMap<>(); + +private ConsumerPartitionAssignor.GroupSubscription groupSubscription; + +private static final int numberOfRacks = 3; + +private static final int replicationFactor = 2; Review Comment: For the server side tests I always used replication factor as 2, so in order to get the same distribution of racks for consistency I added the replication factor as 2. That being said I think I'll just hardcode the replication factor here as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009821 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1568009691 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
kirktrue commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1567992301 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -482,6 +482,15 @@ public long nextHeartbeatMs(final long currentTimeMs) { return heartbeatTimer.remainingMs(); } +public void onFailedAttempt(final long currentTimeMs) { +// Reset timer to allow sending HB after a failure without waiting for the interval. +// After a failure, a next HB may be needed with backoff (ex. errors that lead to +// retries, like coordinator load error), or immediately (ex. errors that lead to +// rejoining, like fencing errors). +heartbeatTimer.reset(0); +super.onFailedAttempt(currentTimeMs); Review Comment: Does the decision to reset the heartbeat timer depend on what _type_ of error is received? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -380,7 +380,7 @@ private void onErrorResponse(final ConsumerGroupHeartbeatResponse response, break; case UNRELEASED_INSTANCE_ID: -logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}", +logger.error("GroupHeartbeatRequest failed due to unreleased instance id {}: {}", Review Comment: QQ: are these logging changes of the ‘I'll just clean this up as long as I'm in here?’ variety, or do they have some bearing on the correctness of the logs?
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -231,6 +231,35 @@ public void testTimerNotDue() { assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); } +@Test +public void testHeartbeatNotSentIfAnotherOneInFlight() { +mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + +// Heartbeat sent (no response received) +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + +"previous on in-flight"); Review Comment: Super nit-picky, sorry ```suggestion "previous one in-flight"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
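The timing interplay being discussed — `onFailedAttempt` resetting the interval timer so that only the error-specific backoff (possibly zero, e.g. after a fencing error that triggers a rejoin) gates the next heartbeat — can be modeled in a few lines. This is an illustrative sketch, not the `HeartbeatRequestManager`/`RequestState` code:

```python
class HeartbeatState:
    """Illustrative model: next send time is max(interval deadline, backoff deadline)."""
    def __init__(self, interval_ms):
        self.interval_deadline = interval_ms  # first HB due after one interval
        self.backoff_deadline = 0

    def on_failed_attempt(self, now_ms, backoff_ms):
        # Reset the interval timer so a retry need not wait a full interval;
        # only the error-specific backoff delays the next heartbeat.
        self.interval_deadline = now_ms
        self.backoff_deadline = now_ms + backoff_ms

    def next_heartbeat_ms(self, now_ms):
        return max(self.interval_deadline, self.backoff_deadline) - now_ms

hb = HeartbeatState(interval_ms=3000)
hb.on_failed_attempt(now_ms=100, backoff_ms=250)
assert hb.next_heartbeat_ms(100) == 250   # retriable error: backoff, not the 3 s interval
hb.on_failed_attempt(now_ms=100, backoff_ms=0)
assert hb.next_heartbeat_ms(100) == 0     # e.g. fenced: heartbeat immediately to rejoin
```

In this model the answer to kirktrue's question falls out of the structure: the reset itself is unconditional, and the error *type* is expressed through the backoff that accompanies it.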
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
kirktrue commented on code in PR #15738: URL: https://github.com/apache/kafka/pull/15738#discussion_r156796 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -348,13 +348,32 @@ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, me consumer.start() self.await_members(consumer, len(consumer.nodes)) +num_rebalances = consumer.num_rebalances() conflict_consumer.start() -self.await_members(conflict_consumer, num_conflict_consumers) -self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) +if group_protocol == consumer_group.classic_group_protocol: +# Classic protocol: conflicting members should join, and the intial ones with conflicting instance id should fail. +self.await_members(conflict_consumer, num_conflict_consumers) +self.await_members(consumer, len(consumer.nodes) - num_conflict_consumers) -wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, +wait_until(lambda: len(consumer.dead_nodes()) == num_conflict_consumers, timeout_sec=10, err_msg="Timed out waiting for the fenced consumers to stop") +else: +# Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join. +self.await_consumed_messages(consumer) +assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance" +assert len(consumer.joined_nodes()) == len(consumer.nodes) +assert len(conflict_consumer.joined_nodes()) == 0 Review Comment: Do we anticipate any timing issues here? That is, will `num_rebalances()` and `joined_nodes()` be "guaranteed" to return the correct values immediately after the call to `await_consumed_messages()` is finished? Or do we want to wrap those assertions as `wait_until()`s to give them a few seconds to coalesce to the correct value? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
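kirktrue's suggestion above — give eventually-consistent assertions a few seconds to coalesce by wrapping them in `wait_until` rather than asserting immediately — follows the usual polling pattern. A generic sketch (ducktape ships its own `wait_until`; this stand-in just shows the shape):

```python
import time

def wait_until(condition, timeout_sec, backoff_sec=0.1, err_msg=""):
    """Poll `condition` until it returns truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg or "condition not met within %ss" % timeout_sec)

# Instead of asserting `len(conflict_consumer.joined_nodes()) == 0` in a way
# that could race the cluster, poll until the expected state is observed:
joined = {"count": 3}  # stand-in for consumer.joined_nodes()
wait_until(lambda: joined["count"] == 3, timeout_sec=2,
           err_msg="members never settled")
```

Whether the extra wait is needed here is exactly the point lianetm addresses: if a prior `await_members`/`await_consumed_messages` already guarantees the state, a plain assert suffices.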
[jira] [Assigned] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition
[ https://issues.apache.org/jira/browse/KAFKA-16571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-16571: -- Assignee: Justine Olshan > reassign_partitions_test.bounce_brokers should wait for messages to be sent > to every partition > -- > > Key: KAFKA-16571 > URL: https://issues.apache.org/jira/browse/KAFKA-16571 > Project: Kafka > Issue Type: Bug >Reporter: David Mao >Assignee: Justine Olshan >Priority: Major > > This particular system test tries to bounce brokers while produce is ongoing. > The test also has rf=3 and min.isr=3 configured, so if any brokers are > bounced before records are produced to every partition, it is possible to run > into OutOfOrderSequence exceptions similar to what is described in > https://issues.apache.org/jira/browse/KAFKA-14359 > When running the produce_consume_validate for the reassign_partitions_test, > instead of waiting for 5 acked messages, we should wait for messages to be > acked on the full set of partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16570: --- Description: When we want to fence a producer using the admin client, we send an InitProducerId request. There is logic in that API to fence (and abort) any ongoing transactions and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we want to actually get a new producer ID and want to retry until the ID is supplied or we time out. [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] In the case of fence producer, we don't retry and instead we have no handling for concurrent transactions and log a message about an unexpected error. [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] This is not unexpected though and the operation was successful. We should just swallow this error and treat this as a successful run of the command. was: When we want to fence a producer using the admin client, we send an InitProducerId request. There is logic in that API to fence (and abort) any ongoing transactions and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we want to actually get a new producer ID and want to retry until the ID is supplied or we time out.
[https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] In the case of fence producer, we don't retry and instead we have no handling for concurrent transactions and log a message about an unexpected error. [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] This is not unexpected though and the operation was successful. We should just swallow this error and treat this as a successful run of the command. > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fence producer, we don't retry and instead we have no handling > for concurrent transactions and log a message about an unexpected error. 
> [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected though and the operation was successful. We should > just swallow this error and treat this as a successful run of the command. -- This message was sent by Atlassian Jira (v8.20.10#820010)
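The fix described above amounts to treating CONCURRENT_TRANSACTIONS from InitProducerId as success rather than an unexpected error. A minimal sketch of that decision, using a stand-in enum rather than the real org.apache.kafka.common.protocol.Errors and not the actual FenceProducersHandler code:

```java
// Illustrative sketch only: the enum and class names stand in for Kafka's
// org.apache.kafka.common.protocol.Errors and FenceProducersHandler.
public class FenceErrorHandling {
    enum Errors { NONE, CONCURRENT_TRANSACTIONS, CLUSTER_AUTHORIZATION_FAILED }

    // Fencing aborts any ongoing transaction, so CONCURRENT_TRANSACTIONS means
    // the fence took effect; only other errors are genuinely unexpected.
    static boolean isFenceSuccess(Errors error) {
        return error == Errors.NONE || error == Errors.CONCURRENT_TRANSACTIONS;
    }
}
```

With this, a fence attempt that races an ongoing transaction is reported to the admin-client caller as a successful run of the command.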
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837902#comment-17837902 ] Justine Olshan commented on KAFKA-16570: cc: [~ChrisEgerton] [~showuon] [~tombentley] as author and reviewers of the original change. > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions, and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of the fence producer, we don't retry; instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected, though, and the operation was successful. We should > just swallow this error and treat this as a successful run of the command. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16571) reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition
David Mao created KAFKA-16571: - Summary: reassign_partitions_test.bounce_brokers should wait for messages to be sent to every partition Key: KAFKA-16571 URL: https://issues.apache.org/jira/browse/KAFKA-16571 Project: Kafka Issue Type: Bug Reporter: David Mao This particular system test tries to bounce brokers while produce is ongoing. The test also has rf=3 and min.isr=3 configured, so if any brokers are bounced before records are produced to every partition, it is possible to run into OutOfOrderSequence exceptions similar to what is described in https://issues.apache.org/jira/browse/KAFKA-14359 When running the produce_consume_validate for the reassign_partitions_test, instead of waiting for 5 acked messages, we should wait for messages to be acked on the full set of partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
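The proposed readiness check can be sketched as follows: rather than waiting for a fixed count of acked messages, wait until at least one message has been acked on every partition before bouncing brokers. This is an illustrative stand-in (the real test lives in the Python ducktape framework; names here are hypothetical):

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: true once every partition has seen at least one ack,
// i.e. it is safe to start bouncing brokers under rf=3 / min.isr=3.
public class PartitionAckCheck {
    static boolean everyPartitionAcked(int numPartitions, Collection<Integer> ackedPartitions) {
        Set<Integer> acked = new HashSet<>(ackedPartitions);
        for (int partition = 0; partition < numPartitions; partition++) {
            if (!acked.contains(partition)) {
                return false; // bouncing now could hit a partition with no produced records
            }
        }
        return true;
    }
}
```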
[jira] [Created] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
Justine Olshan created KAFKA-16570: -- Summary: FenceProducers API returns "unexpected error" when successful Key: KAFKA-16570 URL: https://issues.apache.org/jira/browse/KAFKA-16570 Project: Kafka Issue Type: Bug Reporter: Justine Olshan Assignee: Justine Olshan When we want to fence a producer using the admin client, we send an InitProducerId request. There is logic in that API to fence (and abort) any ongoing transactions, and that is what the API relies on to fence the producer. However, this handling also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because we want to actually get a new producer ID and want to retry until the ID is supplied or we time out. [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] In the case of the fence producer, we don't retry; instead we have no handling for concurrent transactions and log a message about an unexpected error. [https://github.com/confluentinc/ce-kafka/blob/b626db8bd94fe971adef3551518761a7be7de454/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] This is not unexpected, though, and the operation was successful. We should just swallow this error and treat this as a successful run of the command. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16569) Target Assignment Format Change
Ritika Reddy created KAFKA-16569: Summary: Target Assignment Format Change Key: KAFKA-16569 URL: https://issues.apache.org/jira/browse/KAFKA-16569 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy Assignee: Ritika Reddy Currently the assignment is stored as a Map>; we want to change it to a list. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16568) Add JMH Benchmarks for assignor performance testing
Ritika Reddy created KAFKA-16568: Summary: Add JMH Benchmarks for assignor performance testing Key: KAFKA-16568 URL: https://issues.apache.org/jira/browse/KAFKA-16568 Project: Kafka Issue Type: Sub-task Reporter: Ritika Reddy Assignee: Ritika Reddy Three benchmarks are used to test the performance and efficiency of the consumer group rebalance process: * Client Assignors (assign method) * Server Assignors (assign method) * Target Assignment Builder (build method) -- This message was sent by Atlassian Jira (v8.20.10#820010)
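For readers unfamiliar with JMH, the measurement it automates can be sketched with a plain warm-up/measure loop around the call under test. This stand-alone harness is illustrative only; the real benchmarks would use JMH @Benchmark methods, and the workload here is a hypothetical stand-in for an assignor's assign() call:

```java
import java.util.Arrays;

// Illustrative sketch of what a JMH benchmark measures: warm up the JIT,
// then average the cost of the operation under test over many invocations.
public class AssignorBenchSketch {
    static double averageNanos(Runnable op, int warmupIters, int measuredIters) {
        for (int i = 0; i < warmupIters; i++) op.run(); // warm-up phase, discarded
        long start = System.nanoTime();
        for (int i = 0; i < measuredIters; i++) op.run();
        return (System.nanoTime() - start) / (double) measuredIters;
    }

    public static void main(String[] args) {
        // Hypothetical workload standing in for an assign()/build() call.
        double avg = averageNanos(() -> Arrays.fill(new int[1024], 7), 1_000, 10_000);
        System.out.println("avg ns/op: " + avg);
    }
}
```

JMH additionally handles forking, dead-code elimination, and statistical reporting, which is why the ticket uses it rather than a hand-rolled loop like this.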
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837892#comment-17837892 ] Francois Visconte commented on KAFKA-16511: --- Here is for example one of the segments and partitions where I had the issue. Remaining segment leaked on tiered storage that was never deleted {code} partition: 12, offset: 429045, value: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=9481FsNMTqiXLLzIBY03lA}, startOffset=2965469, endOffset=2968297, brokerId=10045, maxTimestampMs=1712010650848, eventTimestampMs=1712018260978, segmentLeaderEpochs={7=2965469}, segmentSizeInBytes=179013013, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED} partition: 12, offset: 429049, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=9481FsNMTqiXLLzIBY03lA}, customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712018266033, brokerId=10045} {code} Last events in the remote log metadata for this partition: {code} partition: 12, offset: 427434, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=5_3z-dusQEeSDqNC_E5ifQ}, customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712010391903, brokerId=10041} partition: 12, offset: 427680, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=58Ht5YJ8SaW3JaI_lDEpsA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712012524972, brokerId=10041} partition: 12, offset: 427681, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=58Ht5YJ8SaW3JaI_lDEpsA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, 
eventTimestampMs=1712012525109, brokerId=10041} partition: 12, offset: 428017, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=s4RCtworR-2Na8PGY6h2nQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712014486037, brokerId=10045} partition: 12, offset: 428018, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=s4RCtworR-2Na8PGY6h2nQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712014486168, brokerId=10045} partition: 12, offset: 428399, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=UWhaLz-GTjqPcVKL8uZOQQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712016766308, brokerId=10045} partition: 12, offset: 428400, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=UWhaLz-GTjqPcVKL8uZOQQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712016766397, brokerId=10045} partition: 12, offset: 429045, value: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=9481FsNMTqiXLLzIBY03lA}, startOffset=2965469, endOffset=2968297, brokerId=10045, maxTimestampMs=1712010650848, eventTimestampMs=1712018260978, segmentLeaderEpochs={7=2965469}, segmentSizeInBytes=179013013, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED} partition: 12, offset: 429049, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=9481FsNMTqiXLLzIBY03lA}, customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED, eventTimestampMs=1712018266033, brokerId=10045} partition: 12, offset: 429265, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=TM44MmIqSKSBPmXMzjAvxA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712018873277, brokerId=10045} partition: 12, offset: 429266, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=TM44MmIqSKSBPmXMzjAvxA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712018873411, brokerId=10045} partition: 12, offset: 429602, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=djqwO5JeS9y2vRua-FBO3A}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712020973535, brokerId=10045} partition: 12, offset: 429603, value:
[jira] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511 ] Francois Visconte deleted comment on KAFKA-16511: --- was (Author: JIRAUSER288982): >The issue might be due to the overlapping remote log segments after a new >leader gets elected during rolling restart. Would you please upload the past >10 segments remote-log-segment metadata events for >5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks! Here is {code:java} partition: 27, offset: 400504, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045518393, brokerId=10015} partition: 27, offset: 400505, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045518494, brokerId=10015} partition: 27, offset: 400828, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047438507, brokerId=10015} partition: 27, offset: 400829, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047438609, brokerId=10015} partition: 27, offset: 401145, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049418775, brokerId=10015} partition: 27, offset: 401146, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049418894, brokerId=10015} partition: 27, offset: 401458, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712051340571, brokerId=10015} partition: 27, offset: 401459, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712051340719, brokerId=10015} partition: 27, offset: 401758, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712053320735, brokerId=10015} partition: 27, offset: 401759, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712053320838, brokerId=10015} partition: 27, offset: 539496, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712909568625, brokerId=10015} partition: 27, offset: 539497, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, 
eventTimestampMs=1712909568846, brokerId=10015} {code} > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Assignee: Kamal Chandraprakash >Priority: Major > Labels: tiered-storage > > I have some topics that were not written to for a few days (having 12h > retention) where some data remains on tiered storage (in our case S3) and > is never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
[jira] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511 ] Francois Visconte deleted comment on KAFKA-16511: --- was (Author: JIRAUSER288982): Here is for example on one of the partition where I have the issue: {code} _zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712036488994, brokerId=10041} partition: 12, offset: 432066, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=YTIk_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712036489094, brokerId=10041} partition: 12, offset: 432319, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712038114012, brokerId=10041} partition: 12, offset: 432320, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712038114110, brokerId=10041} partition: 12, offset: 432593, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712039805803, brokerId=10041} partition: 12, offset: 432594, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712039805901, brokerId=10041} partition: 12, offset: 432914, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, 
id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712041682856, brokerId=10041} partition: 12, offset: 432915, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712041683023, brokerId=10041} partition: 12, offset: 433251, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712043678337, brokerId=10041} partition: 12, offset: 433252, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712043678474, brokerId=10041} partition: 12, offset: 433577, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045569214, brokerId=10041} partition: 12, offset: 433578, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045569333, brokerId=10041} partition: 12, offset: 433904, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047491475, brokerId=10041} partition: 12, offset: 433905, value: 
RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047491576, brokerId=10041} partition: 12, offset: 434229, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049443915, brokerId=10041} partition: 12, offset: 434230, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049444048, brokerId=10041} partition: 12, offset: 434584, value:
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837891#comment-17837891 ] Francois Visconte commented on KAFKA-16511: --- >The issue might be due to the overlapping remote log segments after a new >leader gets elected during rolling restart. Would you please upload the past >10 segments remote-log-segment metadata events for >5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks! Here is {code:java} partition: 27, offset: 400504, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045518393, brokerId=10015} partition: 27, offset: 400505, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=v8QykWvQQryMxNoS1aHMCQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045518494, brokerId=10015} partition: 27, offset: 400828, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047438507, brokerId=10015} partition: 27, offset: 400829, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=6K-bSNgxSnOypIrjl9OiGA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047438609, brokerId=10015} partition: 27, offset: 401145, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049418775, brokerId=10015} partition: 27, offset: 401146, 
value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=ErPfaCVhRiS6EKx7RnL1iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049418894, brokerId=10015} partition: 27, offset: 401458, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712051340571, brokerId=10015} partition: 27, offset: 401459, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=2yAT3R6tS3umrvU8iuMVfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712051340719, brokerId=10015} partition: 27, offset: 401758, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712053320735, brokerId=10015} partition: 27, offset: 401759, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=KLtWI-SWS7eBML87LX63SA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712053320838, brokerId=10015} partition: 27, offset: 539496, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712909568625, brokerId=10015} partition: 27, offset: 539497, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, 
eventTimestampMs=1712909568846, brokerId=10015} {code} > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Assignee: Kamal Chandraprakash >Priority: Major > Labels: tiered-storage > > I have some topics that were not written to for a few days (having 12h > retention) where some data remains on tiered storage (in our case S3) and > is never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} >
[jira] [Assigned] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-16567: Assignee: Walter Hernandez > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task >Reporter: Walter Hernandez >Assignee: Walter Hernandez >Priority: Blocker > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16567) Add New Stream Metrics based on KIP-869
Walter Hernandez created KAFKA-16567: Summary: Add New Stream Metrics based on KIP-869 Key: KAFKA-16567 URL: https://issues.apache.org/jira/browse/KAFKA-16567 Project: Kafka Issue Type: Task Reporter: Walter Hernandez Fix For: 4.0.0 Add the following metrics to the state updater: * restoring-active-tasks: count * restoring-standby-tasks: count * paused-active-tasks: count * paused-standby-tasks: count * idle-ratio: percentage * restore-ratio: percentage * checkpoint-ratio: percentage * restore-records-total: count * restore-records-rate: rate * restore-call-rate: rate -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm commented on PR #15738: URL: https://github.com/apache/kafka/pull/15738#issuecomment-2059888686 Hey @lucasbru, could you take a look at this test fix? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-16336: Assignee: Walter Hernandez > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Walter Hernandez >Priority: Blocker > Fix For: 4.0.0 > > > Metric "standby-process-ratio" was deprecated in 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-16336: Assignee: (was: Walter Hernandez) > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Metric "standby-process-ratio" was deprecated in 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837867#comment-17837867 ] Francois Visconte commented on KAFKA-16511: --- Here is, for example, one of the partitions where I have the issue:
{code}
_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712036488994, brokerId=10041}
partition: 12, offset: 432066, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=YTIk_zIcSPWGzlqLjUTFfw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712036489094, brokerId=10041}
partition: 12, offset: 432319, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712038114012, brokerId=10041}
partition: 12, offset: 432320, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=8mQ_JJLfTQmkP80MYcN6Ig}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712038114110, brokerId=10041}
partition: 12, offset: 432593, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712039805803, brokerId=10041}
partition: 12, offset: 432594, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=oqr_-KDrSIGiGrcLQZhzvA}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712039805901, brokerId=10041}
partition: 12, offset: 432914, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712041682856, brokerId=10041}
partition: 12, offset: 432915, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=OSou-FH9S4ioH5LHYUxPMg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712041683023, brokerId=10041}
partition: 12, offset: 433251, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712043678337, brokerId=10041}
partition: 12, offset: 433252, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=uSPj4yS5RIO1LuBHecl7iQ}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712043678474, brokerId=10041}
partition: 12, offset: 433577, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712045569214, brokerId=10041}
partition: 12, offset: 433578, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=pcCt8vc-TUyRhHOGUJBHXg}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712045569333, brokerId=10041}
partition: 12, offset: 433904, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712047491475, brokerId=10041}
partition: 12, offset: 433905, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=LFqoPyg0SOiHbscnV3Ahtw}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712047491576, brokerId=10041}
partition: 12, offset: 434229, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, state=DELETE_SEGMENT_STARTED, eventTimestampMs=1712049443915, brokerId=10041}
partition: 12, offset: 434230, value: RemoteLogSegmentMetadataUpdate{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic2-774, id=SSyS_hbXSEKD5rgbu1S8ug}, customMetadata=Optional.empty, state=DELETE_SEGMENT_FINISHED, eventTimestampMs=1712049444048, brokerId=10041}
partition: 12, offset: 434584, value:
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
chia7712 commented on code in PR #15719: URL: https://github.com/apache/kafka/pull/15719#discussion_r1567891428 ## core/src/main/java/kafka/server/builders/LogManagerBuilder.java: ## @@ -55,6 +55,7 @@ public class LogManagerBuilder { private Time time = Time.SYSTEM; private boolean keepPartitionMetadataFile = true; private boolean remoteStorageSystemEnable = false; +private long initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS; Review Comment: please add setter for it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
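The requested change is a one-liner in the builder's existing fluent style (each `setX` mutates a field and returns `this`). A minimal self-contained sketch of that pattern — `LogManagerBuilderSketch` and its default value are illustrative stand-ins, not the real `LogManagerBuilder` or `LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS`:

```java
// Sketch of the setter chia7712 is asking for, mirroring the fluent
// builder style used by LogManagerBuilder. Names are hypothetical.
public class LogManagerBuilderSketch {
    // Assumed default standing in for LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS.
    private long initialTaskDelayMs = 30_000L;

    // Fluent setter: assign the field and return this so calls can chain.
    public LogManagerBuilderSketch setInitialTaskDelayMs(long initialTaskDelayMs) {
        this.initialTaskDelayMs = initialTaskDelayMs;
        return this;
    }

    public long initialTaskDelayMs() {
        return initialTaskDelayMs;
    }

    public static void main(String[] args) {
        // Tests can now shrink the delay to speed up log-manager startup.
        LogManagerBuilderSketch builder = new LogManagerBuilderSketch().setInitialTaskDelayMs(0L);
        System.out.println(builder.initialTaskDelayMs()); // prints 0
    }
}
```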
[PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]
lianetm opened a new pull request, #15738: URL: https://github.com/apache/kafka/pull/15738 Updating consumer system test that was failing with the new protocol, related to static membership behaviour. The behaviour regarding static consumers that join with conflicting group instance id is slightly different between the classic and new consumer protocol, so the expectations in the tests needed to be updated. If static members join with same instance id: - Classic protocol: all members join the group with the same group instance id, and then the first one will eventually fail (receives a HB error with FencedInstanceIdException) - Consumer protocol: new member with an instance Id already in use is not able to join, and first member remains active (new member with same instance Id receives an UnreleasedInstanceIdException in the response to the HB to join the group) This PR is keeping the single parametrized test that existed before, given that what's being tested and part of the test itself apply to all protocols. This is just updating the expectations that are different, based on the protocol parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
chia7712 commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1567861049 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); + +@Test +public void testSetRecordWithDifferentTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +} + +@Test +public void testMaybeFlushWithTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); +assertDoesNotThrow(partitionMetadataFile::maybeFlush); + +assertDoesNotThrow(() -> { +List lines = Files.readAllLines(file.toPath()); +assertEquals(2, lines.size()); 
+assertEquals("version: 0", lines.get(0)); +assertEquals("topic_id: " + topicId, lines.get(1)); +}); +} + +@Test +public void testMaybeFlushWithNoTopicIdPresent() { +File file = PartitionMetadataFile.newFile(dir); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + +assertDoesNotThrow(partitionMetadataFile::maybeFlush); +assertEquals(0, file.length()); +} + +@Test +public void testRead() { +File file = PartitionMetadataFile.newFile(dir); +LogDirFailureChannel channel = Mockito.mock(LogDirFailureChannel.class); +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, channel); + +Uuid topicId = Uuid.randomUuid(); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); Review Comment: >I used assertDoesNotThrow is because we expect the first time to record the topicId shouldn't generate any exception, since there's no dirtyTopicIdOpt I agree to that. My point was `assertDoesNotThrow` converts the checked exception to unchecked exception, and it is useful when we run assert in the lambda. However, in those test case we can add the exception to method signature, and just let it fail. In short, `assertDoesNotThrow` in those test cases is redundant wrapper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
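chia7712's suggestion can be illustrated with a stand-in: instead of wrapping a call in `assertDoesNotThrow`, declare the checked exception on the test method and call directly — JUnit fails the test automatically if the exception escapes. `record` below is a hypothetical substitute for `PartitionMetadataFile.record`, purely to show the two styles:

```java
import java.io.IOException;

public class ThrowsSignatureSketch {
    // Stand-in for a production method that declares a checked exception.
    static void record(String topicId) throws IOException {
        if (topicId == null) throw new IOException("no topic id");
    }

    // Before: assertDoesNotThrow(() -> record(topicId));
    // After: declare the exception on the method and call directly.
    // Any unexpected IOException still fails the test, without the wrapper.
    public void testRecord() throws IOException {
        record("some-topic-id");
    }

    public static void main(String[] args) throws IOException {
        new ThrowsSignatureSketch().testRecord();
        System.out.println("ok"); // prints ok
    }
}
```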
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1567855363 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent( // Events handled by Migration Driver. abstract class MigrationEvent implements EventQueue.Event { +// Use no-op handler by default because the handleException will be overridden if needed +private Consumer retryHandler = NO_OP_HANDLER; + +public void retryHandler(Consumer retryHandler) { +this.retryHandler = retryHandler; +} Review Comment: Also, should we call `wakeup` (run next poll ASAP) rather than `scheduleDeferred` if the exception is retryable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
chia7712 commented on code in PR #15732: URL: https://github.com/apache/kafka/pull/15732#discussion_r1567853719 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent( // Events handled by Migration Driver. abstract class MigrationEvent implements EventQueue.Event { +// Use no-op handler by default because the handleException will be overridden if needed +private Consumer retryHandler = NO_OP_HANDLER; + +public void retryHandler(Consumer retryHandler) { +this.retryHandler = retryHandler; +} Review Comment: +1 to this style! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16336) Remove Deprecated metric standby-process-ratio
[ https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-16336: Assignee: Walter Hernandez > Remove Deprecated metric standby-process-ratio > -- > > Key: KAFKA-16336 > URL: https://issues.apache.org/jira/browse/KAFKA-16336 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Walter Hernandez >Priority: Blocker > Fix For: 4.0.0 > > > Metric "standby-process-ratio" was deprecated in 3.5 release via > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1567796843 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: Thanks for the detailed explanation. For the `makeLeaders` path, it will call `UnifiedLog.convertToOffsetMetadataOrThrow`. Within it, `checkLogStartOffset(offset)` shouldn't throw OFFSET_OUT_OF_RANGE since we are comparing the offset with logStartOffset. Do you know which part throws OFFSET_OUT_OF_RANGE error? For the follower fetch path, it's bounded by `LogEndOffset`. So it shouldn't need to call `UnifiedLog.fetchHighWatermarkMetadata`, right? The regular consumer will call `UnifiedLog.fetchHighWatermarkMetadata`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
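The bounding rule under discussion — the new high watermark is lower-bounded by the local-log-start-offset and upper-bounded by the log-end-offset — reduces to a clamp into `[localLogStartOffset, logEndOffset]`. A hedged sketch; the names mirror `UnifiedLog` but this is not the real implementation, which works on `LogOffsetMetadata` rather than plain longs:

```java
public class HighWatermarkBoundSketch {
    // Clamp a proposed high watermark into [localLogStartOffset, logEndOffset].
    static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
        return Math.min(Math.max(proposed, localLogStartOffset), logEndOffset);
    }

    public static void main(String[] args) {
        System.out.println(boundHighWatermark(5, 10, 100));   // below LLSO -> prints 10
        System.out.println(boundHighWatermark(150, 10, 100)); // above LEO  -> prints 100
        System.out.println(boundHighWatermark(42, 10, 100));  // in range   -> prints 42
    }
}
```

With tiered storage, segments below the local-log-start-offset exist only in remote storage, which is why the lower bound moves from the log start offset to the local one.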
Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1567823511 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new 
RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +@Param({"1000", "1"}) +private int memberCount; + +@Param({"10", "50"}) +private int partitionsPerTopicCount; + +@Param({"100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"true", "false"}) +private boolean simulateRebalanceTrigger; + +private PartitionAssignor partitionAssignor; + +private static final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +createAssignmentSpec(topicMetadata); + +partitionAssignor = assignorType.assignor(); + +if (simulateRebalanceTrigger) { +simulateIncrementalRebalance(topicMetadata); +} +} + +private Map createTopicMetadata() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks =
[jira] [Comment Edited] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:59 PM: - Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new protocol/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. Classic protocol refers to the existing group protocol, and Consumer protocol refers to the new one introduced with KIP-848 (just using the names proposed to be used in the configs to switch between both) was (Author: JIRAUSER300183): Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. Classic protocol refers to the existing group protocol, and Consumer protocol refers to the new one introduced with KIP-848 (just using the names proposed to be used in the configs to switch between both) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. 
This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. > When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue opened a new pull request, #15737: URL: https://github.com/apache/kafka/pull/15737 Checking that the `TopicPartition` is in assignment before attempting to remove it. Also added some logging and refactoring. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
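The fix described in the PR is a membership guard before removal. A self-contained sketch of that pattern — `revoke` and the `Set<String>` assignment are hypothetical simplifications of the handler's state, not the actual `IncrementalAssignmentConsumerEventHandler` code:

```java
import java.util.HashSet;
import java.util.Set;

public class SafeRevokeSketch {
    // Only drop a partition that is actually in the current assignment;
    // previously an unassigned partition caused an error, now it is skipped.
    static boolean revoke(Set<String> assignment, String topicPartition) {
        if (!assignment.contains(topicPartition)) {
            // Not assigned: log-and-skip instead of failing.
            return false;
        }
        return assignment.remove(topicPartition);
    }

    public static void main(String[] args) {
        Set<String> assignment = new HashSet<>();
        assignment.add("topic-0");
        System.out.println(revoke(assignment, "topic-0")); // prints true
        System.out.println(revoke(assignment, "topic-1")); // prints false, no error
    }
}
```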
Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1567815640 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,259 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"1000", "1"}) +private int memberCount; + +@Param({"10", "50"}) +private int partitionsPerTopicCount; + +@Param({"1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isRackAware; Review Comment: Okay, let's use the uniform assignor with homogeneous subscriptions and without rack awareness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1567812184 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@ +package org.apache.kafka.jmh.group_coordinator; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) 
+@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +@Param({"10", "50", "100"}) +private int partitionsPerTopicCount; + +@Param({"100"}) +private int topicCount; + +@Param({"500", "1000"}) +private int memberCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"true", "false"}) +private boolean isSubscriptionUniform; + +@Param({"true", "false"}) +private boolean isRangeAssignor; + +@Param({"true", "false"}) +private boolean isReassignment; + +private PartitionAssignor partitionAssignor; + +private final int numberOfRacks = 3; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = new HashMap<>(); +Map> partitionRacks = isRackAware ? +mkMapOfPartitionRacks(partitionsPerTopicCount) : +Collections.emptyMap(); + +for (int i = 1; i <= topicCount; i++) { +Uuid topicUuid = Uuid.randomUuid(); +String topicName = "topic" + i; +topicMetadata.put(topicUuid, new TopicMetadata( +topicUuid, topicName, partitionsPerTopicCount, partitionRacks)); +} + +addTopicSubscriptions(topicMetadata); +this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + +if (isRangeAssignor) { +this.partitionAssignor = new RangeAssignor(); +} else { +this.partitionAssignor = new UniformAssignor(); +} + +if (isReassignment) { +GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); +Map members; + +members = initialAssignment.members(); + +// Update the AssignmentSpec with the results from the initial assignment. 
+Map updatedMembers = new HashMap<>(); + +members.forEach((memberId, memberAssignment) -> { +AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); +updatedMembers.put(memberId, new AssignmentMemberSpec( +memberSpec.instanceId(), +memberSpec.rackId(), +memberSpec.subscribedTopicIds(), +memberAssignment.targetPartitions() +)); +}); + +// Add new member to trigger a reassignment. +Optional rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty(); + +updatedMembers.put("newMember", new AssignmentMemberSpec( +Optional.empty(), +rackId, +topicMetadata.keySet(), +Collections.emptyMap() +
Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1567812184 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+    @Param({"10", "50", "100"})
+    private int partitionsPerTopicCount;
+
+    @Param({"100"})
+    private int topicCount;
+
+    @Param({"500", "1000"})
+    private int memberCount;
+
+    @Param({"true", "false"})
+    private boolean isRackAware;
+
+    @Param({"true", "false"})
+    private boolean isSubscriptionUniform;
+
+    @Param({"true", "false"})
+    private boolean isRangeAssignor;
+
+    @Param({"true", "false"})
+    private boolean isReassignment;
+
+    private PartitionAssignor partitionAssignor;
+
+    private final int numberOfRacks = 3;
+
+    private AssignmentSpec assignmentSpec;
+
+    private SubscribedTopicDescriber subscribedTopicDescriber;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        Map<Integer, Set<String>> partitionRacks = isRackAware ?
+            mkMapOfPartitionRacks(partitionsPerTopicCount) :
+            Collections.emptyMap();
+
+        for (int i = 1; i <= topicCount; i++) {
+            Uuid topicUuid = Uuid.randomUuid();
+            String topicName = "topic" + i;
+            topicMetadata.put(topicUuid, new TopicMetadata(
+                topicUuid, topicName, partitionsPerTopicCount, partitionRacks));
+        }
+
+        addTopicSubscriptions(topicMetadata);
+        this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+        if (isRangeAssignor) {
+            this.partitionAssignor = new RangeAssignor();
+        } else {
+            this.partitionAssignor = new UniformAssignor();
+        }
+
+        if (isReassignment) {
+            GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+            Map<String, MemberAssignment> members;
+
+            members = initialAssignment.members();
+
+            // Update the AssignmentSpec with the results from the initial assignment.
+            Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+
+            members.forEach((memberId, memberAssignment) -> {
+                AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+                updatedMembers.put(memberId, new AssignmentMemberSpec(
+                    memberSpec.instanceId(),
+                    memberSpec.rackId(),
+                    memberSpec.subscribedTopicIds(),
+                    memberAssignment.targetPartitions()
+                ));
+            });
+
+            // Add new member to trigger a reassignment.
+            Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty();
+
+            updatedMembers.put("newMember", new AssignmentMemberSpec(
+                Optional.empty(),
+                rackId,
+                topicMetadata.keySet(),
+                Collections.emptyMap()
+
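The hunk above references a helper, `mkMapOfPartitionRacks(partitionsPerTopicCount)`, that isn't included in this excerpt. A minimal self-contained sketch of what such a helper could look like — the rack layout (replication factor 2 spread across the benchmark's 3 racks) is an assumption for illustration, not the PR's actual implementation:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionRacks {
    // Hypothetical helper: place each partition's replicas on two of three racks,
    // rotating so consecutive partitions land on different rack pairs.
    static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
        Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            Set<String> racks = new HashSet<>(2);
            racks.add("rack" + (i % 3));
            racks.add("rack" + ((i + 1) % 3));
            partitionRacks.put(i, racks);
        }
        return partitionRacks;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> m = mkMapOfPartitionRacks(4);
        System.out.println(m.size());        // 4
        System.out.println(m.get(0).size()); // 2
    }
}
```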
[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837836#comment-17837836 ] Philip Nee commented on KAFKA-15250: Added a testing branch under my fork: network-thread-mbean-metrics. I ran the testAsyncCommit in the integration test (because it actually polls). These are the stats: backoff time avg:0.29459381324082107 (backoff time max,92.0) poll time avg:9.2161365261735107E18 (poll time max,9.223372036854776E18) The loop doesn't really back off. > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
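For context on what the "backoff time" metric above measures: each failed attempt is supposed to wait exponentially longer before retrying, up to a cap. A deterministic sketch of that computation (illustrative only — not Kafka's actual `ExponentialBackoff` utility, which also applies jitter):

```java
public class Backoff {
    // Illustrative exponential backoff: initial * multiplier^attempts, capped at max.
    // Jitter is omitted so the progression is deterministic.
    static long backoffMs(long initialMs, double multiplier, long maxMs, int attempts) {
        double exp = initialMs * Math.pow(multiplier, attempts);
        return (long) Math.min(exp, maxMs);
    }

    public static void main(String[] args) {
        // attempts 0..4 with 100ms base, x2 multiplier, 1s cap: 100, 200, 400, 800, 1000
        for (int attempts = 0; attempts < 5; attempts++) {
            System.out.println(backoffMs(100, 2.0, 1000, attempts));
        }
    }
}
```

A tight loop corresponds to this value effectively never being applied between polls, which is what the near-zero "backoff time avg" suggests.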
[jira] [Updated] (KAFKA-16566) Update consumer static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16566: --- Summary: Update consumer static membership fencing system test to support new protocol (was: Update static membership fencing system test to support new protocol) > Update consumer static membership fencing system test to support new protocol > - > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. > When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1567794140 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java: ## @@ -0,0 +1,153 @@ +package org.apache.kafka.jmh.group_coordinator; Review Comment: okay yeah I'll do that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:34 PM: - Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. Classic protocol refers to the existing group protocol, and Consumer protocol refers to the new one introduced with KIP-848 (just using the names proposed to be used in the configs to switch between both) was (Author: JIRAUSER300183): Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. 
> When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on PR #15723: URL: https://github.com/apache/kafka/pull/15723#issuecomment-2059706910 @lianetm & @lucasbru—I've addressed your comments, and this is ready for another review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on code in PR #15723: URL: https://github.com/apache/kafka/pull/15723#discussion_r1567791850 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java: ## @@ -66,6 +67,7 @@ public RequestState(final LogContext logContext, * and the backoff is restored to its minimal configuration. */ public void reset() { +this.requestInFlight = false; this.lastSentMs = -1; Review Comment: I've removed `lastSentMs` as it is no longer needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
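Conceptually, the change tracks in-flight status with an explicit boolean that `reset()` must clear. A stripped-down sketch of that state machine (names simplified for illustration; not the actual `RequestState` API):

```java
public class InflightState {
    private boolean requestInFlight = false;
    private int attempts = 0;

    // Called when a request is sent; nothing else may be sent until it completes.
    void onSendAttempt() { requestInFlight = true; }

    // Called on success or failure; the request is no longer in flight.
    void onComplete() { requestInFlight = false; attempts++; }

    // Clearing the flag here mirrors the fix in the diff above: without it,
    // a reset state would still report a request as in flight.
    void reset() {
        requestInFlight = false;
        attempts = 0;
    }

    boolean canSendRequest() { return !requestInFlight; }

    public static void main(String[] args) {
        InflightState s = new InflightState();
        s.onSendAttempt();
        System.out.println(s.canSendRequest()); // false
        s.reset();
        System.out.println(s.canSendRequest()); // true
    }
}
```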
[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:31 PM: - Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. was (Author: JIRAUSER300183): Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that was parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. 
> When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans edited comment on KAFKA-16566 at 4/16/24 6:30 PM: - Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that was parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. was (Author: JIRAUSER300183): Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. 
> When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837826#comment-17837826 ] Lianet Magrans commented on KAFKA-16566: Hey [~ableegoldman], this is specific to the static membership, and related to the consumer system tests only, that were parametrized to run with the legacy and new coordinator/consumer. You're on the right page regarding the rest: Streams tests haven't been migrated yet because it's not integrated with the new protocol. > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. > When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16566) Update static membership fencing system test to support new protocol
[ https://issues.apache.org/jira/browse/KAFKA-16566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837820#comment-17837820 ] A. Sophie Blee-Goldman commented on KAFKA-16566: I'm a bit confused – I thought we haven't migrated Streams over to the new consumer rebalancing protocol. Or is this referring to something else? What is the "classic" vs "consumer" protocol? And when/why did we migrate our system tests to using it? Does it have to do with static membership specifically? Sorry for being out of the loop here > Update static membership fencing system test to support new protocol > > > Key: KAFKA-16566 > URL: https://issues.apache.org/jira/browse/KAFKA-16566 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer > that verifies the sequence in which static members join a group when using > conflicting instance id. This behaviour is different in the classic and > consumer protocol, so the tests should be updated to set the right > expectations when running with the new consumer protocol. Note that what the > tests covers (params, setup), apply to both protocols. It is the expected > results that are not the same. > When conflicts between static members joining a group: > Classic protocol: all members join the group with the same group instance id, > and then the first one will eventually receive a HB error with > FencedInstanceIdException > Consumer protocol: new member with an instance Id already in use is not able > to join, receiving an UnreleasedInstanceIdException in the response to the HB > to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol
Lianet Magrans created KAFKA-16566: -- Summary: Update static membership fencing system test to support new protocol Key: KAFKA-16566 URL: https://issues.apache.org/jira/browse/KAFKA-16566 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer that verifies the sequence in which static members join a group when using conflicting instance id. This behaviour is different in the classic and consumer protocol, so the tests should be updated to set the right expectations when running with the new consumer protocol. Note that what the tests covers (params, setup), apply to both protocols. It is the expected results that are not the same. When conflicts between static members joining a group: Classic protocol: all members join the group with the same group instance id, and then the first one will eventually receive a HB error with FencedInstanceIdException Consumer protocol: new member with an instance Id already in use is not able to join, receiving an UnreleasedInstanceIdException in the response to the HB to join the group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
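The two protocol-specific outcomes described in the ticket can be captured as a small expectations table. A hedged sketch of how a parametrized system test might encode them (names are illustrative, not the actual `consumer_test.py` code):

```python
# Expected behavior when a second static member joins with a conflicting
# group.instance.id, per protocol (illustrative mapping, not ducktape code).
EXPECTED_FENCING = {
    # Classic protocol: the duplicate joins, then the original member is
    # eventually fenced via a heartbeat error.
    "classic": {"duplicate_joins": True, "error": "FencedInstanceIdException"},
    # Consumer (KIP-848) protocol: the duplicate is rejected up front in the
    # heartbeat response used to join the group.
    "consumer": {"duplicate_joins": False, "error": "UnreleasedInstanceIdException"},
}


def expected_error(protocol: str) -> str:
    """Return the exception the test should assert for a given protocol."""
    return EXPECTED_FENCING[protocol]["error"]


if __name__ == "__main__":
    print(expected_error("classic"))   # FencedInstanceIdException
    print(expected_error("consumer"))  # UnreleasedInstanceIdException
```

The setup and parameters stay shared across protocols; only the asserted outcome switches on the protocol in use.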
[PR] MINOR: Remove unneeded explicit type arguments [kafka]
mimaison opened a new pull request, #15736: URL: https://github.com/apache/kafka/pull/15736 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2059643602 > Let's just let Omnia decides which PR she prefers doing first. Then we can all review that PR, and once merged move onto the next one. This would avoid unnecessary churn as I expect she's getting tired of rebasing all of these PRs every day! Thanks @mimaison, this is really thoughtful! Breaking KafkaConfig out of core is one of the hardest cleanups, especially if we want to move with smaller/medium PRs, so I don't see any other way than going through it. I think it might be better to take them in order so we can get them merged and the rebases can be easier. Let's merge https://github.com/apache/kafka/pull/15684 as
- it is smaller
- more contained than the log config
- and group configs are more likely to change as part of KIP-848 and any of its followups, which makes the conflicts much harder and riskier to get messed up.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16475 add TopicImageNodeTest [kafka]
cmccabe commented on PR #15735: URL: https://github.com/apache/kafka/pull/15735#issuecomment-2059637927 Thanks, @mannoopj. Since this adds some new test cases beyond what was in #15720, can you rebase your PR on trunk? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
mwesterby commented on PR #15733: URL: https://github.com/apache/kafka/pull/15733#issuecomment-2059603416 @chia7712 Thanks again for the review. I've implemented all of your suggestions. Please let me know if you spot anything else I've missed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
mwesterby commented on code in PR #15733: URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -440,8 +440,12 @@ object StorageTool extends Logging {
         "Use --ignore-formatted to ignore this directory and format the others.")
     }
     if (!copier.errorLogDirs().isEmpty) {
-      val firstLogDir = copier.errorLogDirs().iterator().next()
-      throw new TerseFailure(s"I/O error trying to read log directory $firstLogDir.")
+      copier.errorLogDirs().forEach(errorLogDir => {
+        stream.println(s"I/O error trying to read log directory $errorLogDir. Ignoring...")
+      })
+      if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+        throw new TerseFailure("No available log directories to format.")
Review Comment: Discussed below. I've now added the additional check to see if `logDirProps` is empty, and only throw this failure when this is also true. This is to prevent the format storage script from failing when the only remaining directories to be formatted are unavailable, but there is at least one directory already formatted, and the script is run with `--ignore-formatted`. In this scenario, it should succeed as there is at least one successfully formatted directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
mwesterby commented on code in PR #15733: URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -440,8 +440,12 @@ object StorageTool extends Logging {
         "Use --ignore-formatted to ignore this directory and format the others.")
     }
     if (!copier.errorLogDirs().isEmpty) {
-      val firstLogDir = copier.errorLogDirs().iterator().next()
-      throw new TerseFailure(s"I/O error trying to read log directory $firstLogDir.")
+      copier.errorLogDirs().forEach(errorLogDir => {
+        stream.println(s"I/O error trying to read log directory $errorLogDir. Ignoring...")
+      })
+      if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+        throw new TerseFailure("No available log directories to format.")
Review Comment: Discussed below. I've now added the additional check to see if `logDirProps` is empty, and only throw this failure when this is also true. This is to prevent the format storage script from failing when the only remaining directories to be formatted are unavailable, but there is at least one directory already formatted, and the script is run with `--ignore-formatted`. In this scenario, it should succeed as there is at least one successfully formatted directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned
[ https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16565: -- Labels: consumer-threading-refactor kip-848-client-support system-tests (was: ) > IncrementalAssignmentConsumerEventHandler throws error when attempting to > remove a partition that isn't assigned > > > Key: KAFKA-16565 > URL: https://issues.apache.org/jira/browse/KAFKA-16565 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > In {{{}verifiable_consumer.py{}}}, the Incremental > > {code:java} > def handle_partitions_revoked(self, event): > self.revoked_count += 1 > self.state = ConsumerState.Rebalancing > self.position = {} > for topic_partition in event["partitions"]: > topic = topic_partition["topic"] > partition = topic_partition["partition"] > self.assignment.remove(TopicPartition(topic, partition)) > {code} > If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that > isn't in the list, an error is thrown. For now, we should first check that > the {{TopicPartition}} is in the list, and if not, log a warning or something. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned
[ https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16565: - Assignee: Kirk True > IncrementalAssignmentConsumerEventHandler throws error when attempting to > remove a partition that isn't assigned > > > Key: KAFKA-16565 > URL: https://issues.apache.org/jira/browse/KAFKA-16565 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > In {{{}verifiable_consumer.py{}}}, the Incremental > > {code:java} > def handle_partitions_revoked(self, event): > self.revoked_count += 1 > self.state = ConsumerState.Rebalancing > self.position = {} > for topic_partition in event["partitions"]: > topic = topic_partition["topic"] > partition = topic_partition["partition"] > self.assignment.remove(TopicPartition(topic, partition)) > {code} > If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that > isn't in the list, an error is thrown. For now, we should first check that > the {{TopicPartition}} is in the list, and if not, log a warning or something. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned
Kirk True created KAFKA-16565:
------------------------------

             Summary: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned
                 Key: KAFKA-16565
                 URL: https://issues.apache.org/jira/browse/KAFKA-16565
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer, system tests
    Affects Versions: 3.8.0
            Reporter: Kirk True
             Fix For: 3.8.0

In {{verifiable_consumer.py}}, the IncrementalAssignmentConsumerEventHandler's revocation handler is:

{code:java}
def handle_partitions_revoked(self, event):
    self.revoked_count += 1
    self.state = ConsumerState.Rebalancing
    self.position = {}
    for topic_partition in event["partitions"]:
        topic = topic_partition["topic"]
        partition = topic_partition["partition"]
        self.assignment.remove(TopicPartition(topic, partition))
{code}

If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that isn't in the list, an error is thrown. For now, we should first check that the {{TopicPartition}} is in the list, and if not, log a warning or something.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
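The membership check the ticket asks for can be sketched as follows. TopicPartition, the handler class, and the event shape are simplified stand-ins for the real types in verifiable_consumer.py; only the guard around remove() is the point:

```python
# Sketch of the proposed guard: check membership before list.remove(), and log
# a warning for partitions we never recorded as assigned instead of raising.
import logging
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

class EventHandler:
    def __init__(self, assignment):
        self.assignment = list(assignment)
        self.revoked_count = 0
        self.position = {}

    def handle_partitions_revoked(self, event):
        self.revoked_count += 1
        self.position = {}
        for tp_dict in event["partitions"]:
            tp = TopicPartition(tp_dict["topic"], tp_dict["partition"])
            if tp in self.assignment:
                self.assignment.remove(tp)
            else:
                # Previously this fell through to remove() and raised ValueError.
                logging.warning("revoked partition %s was not in assignment", tp)
```

With this guard, a revocation event containing an unassigned partition is logged and skipped rather than crashing the consumer.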
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567721679

## core/src/main/scala/kafka/tools/StorageTool.scala:
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
         "Use --ignore-formatted to ignore this directory and format the others.")
     }
     if (!copier.errorLogDirs().isEmpty) {
-      val firstLogDir = copier.errorLogDirs().iterator().next()
-      throw new TerseFailure(s"I/O error trying to read log directory $firstLogDir.")
+      copier.errorLogDirs().forEach(errorLogDir => {
+        stream.println(s"I/O error trying to read log directory $errorLogDir. Ignoring...")
+      })
+      if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+        throw new TerseFailure("No available log directories to format.")

Review Comment:
Discussed below. I've now added the additional check to see if `logDirProps` is empty, and only throw this failure when that is also true. This prevents the format storage script from failing when at least one directory is already formatted, the only other non-formatted directories are unavailable, and it is run with `--ignore-formatted`. In this scenario, it should succeed because there is at least one successfully formatted directory.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
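The decision being discussed — warn about unreadable directories, but fail only when nothing was already formatted and nothing is left to format — can be modeled in a few lines. The function and parameter names below are illustrative only, not the actual StorageTool API:

```python
# Hedged model of the format-command decision logic described in the review.
# formatted_dirs / empty_dirs / error_dirs stand in for the tool's three
# categories of log directories; none of these names exist in Kafka itself.
def decide_format_outcome(formatted_dirs, empty_dirs, error_dirs,
                          ignore_formatted=False):
    """Return the directories to format, or raise if the command must fail."""
    if formatted_dirs and not ignore_formatted:
        raise RuntimeError("Log directory is already formatted. "
                           "Use --ignore-formatted to ignore this directory "
                           "and format the others.")
    for d in error_dirs:
        # Unreadable directories are reported and skipped, not fatal on their own.
        print(f"I/O error trying to read log directory {d}. Ignoring...")
    # Fail only when nothing is left to format AND nothing was already formatted;
    # otherwise --ignore-formatted should let the command succeed.
    if not empty_dirs and not formatted_dirs:
        raise RuntimeError("No available log directories to format.")
    return list(empty_dirs)
```

For example, one formatted directory plus one unavailable directory succeeds under `ignore_formatted=True`, while a run where every directory is unavailable raises.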
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567722373

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
@@ -48,4 +50,51 @@ public void testRequestStateSimple() {
         state.reset();
         assertTrue(state.canSendRequest(200));
     }
+
+    @Test
+    public void testTrackInflightOnSuccessfulAttempt() {
+        testTrackInflight(RequestState::onSuccessfulAttempt);
+    }
+
+    @Test
+    public void testTrackInflightOnFailedAttempt() {
+        testTrackInflight(RequestState::onFailedAttempt);
+    }
+
+    /**
+     * In some cases, the network layer is very fast and can send out a second request within the same
+     * millisecond timestamp as receiving the first response.
+     * <p>
+     * The previous logic for tracking inflight status used timestamps: if the timestamp from the last received
+     * response was less than the timestamp from the last sent request, we'd interpret that as having an
+     * inflight request. However, this approach would incorrectly return false from
+     * {@link RequestState#requestInFlight()} if the two timestamps were equal.
+     */

Review Comment:
I figure the existing PR description covers the basics. I'll just remove the test comment wholesale.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes
[ https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16559:
--------------------------------------
    Assignee: Gaurav Narula  (was: Chia-Ping Tsai)

> allow defining number of disks per broker in TestKitNodes
>
>                 Key: KAFKA-16559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16559
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Chia-Ping Tsai
>            Assignee: Gaurav Narula
>            Priority: Major
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81) to accept an extra argument to define the number of folders (by setLogDirectories)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes
[ https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-16559:
-----------------------------------
    Summary: allow defining number of disks per broker in TestKitNodes  (was: Support to define the number of data folders in ClusterTest)

> allow defining number of disks per broker in TestKitNodes
>
>                 Key: KAFKA-16559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16559
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81) to accept an extra argument to define the number of folders (by setLogDirectories)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]
gaurav-narula commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567713128

## core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+    @Test
+    public void testCreateClusterWithNoDisksThrows() {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(

Review Comment:
Using `assertThrowsExactly` instead to avoid the equality check on `RuntimeException.class`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]
cmccabe merged PR #15720: URL: https://github.com/apache/kafka/pull/15720 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]
cmccabe commented on PR #15720: URL: https://github.com/apache/kafka/pull/15720#issuecomment-2059575247 Thanks, @johnnychhsu -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567703623

## core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+    @Test
+    public void testCreateClusterWithNoDisksThrows() {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(

Review Comment:
We can use `assertThrows` to rewrite it. For example:
```java
Exception e = assertThrows(Exception.class, () -> new KafkaClusterTestKit.Builder(
        new TestKitNodes.Builder().
                setBrokerNodes(1, 0).
                setNumControllerNodes(1).build()));
assertEquals(RuntimeException.class, e.getClass());
assertEquals("Invalid value for disksPerBroker", e.getMessage());
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]
chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567700594

## core/src/test/java/kafka/testkit/KafkaClusterTestKitTest.java:
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.hamcrest.Matcher;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class KafkaClusterTestKitTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder().
+                        setBrokerNodes(5, 2).
+                        setCombined(combined).
+                        setNumControllerNodes(3).build()).build()) {
+
+            assertEquals(5, cluster.nodes().brokerNodes().size());
+            assertEquals(3, cluster.nodes().controllerNodes().size());
+
+            cluster.nodes().brokerNodes().forEach((brokerId, node) -> {
+                assertEquals(2, node.logDataDirectories().size());
+                Matcher<Iterable<? extends String>> matcher = containsInAnyOrder(String.format("broker_%d_data0", brokerId), String.format("broker_%d_data1", brokerId));

Review Comment:
> is there any reason not to use hamcrest in core when we do use it in other modules?

I'm curious. Maybe our modules have different tastes of code style :smile:
Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]
mwesterby commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567699963

## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
@@ -192,6 +192,66 @@ Found problem:
     } finally Utils.delete(tempDir)
   }

+  private def runFormatCommand(stream: ByteArrayOutputStream, directories: Seq[String]): Int = {
+    val metaProperties = new MetaProperties.Builder().
+      setVersion(MetaPropertiesVersion.V1).
+      setClusterId("XcZZOzUqS4yHOjhMQB6JLQ").
+      setNodeId(2).
+      build()
+    val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command")
+    StorageTool.formatCommand(new PrintStream(stream), directories, metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)
+  }
+
+  @Test
+  def testFormatSucceedsIfAllDirectoriesAreAvailable(): Unit = {
+    val availableDir1 = TestUtils.tempDir()
+    val availableDir2 = TestUtils.tempDir()
+    try {
+      val stream = new ByteArrayOutputStream()
+      assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, availableDir2.toString)))
+      assertTrue(stream.toString().contains("Formatting %s".format(availableDir1)))
+      assertTrue(stream.toString().contains("Formatting %s".format(availableDir2)))
+    } finally {
+      Utils.delete(availableDir1)
+      Utils.delete(availableDir2)
+    }
+  }
+
+  @Test
+  def testFormatSucceedsIfAtLeastOneDirectoryIsAvailable(): Unit = {
+    val availableDir1 = TestUtils.tempDir()
+    val unavailableDir1 = TestUtils.tempFile()
+    try {
+      val stream = new ByteArrayOutputStream()
+      assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, unavailableDir1.toString)))
+      assertTrue(stream.toString().contains("I/O error trying to read log directory %s. Ignoring...".format(unavailableDir1)))
+      assertTrue(stream.toString().contains("Formatting %s".format(availableDir1)))
+      assertFalse(stream.toString().contains("Formatting %s".format(unavailableDir1)))
+    } finally {
+      Utils.delete(availableDir1)
+      Utils.delete(unavailableDir1)
+    }
+  }
+
+  @Test
+  def testFormatFailsIfAllDirectoriesAreUnavailable(): Unit = {
+    val unavailableDir1 = TestUtils.tempFile()
+    val unavailableDir2 = TestUtils.tempFile()
+    try {
+      val stream = new ByteArrayOutputStream()
+      try {
+        assertEquals(1, runFormatCommand(stream, Seq(unavailableDir1.toString, unavailableDir2.toString)))
+      } catch {
+        case e: TerseFailure => assertEquals("No available log directories to format.", e.getMessage)

Review Comment:
Much cleaner, I'll refactor this assertion too.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16555) Consumer's RequestState has incorrect logic to determine if inflight
[ https://issues.apache.org/jira/browse/KAFKA-16555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16555:
------------------------------
    Issue Type: Bug  (was: Task)

> Consumer's RequestState has incorrect logic to determine if inflight
>
>                 Key: KAFKA-16555
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16555
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Critical
>             Labels: consumer-threading-refactor, kip-848-client-support
>             Fix For: 3.8.0
>
> When running system tests for the new consumer, I've hit an issue where the {{HeartbeatRequestManager}} is sending out multiple concurrent {{CONSUMER_GROUP_REQUEST}} RPCs. The effect is the coordinator creates multiple members, which causes downstream assignment problems.
> Here's the order of events:
> * Time 202: {{HeartbeatRequestManager.poll()}} determines it's OK to send a request. In so doing, it updates the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 202
> * Time 236: the response is received and the response handler is invoked, setting the {{RequestState}}'s {{lastReceivedMs}} to the current timestamp, 236
> * Time 236: {{HeartbeatRequestManager.poll()}} is invoked again, and it sees that it's OK to send a request. It creates another request, once again updating the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 236
> * Time 237: {{HeartbeatRequestManager.poll()}} is invoked again, and ERRONEOUSLY decides it's OK to send another request, despite one already in flight.
> Here's the problem with {{requestInFlight()}}:
> {code:java}
> public boolean requestInFlight() {
>     return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
> }
> {code}
> In our case, {{lastReceivedMs}} is 236 and {{lastSentMs}} is _also_ 236. So the received timestamp is _equal_ to the sent timestamp, not _less_.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
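The timestamp comparison above fails exactly when a response is received and the next request is sent within the same millisecond. One way to remove the ambiguity is an explicit boolean flag set on send and cleared on completion; this sketch is illustrative and does not claim to be the actual Kafka fix:

```python
# Illustrative sketch: track inflight status with an explicit flag instead of
# comparing millisecond timestamps, which cannot distinguish "response received,
# then next request sent" from "request still outstanding" within one tick.
class RequestState:
    def __init__(self):
        self.last_sent_ms = -1
        self.last_received_ms = -1
        self.inflight = False

    def on_send_attempt(self, now_ms):
        self.last_sent_ms = now_ms
        self.inflight = True

    def on_successful_attempt(self, now_ms):
        self.last_received_ms = now_ms
        self.inflight = False

    def request_in_flight(self):
        return self.inflight

    def timestamp_based_in_flight(self):
        # The buggy check from the ticket: returns False when the last send and
        # last receive happened in the same millisecond.
        return self.last_sent_ms > -1 and self.last_received_ms < self.last_sent_ms
```

Replaying the ticket's timeline (send at 202, receive at 236, re-send at 236), the flag-based check still reports a request in flight at time 237, while the timestamp-based check does not.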