Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
vamossagar12 commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1446980325

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
@@ -41,18 +41,11 @@ public class HerderRequestHandler {

     private final RestClient restClient;
-    private volatile long requestTimeoutMs;
+    private final RestRequestTimeout requestTimeout;

Review Comment:
   The field `requestTimeoutMs` was made volatile as part of [this change](https://github.com/apache/kafka/pull/14562/files#diff-f2311f0c356f882d7768b97cb5f0054dc7040d29d455eab74ab585725454488aR44). I don't think we need it now, but I just wanted to understand why it was added there (you can skip the explanation if it's too complex :) )

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
@@ -41,18 +41,11 @@ public class HerderRequestHandler {

     private final RestClient restClient;
-    private volatile long requestTimeoutMs;
+    private final RestRequestTimeout requestTimeout;

-    public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+    public HerderRequestHandler(RestClient restClient, RestRequestTimeout requestTimeout) {
         this.restClient = restClient;
-        this.requestTimeoutMs = requestTimeoutMs;
-    }
-
-    public void requestTimeoutMs(long requestTimeoutMs) {
-        if (requestTimeoutMs < 1) {
-            throw new IllegalArgumentException("REST request timeout must be positive");
-        }

Review Comment:
   I'm not sure whether this validation was needed in the past, but I don't see it anywhere now. Do you think it's still needed?

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java:
@@ -45,25 +45,40 @@ public void initializeResources(Herder herder) {
     }

     @Override
-    protected Collection regularResources() {
+    protected Collection> regularResources() {
         return Arrays.asList(
-            new RootResource(herder),
-            new ConnectorsResource(herder, config, restClient),
-            new InternalConnectResource(herder, restClient),
-            new ConnectorPluginsResource(herder)
+            RootResource.class,
+            ConnectorsResource.class,
+            InternalConnectResource.class,
+            ConnectorPluginsResource.class
         );
     }

     @Override
-    protected Collection adminResources() {
+    protected Collection> adminResources() {
         return Arrays.asList(
-            new LoggingResource(herder)
+            LoggingResource.class
         );
     }

     @Override
     protected void configureRegularResources(ResourceConfig resourceConfig) {
         registerRestExtensions(herder, resourceConfig);
+        resourceConfig.register(new Binder());
+    }
+
+    private class Binder extends AbstractBinder {
+        @Override
+        protected void configure() {
+            bind(herder).to(Herder.class);
+            bind(restClient).to(RestClient.class);
+            bind(config).to(RestServerConfig.class);
+        }
+    }
+

Review Comment:
   nit: We could probably move the class definition to the bottom so that all `configureXXXResources` methods are together

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java:
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+public interface RestRequestTimeout {

Review Comment:
   nit: Mark this interface as `@FunctionalInterface`?
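For context on the `@FunctionalInterface` nit, here is a minimal sketch of what the suggestion amounts to. The accessor name `timeoutMs()` is an assumption for illustration, since the excerpt above cuts off before the interface body:

```java
// Sketch only: the method name timeoutMs() is assumed, not taken from the PR.
@FunctionalInterface
interface RestRequestTimeout {
    // Single abstract method: the annotation makes the compiler reject any
    // accidental second abstract method added later.
    long timeoutMs();
}

class RestRequestTimeoutDemo {
    public static void main(String[] args) {
        // A functional interface lets call sites supply the timeout as a lambda
        // that is re-read on every request, replacing the old volatile field.
        RestRequestTimeout timeout = () -> 90_000L;
        System.out.println("timeout ms: " + timeout.timeoutMs());
    }
}
```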
[jira] [Commented] (KAFKA-16090) Refactor call to storage tool from kafka docker wrapper
[ https://issues.apache.org/jira/browse/KAFKA-16090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805005#comment-17805005 ]

Mickael Maison commented on KAFKA-16090:
----------------------------------------

Context: https://github.com/apache/kafka/pull/15048#discussion_r1442692990

> Refactor call to storage tool from kafka docker wrapper
> -------------------------------------------------------
>
>                 Key: KAFKA-16090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16090
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Vedarth Sharma
>            Assignee: Vedarth Sharma
>            Priority: Major
>
> Once rewrite of kafka storage tool is done, refactor how we are calling storage tool from kafka docker wrapper
[jira] [Updated] (KAFKA-16102) DynamicListenerConfig: dynamic modification of the listener's port or IP does not take effect
[ https://issues.apache.org/jira/browse/KAFKA-16102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jialun Peng updated KAFKA-16102:
--------------------------------
    Reviewer:   (was: Colin Patrick McCabe)

> DynamicListenerConfig: dynamic modification of the listener's port or IP does not take effect
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16102
>             Project: Kafka
>          Issue Type: Bug
>          Components: config
>    Affects Versions: 3.6.0
>         Environment: Must be present in any environment.
>            Reporter: Jialun Peng
>            Assignee: Jialun Peng
>            Priority: Minor
>             Fix For: 3.8.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> When I dynamically modify the parameters related to Kafka listeners, such as changing the IP or port value of a listener, the dynamic parameters under the corresponding path in ZooKeeper are updated. However, the modification of the IP or port for the corresponding listener does not actually take effect. This happens consistently. The only slight improvement is that the error "Security protocol cannot be updated for existing listener" is eliminated.
[jira] [Resolved] (KAFKA-15735) KRaft support in SaslMultiMechanismConsumerTest
[ https://issues.apache.org/jira/browse/KAFKA-15735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-15735.
-------------------------------
    Fix Version/s: 3.8.0
       Resolution: Fixed

> KRaft support in SaslMultiMechanismConsumerTest
> -----------------------------------------------
>
>                 Key: KAFKA-15735
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15735
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Sameer Tejani
>            Assignee: Sanskar Jhajharia
>            Priority: Minor
>              Labels: kraft, kraft-test, newbie
>             Fix For: 3.8.0
>
> The following tests in SaslMultiMechanismConsumerTest in core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala need to be updated to support KRaft
>
> 45 : def testMultipleBrokerMechanisms(): Unit = {
>
> Scanned 94 lines. Found 0 KRaft tests out of 1 tests
Re: [PR] [KAFKA-15735] Adding KRaft test [kafka]
omkreddy merged PR #15156:
URL: https://github.com/apache/kafka/pull/15156
Re: [PR] KAFKA-15594: Add version 3.6 to Kafka Streams system tests [kafka]
mjsax commented on PR #15151:
URL: https://github.com/apache/kafka/pull/15151#issuecomment-1884303284

   Merged #15157 and rebased this PR afterwards. Re-triggered system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/6025/
Re: [PR] MINOR: bump dev version for system tests to 3.8 [kafka]
mjsax merged PR #15157:
URL: https://github.com/apache/kafka/pull/15157
Re: [PR] KAFKA-14517:Implement regex subscriptions [kafka]
JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1884289264

   @dajac I've updated the PR. Please take a look when you have a moment. Thanks.
[jira] [Commented] (KAFKA-16071) NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
[ https://issues.apache.org/jira/browse/KAFKA-16071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804981#comment-17804981 ]

Owen C.H. Leung commented on KAFKA-16071:
-----------------------------------------

I think this is related to https://issues.apache.org/jira/browse/KAFKA-15140. I've created a PR to make `TopicCommandIntegrationTest` less flaky: https://github.com/apache/kafka/pull/14891

> NPE in testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-16071
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16071
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Luke Chen
>            Priority: Major
>              Labels: newbie, newbie++
>
> Found in the CI build result.
>
> h3. Error Message
> java.lang.NullPointerException
>
> h3. Stacktrace
> java.lang.NullPointerException
>     at org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(TopicCommandIntegrationTest.java:800)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>     at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15095/1/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__zk/
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
philipnee commented on code in PR #14531:
URL: https://github.com/apache/kafka/pull/14531#discussion_r1446899891

## clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
@@ -26,6 +26,15 @@
  */
 public interface Partitioner extends Configurable, Closeable {

+    /**
+     * Indicate if the given topic is handled. Returning {@code false} will cause the Producer to fallback to default partitioning.
+     *
+     * @param topic The topic name
+     */
+    default boolean partitionsTopic(String topic) {
+        return true;

Review Comment:
   why is it true by default?
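To illustrate the proposed API, here is a hedged sketch of a custom partitioner that opts out of topics it does not recognize and lets the producer fall back to the built-in default. Note that `partitionsTopic` only exists on this PR's branch, not in released Kafka, so this compiles only against PR #14531; the topic prefix and hashing scheme are illustrative:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Sketch: routes only "orders"-prefixed topics itself; for anything else it
// returns false from partitionsTopic(), so the producer (per this PR) would
// use its default partitioning logic instead.
public class OrdersOnlyPartitioner implements Partitioner {

    @Override
    public boolean partitionsTopic(String topic) {
        // Proposed method from PR #14531; returning false triggers the fallback.
        return topic.startsWith("orders");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Pin all records with the same key to one partition.
        return key == null ? 0 : Math.abs(key.hashCode() % numPartitions);
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}
```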
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
philipnee commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884215706

   @jimbogithub - thanks for the PR, I've got a few questions to clarify:
   - `KafkaProducer.partition(...) not throw IllegalArgumentException if the Partitioner returns RecordMetadata.UNKNOWN_PARTITION`: I'm not sure any partitioner actually returns this UNKNOWN_PARTITION; shouldn't it be returned by producer.partition?
   - On the question above: I think different partitioners have different implementations for handling missing topics, but since they all return some partition, are the default behaviors insufficient? Bottom line, I'm not 100% convinced that the fallback to default is actually needed.
   - There's no test for the change you made. I think it would be good to add some to demonstrate the use cases.
   - I would also update the PR description. You can just use whatever is described in the PR.

   @jolshan - if you get time, maybe you can review this? It seems like you implemented a few partitioners, so you are probably the better person to speak about this... thanks!
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
philipnee commented on code in PR #14531:
URL: https://github.com/apache/kafka/pull/14531#discussion_r1446881807

## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
@@ -1368,7 +1368,7 @@ private int partition(ProducerRecord record, byte[] serializedKey, byte[]
         if (record.partition() != null)
             return record.partition();

-        if (partitioner != null) {
+        if (partitioner != null && partitioner.partitionsTopic(record.topic())) {

Review Comment:
   there's a slight change to the behavior of the current API, could you modify the documentation?
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
jimbogithub commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884148153

   This PR is still valid and desired, has no merge conflicts and does build despite Jenkins protestations. I do not have the ability to add Reviewers.
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
github-actions[bot] commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1884136280

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on PR #15067:
URL: https://github.com/apache/kafka/pull/15067#issuecomment-1884122272

   @AndrewJSchofield, I've updated the PR. Please take a look again. Thanks.
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446824176

## core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
@@ -113,6 +121,22 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
         val properties = new Properties()
         config.entrySet().forEach(e => properties.setProperty(e.getKey, e.getValue))
         ClientMetricsConfigs.validate(resource.name(), properties)
+      case GROUP =>
+        validateGroupName(resource.name())
+        val properties = new Properties()
+        val nullConsumerGroupConfigs = new mutable.ArrayBuffer[String]()
+        config.entrySet().forEach(e => {
+          if (e.getValue == null) {
+            nullConsumerGroupConfigs += e.getKey
+          } else {
+            properties.setProperty(e.getKey, e.getValue)
+          }
+        })
+        if (nullConsumerGroupConfigs.nonEmpty) {
+          throw new InvalidConfigurationException("Null value not supported for consumer group configs: " +

Review Comment:
   Done

## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
@@ -114,6 +114,18 @@ class DynamicConfigPublisher(
             s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
             s"in $deltaName", t)
         })
+      case GROUP =>
+        // Apply changes to a group's dynamic configuration.
+        dynamicConfigHandlers.get(ConfigType.GROUP).foreach(consumerGroupConfigHandler =>

Review Comment:
   Done
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446824091

## core/src/main/scala/kafka/server/BrokerServer.scala:
@@ -353,7 +358,8 @@ class BrokerServer(
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),
         ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers),
-        ConfigType.CLIENT_METRICS -> new ClientMetricsConfigHandler(clientMetricsManager))
+        ConfigType.CLIENT_METRICS -> new ClientMetricsConfigHandler(clientMetricsManager),
+        ConfigType.GROUP -> new ConsumerGroupConfigHandler(consumerGroupConfigManager))

Review Comment:
   Done

## core/src/main/scala/kafka/server/ConfigHandler.scala:
@@ -264,3 +265,12 @@ class ClientMetricsConfigHandler(private val clientMetricsManager: ClientMetrics
     clientMetricsManager.updateSubscription(subscriptionGroupId, properties)
   }
 }
+
+/**
+ * The GroupConfigHandler will process individual group config changes in ZK.
+ */
+class ConsumerGroupConfigHandler(private val consumerGroupConfigManager: ConsumerGroupConfigManager) extends ConfigHandler with Logging {

Review Comment:
   Done
Re: [PR] KAFKA-14511: extend AlterIncrementalConfigs API to support group config [kafka]
DL1231 commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446823998

## clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java:
@@ -223,6 +223,7 @@ public enum ConfigSource {
         DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is configured for a specific broker
         DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is configured as default for all brokers in the cluster
         DYNAMIC_CLIENT_METRICS_CONFIG,  // dynamic client metrics subscription config that is configured for all clients
+        DYNAMIC_CONSUMER_GROUP_CONFIG,  // dynamic consumer group config that is configured for a specific consumer group

Review Comment:
   Done

## clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java:
@@ -120,7 +121,8 @@ public enum ConfigSource {
         STATIC_BROKER_CONFIG((byte) 4, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
         DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
         DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
-        CLIENT_METRICS_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+        CLIENT_METRICS_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG),
+        CONSUMER_GROUP_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CONSUMER_GROUP_CONFIG);

Review Comment:
   Done
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446822116

## clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java:
@@ -50,6 +50,10 @@ public ListGroupsRequest build(short version) {
                 throw new UnsupportedVersionException("The broker only supports ListGroups " +
                     "v" + version + ", but we need v4 or newer to request groups by states.");
             }
+            if (!data.typesFilter().isEmpty() && version < 5) {
+                throw new UnsupportedVersionException("The broker only supports ListGroups " +
+                    "v" + version + ", but we need v5 or newer to request groups by type.");
+            }

Review Comment:
   Oh yes! Thanks for noticing this, I missed this test!
Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
dengziming commented on code in PR #14595:
URL: https://github.com/apache/kafka/pull/14595#discussion_r1446812841

## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
@@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
   overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
   overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)

   def generateConfigs = (0 until numServers) map { node =>
-    TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
+    TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString))
   } map (KafkaConfig.fromProps(_, overridingProps))

   private val topic = "topic"

-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly distributed in kraft mode, see KAFKA-15354

Review Comment:
   I tried to fix it, but we use a different way to assign leaders in kraft mode. The ideas behind the two algorithms are similar, but the implementations differ. The new one, `StripedReplicaPlacer`, is clearer, but it only ensures leaders are distributed evenly across racks; it doesn't ensure leaders are evenly distributed across nodes. I wonder whether it is worth evolving it to be consistent with the older one. cc @cmccabe
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446780228

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE
     /**
      * Get the Group List.
      *
-     * @param statesFilter      The states of the groups we want to list.
-     *                          If empty all groups are returned with their state.
-     * @param committedOffset   A specified committed offset corresponding to this shard
+     * @param statesFilter      The states of the groups we want to list.
+     *                          If empty, all groups are returned with their state.
+     * @param typesFilter       The types of the groups we want to list.
+     *                          If empty, all groups are returned with their type.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
      *
      * @return A list containing the ListGroupsResponseData.ListedGroup
      */
+    public List listGroups(
+        List statesFilter,
+        List typesFilter,
+        long committedOffset
+    ) {
+        Predicate combinedFilter = group -> {
+            boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset));
+            boolean typeCheck = typesFilter.isEmpty() || typesFilter.contains(group.type().toString());
+            return stateCheck && typeCheck;
+        };

-    public List listGroups(List statesFilter, long committedOffset) {
-        Stream groupStream = groups.values(committedOffset).stream();
-        if (!statesFilter.isEmpty()) {
-            groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString(committedOffset)));
-        }
-        return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
+        Stream groupStream = groups.values(committedOffset).parallelStream();

Review Comment:
   Thanks for the comment! Okay, I can make the change :) My understanding was that we want to scale to a large number of groups, so I was trying to optimize wherever I could!
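For context, a self-contained sketch of the predicate-composition pattern under discussion, using plain collections rather than the real `GroupMetadataManager` types. For the short group lists typical here, a sequential stream avoids the fork-join overhead that `parallelStream()` would add:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class GroupFilterSketch {
    record Group(String id, String state, String type) {}

    // Empty filter lists mean "match everything", mirroring the PR's logic.
    static List<Group> listGroups(List<Group> groups,
                                  List<String> statesFilter,
                                  List<String> typesFilter) {
        Predicate<Group> combined = g ->
            (statesFilter.isEmpty() || statesFilter.contains(g.state()))
                && (typesFilter.isEmpty() || typesFilter.contains(g.type()));
        // Sequential stream: cheap for small collections, no thread-pool overhead.
        return groups.stream().filter(combined).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Group> groups = List.of(
            new Group("a", "Stable", "consumer"),
            new Group("b", "Empty", "classic"));
        System.out.println(listGroups(groups, List.of("Stable"), List.of()));
    }
}
```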
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446778536

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;

+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap> openTransactionsByGroup;

Review Comment:
   Also, these are timeline data structures, but are there cases when we roll them back?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446774595

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long lastCommittedOffset
     ) throws ApiException {
+        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
   Is it the case that when we query stable offsets we always have the long max value for lastCommittedOffset? Or in other words, we can't query a specific offset when we require stable?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446773736

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;

+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap> openTransactionsByGroup;

Review Comment:
   Just for my understanding though: for every producer ID for a group we return, we will have an offset in pendingTransactionalOffsets, and every producer ID in pendingTransactionalOffsets will have its group in openTransactionsByGroup. I can't imagine a case where something is in one map but not the other. (I can imagine that the group is not there at all, or the topic partition is not there, though.)
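A toy sketch of the invariant being discussed, using plain HashMaps and hypothetical names rather than the coordinator's timeline structures: every pending transactional offset registers its producer id under its group, and both indexes are updated together so that neither map can reference an entry missing from the other:

```java
import java.util.*;

// Sketch of keeping two indexes over the same pending-transaction state in sync.
public class PendingTxnIndex {
    // producerId -> (group -> pending offsets), loosely modeled on pendingTransactionalOffsets.
    private final Map<Long, Map<String, List<Long>>> pendingByProducer = new HashMap<>();
    // group -> open producer ids, loosely modeled on openTransactionsByGroup.
    private final Map<String, Set<Long>> openByGroup = new HashMap<>();

    public void addPendingOffset(long producerId, String group, long offset) {
        pendingByProducer
            .computeIfAbsent(producerId, p -> new HashMap<>())
            .computeIfAbsent(group, g -> new ArrayList<>())
            .add(offset);
        // Updated in the same step, so the two maps never disagree.
        openByGroup.computeIfAbsent(group, g -> new HashSet<>()).add(producerId);
    }

    public void completeTransaction(long producerId) {
        Map<String, List<Long>> byGroup = pendingByProducer.remove(producerId);
        if (byGroup == null) return;
        // Remove the producer id from every group it touched, keeping both indexes consistent.
        byGroup.keySet().forEach(group ->
            openByGroup.computeIfPresent(group, (g, ids) -> {
                ids.remove(producerId);
                return ids.isEmpty() ? null : ids; // null drops the empty entry
            }));
    }

    public boolean hasPendingTransactionalOffsets(String group) {
        return openByGroup.containsKey(group);
    }
}
```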
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446773716

## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
@@ -633,8 +633,8 @@ class KafkaApisTest extends Logging {
     val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)

Review Comment:
   Yep, sorry about that; when I rebased and transferred the files I might've messed up.
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446771432

## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
@@ -327,23 +327,26 @@ class GroupCoordinatorAdapterTest {

   @Test
   def testListGroups(): Unit = {
-    testListGroups(null, Set.empty)
-    testListGroups(List(), Set.empty)
-    testListGroups(List("Stable"), Set("Stable"))
+    testListGroups(null, null, Set.empty, Set.empty)
+    testListGroups(List(), List(), Set.empty, Set.empty)
+    testListGroups(List("Stable, Empty"), List(), Set("Stable, Empty"), Set.empty)

Review Comment:
   Yep, thanks for mentioning this. Let me add it in; at the time I didn't think it was necessary, but I can add it now.
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446770934

## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala:
@@ -327,23 +327,26 @@ class GroupCoordinatorAdapterTest {

   @Test
   def testListGroups(): Unit = {
-    testListGroups(null, Set.empty)
-    testListGroups(List(), Set.empty)
-    testListGroups(List("Stable"), Set("Stable"))
+    testListGroups(null, null, Set.empty, Set.empty)
+    testListGroups(List(), List(), Set.empty, Set.empty)
+    testListGroups(List("Stable, Empty"), List(), Set("Stable, Empty"), Set.empty)

Review Comment:
   Yep, thanks for bringing it up. We could add it; I considered it, but it didn't seem to add much value to the test, so I decided against it at the time.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
         return numDeletedOffsets.get();
     }

+    /**
+     * @return true iif there is at least one pending transactional offsets for the given

Review Comment:
   nit: iff, offset (unless iif is a different acronym and is not if and only if)
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446767928

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -1105,16 +1105,17 @@ private[group] class GroupCoordinator(
     }
   }

-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, List[GroupOverview]) = {
     if (!isActive.get) {
       (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
       val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-      // if states is empty, return all groups
-      val groups = if (states.isEmpty)
-        groupManager.currentGroups
-      else
-        groupManager.currentGroups.filter(g => states.contains(g.summary.state))
+      // Filter groups based on states and groupTypes. If either is empty, it won't filter on that criterion.
+      // If groupType is mentioned then no group is returned since the notion of groupTypes doesn't exist in the
+      // old group coordinator.
+      val groups = groupManager.currentGroups.filter { g =>
+        (states.isEmpty || states.contains(g.summary.state)) && groupTypes.isEmpty
+      }

Review Comment:
   Thanks, that makes sense. I'll make the changes; I was waiting on your comments on my design doc to make the final call.
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1446766566

## clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
@@ -0,0 +1,50 @@
+/*

Review Comment:
   Yes, it is. We can consider it part of the admin client PR, but there was a dependency in this one too.
[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804922#comment-17804922 ]

Kirk True commented on KAFKA-15475:
-----------------------------------

[~lianetm] would you kindly point me at the code in the two {{RequestManager}} implementations that have solved this?

> Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15475
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Minor
>              Labels: consumer-threading-refactor, kip-848-preview
>             Fix For: 3.8.0
>
> If the request times out in the background thread, it will be completed with TimeoutException, which is Retriable. In the TopicMetadataRequestManager and possibly other managers, the request might continue to be retried forever.
>
> There are two ways to fix this:
> # Pass a timer to the manager to remove the in-flight requests when it is expired.
> # Pass the future to the application layer and continue to retry.
[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804921#comment-17804921 ]

Kirk True commented on KAFKA-15475:
-----------------------------------

[~lianetm] / [~pnee]: I need to refresh my memory about what it means for an exception to be retriable. Does it mean that the operation is automatically retried at some layer of the client, or does it simply mean that it's a transient failure that the caller _could_ retry, if desired?

> Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15475
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Minor
>              Labels: consumer-threading-refactor, kip-848-preview
>             Fix For: 3.8.0
>
> If the request times out in the background thread, it will be completed with TimeoutException, which is Retriable. In the TopicMetadataRequestManager and possibly other managers, the request might continue to be retried forever.
>
> There are two ways to fix this:
> # Pass a timer to the manager to remove the in-flight requests when it is expired.
> # Pass the future to the application layer and continue to retry.
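For reference, a hedged sketch of the caller-driven reading of "retriable": `org.apache.kafka.common.errors.RetriableException` is a real marker type signaling a transient failure, but whether any layer retries automatically depends on the component, which is exactly the ambiguity raised above. A caller that wants its own bound can loop like this (helper names are illustrative):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.RetriableException;

public class RetrySketch {
    interface Action { void run() throws Exception; }

    // Retries only on RetriableException, up to maxAttempts, with fixed backoff.
    static void runWithRetries(Action action, int maxAttempts, Duration backoff) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                action.run();
                return;
            } catch (RetriableException e) {
                if (attempt >= maxAttempts) throw e; // bounded: give up eventually
                TimeUnit.MILLISECONDS.sleep(backoff.toMillis());
            }
        }
    }
}
```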
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446756748

## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
@@ -27,21 +27,24 @@ public final class LocalReplicaChanges {
     private final Set deletes;
-    private final Map leaders;
+    private final Map electedLeaders;
+    private final Map updatedLeaders;

Review Comment:
   Sorry for the confusion. I've pushed the latest commit with all the renames.
Re: [PR] KAFKA-15181: Improvements for TopicBaseRemoteLogMetadataManager [kafka]
junrao commented on code in PR #14127:
URL: https://github.com/apache/kafka/pull/14127#discussion_r1446751658

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
@@ -64,302 +63,403 @@ class ConsumerTask implements Runnable, Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
-    private static final long POLL_INTERVAL_MS = 100L;
-
     private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
-    private final KafkaConsumer consumer;
-    private final String metadataTopicName;
+    private final Consumer consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    // The timeout for the consumer to poll records from the remote log metadata topic.
+    private final long pollTimeoutMs;
     private final Time time;

-    // It indicates whether the closing process has been started or not. If it is set as true,
-    // consumer will stop consuming messages, and it will not allow partition assignments to be updated.
-    private volatile boolean closing = false;
-
-    // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
-    // determined that the consumer needs to be assigned with the updated partitions.
-    private volatile boolean assignPartitions = false;
+    // It indicates whether the ConsumerTask is closed or not.
+    private volatile boolean isClosed = false;
+    // It indicates whether the user topic partition assignment to the consumer has changed or not. If the assignment
+    // has changed, the consumer will eventually start tracking the newly assigned partitions and stop tracking the
+    // ones it is no longer assigned to.
+    // The initial value is set to true to wait for partition assignment on the first execution; otherwise thread will
+    // be busy without actually doing anything
+    private volatile boolean hasAssignmentChanged = true;

     // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();

     // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set assignedMetaPartitions = Collections.emptySet();
+    private volatile Set assignedMetadataPartitions = Collections.emptySet();

     // User topic partitions that this broker is a leader/follower for.
-    private Set assignedTopicPartitions = Collections.emptySet();
+    private volatile Map assignedUserTopicIdPartitions = Collections.emptyMap();
+    private volatile Set processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();

-    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
-    // may or may not have been processed based on the assigned topic partitions.
-    private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+    private long uninitializedAt;
+    private boolean isAllUserTopicPartitionsInitialized;

-    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
-    private Map lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
+    private final Map readOffsetsByUserTopicPartition = new HashMap<>();

-    private final long committedOffsetSyncIntervalMs;
-    private CommittedOffsetsFile committedOffsetsFile;

Review Comment:
   @abhijeetk88 @satishd: Sorry to chime in late on this. It seems that we removed the usage of `CommittedOffsetsFile`.
   1. Does the consumer always start from the beginning offset after restart?
   2. Should we remove the CommittedOffsetsFile class?
   3. This actually changes the on-disk layout. Was this discussed in a KIP?
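To make question 1 concrete, this is roughly what a full replay looks like for a consumer that manages its own positions and has no persisted snapshot to resume from. It uses only standard KafkaConsumer API; the bootstrap address and the partition chosen are illustrative, and whether the real ConsumerTask behaves this way is exactly what the question asks:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment, as a metadata consumer does: no group membership.
            TopicPartition tp = new TopicPartition("__remote_log_metadata", 0);
            consumer.assign(List.of(tp));
            // Without a locally persisted offset snapshot, the safe restart
            // point is the beginning of the partition: a full replay.
            consumer.seekToBeginning(List.of(tp));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            System.out.println("replayed " + records.count() + " records");
        }
    }
}
```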
[jira] [Resolved] (KAFKA-16094) BrokerRegistrationRequest.logDirs field must be ignorable
[ https://issues.apache.org/jira/browse/KAFKA-16094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-16094.
----------------------------------
    Fix Version/s: 3.7.0
       Resolution: Fixed

> BrokerRegistrationRequest.logDirs field must be ignorable
> ----------------------------------------------------------
>
>                 Key: KAFKA-16094
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16094
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.7.0
>            Reporter: Colin McCabe
>            Assignee: Colin McCabe
>            Priority: Blocker
>             Fix For: 3.7.0
>
> 3.7 brokers must be able to register with 3.6 and earlier controllers. So this means that the logDirs field must be ignorable (aka, not sent) if the highest BrokerRegistrationRequest version we can negotiate is older than v2.
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446743080

## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
@@ -27,21 +27,24 @@ public final class LocalReplicaChanges {
     private final Set deletes;
-    private final Map leaders;
+    private final Map electedLeaders;
+    private final Map updatedLeaders;

Review Comment:
   Looks like I missed some files though, so I will fix that.
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446742642

## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
@@ -27,21 +27,24 @@ public final class LocalReplicaChanges {
     private final Set deletes;
-    private final Map leaders;
+    private final Map electedLeaders;
+    private final Map updatedLeaders;

Review Comment:
   I've updated the PR to use the names you suggested earlier today :)
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
artemlivshits commented on code in PR #15139:
URL: https://github.com/apache/kafka/pull/15139#discussion_r1446724899

## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
@@ -27,21 +27,24 @@ public final class LocalReplicaChanges {
     private final Set deletes;
-    private final Map leaders;
+    private final Map electedLeaders;
+    private final Map updatedLeaders;

Review Comment:
   We can add comments to the fields as well. E.g. the fact that `electedLeaders` is a strict subset of `updatedLeaders` (or whatever name we end up choosing for it) is something that I think is instrumental in understanding why we have 2 sets of leaders.

   > I really struggled with the name "leaders"

   That's why I said comments ... Btw, I'm not sure that updatedLeaders provides a better hint on what's going on here. Only electedLeaders really has the leaders that are updated (newly elected); updatedLeaders contains all partitions that had changes somewhere (followers, etc.) that this broker is a leader for. Same thing for the followers: all changes that this broker is a follower for, not necessarily related to this broker per se.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446722719

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }

+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+          val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if (!image.cluster().brokers.containsKey(partition.leader)) {

Review Comment:
   Good question; actually I am not super sure about these errors. They may have a history, as they are included in the getPartitionMetadata method. @mumrah Do you have an idea?
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446639923

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }

+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+          val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if (!image.cluster().brokers.containsKey(partition.leader)) {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available")
+                Errors.LEADER_NOT_AVAILABLE
+              } else {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: listener $listenerName " +
+                  s"not found on leader ${partition.leader}")
+                Errors.LISTENER_NOT_FOUND
+              }
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+            case Some(leader) =>
+              val error = if (filteredReplicas.size < partition.replicas.length) {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: replica information not available for " +
+                  s"following brokers ${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE

Review Comment:
   Not quite sure why this is an error. We should just describe the state -- assigned replicas, active replicas, replicas in ISR, replicas in ELR, etc.

## core/src/main/scala/kafka/server/KafkaApis.scala:
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
     ))
   }

+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+      case _ =>
+    }
+    val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+    var topics = scala.collection.mutable.Set[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name()))
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   Looks like we copy topics multiple times. I think we can (a) filter out topics that are below the cursor before copying, (b) use a sorted set so that we can build the desired data structure once.

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }

+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false)
+          val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+          val maybeLeader = g
Re: [PR] KAFKA-16094: BrokerRegistrationRequest.logDirs field must be ignorable [kafka]
cmccabe merged PR #15153: URL: https://github.com/apache/kafka/pull/15153 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446688518 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Yes -- that part has not changed. We "wrap" any callback from the transaction coordinator to the request handler thread. Right now though, wrap only schedules to the request thread if we are already on a request thread. Otherwise we execute directly. If we start verification from a non-request handler thread, maybe this already works as you intend. Alternatively, I could pass in a parameter to optionally wrap the callback (send it to the request thread) or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
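The "wrap" behaviour described above can be illustrated with a generic, hypothetical Java sketch (this is not Kafka's actual API; the ThreadLocal-based thread detection is an assumption for illustration only): the callback is re-dispatched to the request-handler pool only when the caller is already on a request-handler thread, and runs inline otherwise.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class CallbackWrap {
    // Hypothetical marker: true only on threads belonging to the request-handler pool.
    private static final ThreadLocal<Boolean> ON_REQUEST_THREAD = ThreadLocal.withInitial(() -> false);
    private static final ExecutorService REQUEST_HANDLER_POOL = Executors.newFixedThreadPool(
        2, runnable -> new Thread(() -> { ON_REQUEST_THREAD.set(true); runnable.run(); }));

    static Runnable wrap(Runnable callback) {
        if (ON_REQUEST_THREAD.get()) {
            return () -> REQUEST_HANDLER_POOL.execute(callback); // defer back to the pool
        }
        return callback; // not on a request thread: execute directly when invoked
    }

    public static void main(String[] args) {
        wrap(() -> System.out.println("ran on " + Thread.currentThread().getName())).run();
        REQUEST_HANDLER_POOL.shutdown();
    }
}
{code}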
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
OmniaGM commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1446673095 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: I feel the same but also not sure where they should move. They don't fit in server module either. I don't see any Jiras to move transaction coordinator out of server but maybe I can start a new module for transaction coordinator similar to the group one. Would this make more sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
dajac commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1446669238 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: Hey @OmniaGM. It is a bit weird to have those transaction classes in the group-coordinator module. It does not seem to be the correct place. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446663978 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: btw, when the validation completed, we schedule an event on the coordinator’s thread pool so we don’t really need to execute it on the request handler thread. with your refactor, do you plan to keep the re-scheduling within the replica manager? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
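The hand-off dajac describes can be expressed as a small, self-contained Java sketch (names are hypothetical, not the actual coordinator API): the verification callback completes the future on whatever thread invoked it, and the coordinator consumes the result on its own pool, so the completing thread matters little.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class VerificationHandOff {
    public static void main(String[] args) {
        ExecutorService coordinatorPool = Executors.newSingleThreadExecutor();
        CompletableFuture<String> guardFuture = new CompletableFuture<>();

        // The continuation is re-scheduled onto the coordinator's own thread pool.
        CompletableFuture<String> done = guardFuture.whenCompleteAsync((guard, error) ->
            System.out.println("consumed on " + Thread.currentThread().getName()), coordinatorPool);

        // Stand-in for the replica manager invoking the callback on an internal thread:
        new Thread(() -> guardFuture.complete("verification-guard"), "callback-thread").start();

        done.join(); // wait until the continuation has run on the coordinator pool
        coordinatorPool.shutdown();
    }
}
{code}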
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446657520 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Thanks! I will try to run through it today too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446654869 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: ah, right. i forgot about that one. let me check this tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry
[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15946. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: Lianet Magrans (was: Kirk True) Resolution: Fixed 3.7 includes a fix to make sure that only sync commits are retried, with a timeout, and async commits are not (just passing failure to the callback). There is also a follow-up ticket https://issues.apache.org/jira/browse/KAFKA-16033 > AsyncKafkaConsumer should retry commits on the application thread instead of > auto-retry > --- > > Key: KAFKA-15946 > URL: https://issues.apache.org/jira/browse/KAFKA-15946 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.7.0 > > > The original design was that the network thread always completes the future > whether it succeeds or fails. However, in the current patch, I mis-added > auto-retry functionality because commitSync wasn't retrying. What we should > be doing is, the commit sync API should catch the RetriableExceptions and > resend another commit until it times out. > > {code:java} > if (error.exception() instanceof RetriableException) { > log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, > error.message()); > handleRetriableError(error, response); > retry(responseTime); <--- We probably shouldn't do this. > return; > } {code} > > {code:java} > @Override > public void commitSync(Map offsets, > Duration timeout) { > acquireAndEnsureOpen(); > long commitStart = time.nanoseconds(); > try > { CompletableFuture commitFuture = commit(offsets, true); <-- we > probably should retry here ConsumerUtils.getResult(commitFuture, > time.timer(timeout)); } > finally > { wakeupTrigger.clearTask(); > kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); > release(); } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
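A self-contained Java sketch of the behaviour the resolution describes (all names are hypothetical stand-ins, not the consumer's real internals): the application thread retries a sync commit on retriable failures until its timeout expires, while an async commit would simply pass the failure to its callback.

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class SyncCommitRetrySketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException { }

    // Retries the commit on the calling (application) thread until success or timeout.
    static void commitSync(Supplier<CompletableFuture<Void>> sendCommit, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remaining = deadlineNanos - System.nanoTime();
            if (remaining <= 0) throw new TimeoutException("commitSync timed out");
            try {
                sendCommit.get().get(remaining, TimeUnit.NANOSECONDS);
                return; // committed
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof RetriableException)) throw e;
                // retriable failure: fall through and send a fresh commit while time remains
            }
        }
    }

    public static void main(String[] args) throws Exception {
        commitSync(() -> CompletableFuture.completedFuture(null), Duration.ofSeconds(5));
        System.out.println("committed");
    }
}
{code}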
[jira] [Resolved] (KAFKA-15967) Fix revocation in reconcilation logic
[ https://issues.apache.org/jira/browse/KAFKA-15967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15967. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: Lianet Magrans Resolution: Fixed > Fix revocation in reconcilation logic > - > > Key: KAFKA-15967 > URL: https://issues.apache.org/jira/browse/KAFKA-15967 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.7.0 > > > Looks like there is a problem in the reconciliation logic. > We are getting 6 partitions from an HB, we add them to > {{{}assignmentReadyToReconcile{}}}. Next HB we get only 4 partitions (2 are > revoked), we also add them to {{{}assignmentReadyToReconcile{}}}, but the 2 > partitions that were supposed to be removed from the assignment are never > removed because they are still in {{{}assignmentReadyToReconcile{}}}. > This was discovered during integration testing of > [https://github.com/apache/kafka/pull/14878] - part of the test > testRemoteAssignorRange was disabled and should be re-enabled once this is > fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
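The failure mode above is easy to reproduce in miniature. A hypothetical Java sketch (integer partition numbers stand in for TopicPartitions): accumulating successive target assignments with addAll() can only grow the set, so the two revoked partitions are never dropped; replacing the contents with the latest target reflects revocations.

{code:java}
import java.util.HashSet;
import java.util.Set;

public final class ReconcileTargetSketch {
    private final Set<Integer> assignmentReadyToReconcile = new HashSet<>();

    void onHeartbeatBuggy(Set<Integer> target) {
        assignmentReadyToReconcile.addAll(target); // revoked partitions linger forever
    }

    void onHeartbeatFixed(Set<Integer> target) {
        assignmentReadyToReconcile.clear();        // drop anything the broker revoked...
        assignmentReadyToReconcile.addAll(target); // ...by taking the latest target as-is
    }

    public static void main(String[] args) {
        ReconcileTargetSketch r = new ReconcileTargetSketch();
        r.onHeartbeatBuggy(Set.of(0, 1, 2, 3, 4, 5)); // first HB assigns 6 partitions
        r.onHeartbeatBuggy(Set.of(0, 1, 2, 3));       // second HB revokes partitions 4 and 5
        System.out.println(r.assignmentReadyToReconcile.size()); // prints 6, not 4
    }
}
{code}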
[jira] [Updated] (KAFKA-15553) Review consumer positions update
[ https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15553: --- Summary: Review consumer positions update (was: Review consumer positions update using committed offset) > Review consumer positions update > > > Key: KAFKA-15553 > URL: https://issues.apache.org/jira/browse/KAFKA-15553 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > From the existing comment: If there are any partitions which do not have a > valid position and are not awaiting reset, then we need to fetch committed > offsets. > In the async consumer: I wonder if it would make sense to refresh the > position on the event loop continuously. > The logic to refresh offsets in the poll loop is quite fragile and works > largely by side-effects of the code that it calls. For example, the behaviour > of the "cached" value is really not that straightforward and simply reading > the cached value is not sufficient to start consuming data in all cases. > This area needs a bit of a refactor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15553) Review consumer positions update using committed offset
[ https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15553: --- Summary: Review consumer positions update using committed offset (was: Review committed offset refresh logic) > Review consumer positions update using committed offset > --- > > Key: KAFKA-15553 > URL: https://issues.apache.org/jira/browse/KAFKA-15553 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > From the existing comment: If there are any partitions which do not have a > valid position and are not awaiting reset, then we need to fetch committed > offsets. > In the async consumer: I wonder if it would make sense to refresh the > position on the event loop continuously. > The logic to refresh offsets in the poll loop is quite fragile and works > largely by side-effects of the code that it calls. For example, the behaviour > of the "cached" value is really not that straightforward and simply reading > the cached value is not sufficient to start consuming data in all cases. > This area needs a bit of a refactor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16004) Review new consumer inflight offset commit logic
[ https://issues.apache.org/jira/browse/KAFKA-16004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16004: --- Description: New consumer logic for committing offsets handles inflight requests, to validate that no commit requests are sent if a previous one hasn't received a response. Review how that logic is currently applied to both, sync and async commits and validate against the legacy coordinator, who seems to apply it only for async commits. Review considering behaviour for auto-commits too. (was: New consumer logic for committing offsets handles inflight requests, to validate that no commit requests are sent if a previous one hasn't received a response. Review how that logic is currently applied to both, sync and async commits and validate against the legacy coordinator, who seems to apply it only for async commits.) > Review new consumer inflight offset commit logic > > > Key: KAFKA-16004 > URL: https://issues.apache.org/jira/browse/KAFKA-16004 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer logic for committing offsets handles inflight requests, to > validate that no commit requests are sent if a previous one hasn't received a > response. Review how that logic is currently applied to both, sync and async > commits and validate against the legacy coordinator, who seems to apply it > only for async commits. Review considering behaviour for auto-commits too. -- This message was sent by Atlassian Jira (v8.20.10#820010)
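A hypothetical Java sketch of the inflight guard the ticket describes (not the actual commit-request manager code): a new OffsetCommit is sent only when no previous one is still in flight. Whether this should gate sync commits, async commits, or auto-commits alike is exactly what the ticket asks to review.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public final class InflightCommitGuard {
    private final AtomicBoolean inflight = new AtomicBoolean(false);

    // Sends a commit only if none is in flight; otherwise reports the attempt as skipped.
    CompletableFuture<Void> maybeSendCommit(Supplier<CompletableFuture<Void>> sendCommit) {
        if (!inflight.compareAndSet(false, true)) {
            CompletableFuture<Void> skipped = new CompletableFuture<>();
            skipped.completeExceptionally(new IllegalStateException("commit already in flight"));
            return skipped;
        }
        return sendCommit.get().whenComplete((v, e) -> inflight.set(false));
    }

    public static void main(String[] args) {
        InflightCommitGuard guard = new InflightCommitGuard();
        CompletableFuture<Void> pending = new CompletableFuture<>(); // first request, unanswered
        guard.maybeSendCommit(() -> pending);
        guard.maybeSendCommit(() -> CompletableFuture.completedFuture(null))
             .whenComplete((v, e) -> System.out.println("second attempt: " + e)); // skipped
    }
}
{code}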
Re: [PR] Metadata schema checker [kafka]
mannoopj commented on code in PR #14389: URL: https://github.com/apache/kafka/pull/14389#discussion_r1446640522 ## tools/src/main/java/org/apache/kafka/tools/SchemaChecker/MetadataSchemaChecker.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools.SchemaChecker; + + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.eclipse.jgit.api.CheckoutCommand; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.InitCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.internal.storage.file.FileRepository; +import org.eclipse.jgit.lib.*; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.storage.file.FileRepositoryBuilder; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.eclipse.jgit.treewalk.filter.PathFilter; + + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; + +public class MetadataSchemaChecker { + +static int latestTag = -1; +static int latestTagVersion = -1; +static int oldLatestVersion = -1; +static int oldFirstVersion = -1; +static int newLatestVersion = -1; +static int newFirstVersion = -1; + +static String[] filesCheckMetadata = {"AccessControlEntryRecord.json", "BrokerRegistrationChangeRecord.json", "ClientQuotaRecord.json", +"ConfigRecord.json", "DelegationTokenRecord.json", "FeatureLevelRecord.json", "FenceBrokerRecord.json", "NoOpRecord.json", +"PartitionChangeRecord.json", "PartitionRecord.json", "ProducerIdsRecord.json", "RegisterBrokerRecord.json", +"RemoveAccessControlEntryRecord.json", "RemoveTopicRecord.json", "RemoveUserScramCredentialRecord.json", "TopicRecord.json", +"UnfenceBrokerRecord.json", "UnregisterBrokerRecord.json", "UserScramCredentialRecord.json", "ZkMigrationRecord.json"}; +public static void main(String[] args) throws Exception { + +try { +List localContent = new ArrayList<>(); +for(String jsonSchema: filesCheckMetadata) { +final String dir = System.getProperty("user.dir"); +String path = dir + "/metadata/src/main/resources/common/metadata/" + jsonSchema; +BufferedReader reader = new BufferedReader(new FileReader(path)); +for (int i = 0; i < 15; i++) { Review Comment: Addressed in a similar comment below -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15872) Investigate autocommit retry logic
[ https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15872. Fix Version/s: (was: 3.8.0) Resolution: Duplicate > Investigate autocommit retry logic > -- > > Key: KAFKA-15872 > URL: https://issues.apache.org/jira/browse/KAFKA-15872 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > This is purely an investigation ticket. > Currently, we send an autocommit only if there isn't an inflight one; > however, this logic might not be correct because I think we should: > # expires the request if it is not completed in time > # always send an autocommit on the clock -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446637026 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: thanks for the clarification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15455) Add support for OffsetCommit version 9 in consumer
[ https://issues.apache.org/jira/browse/KAFKA-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15455. Fix Version/s: 3.7.0 (was: 3.8.0) Resolution: Fixed > Add support for OffsetCommit version 9 in consumer > -- > > Key: KAFKA-15455 > URL: https://issues.apache.org/jira/browse/KAFKA-15455 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > We need to handle the new error codes as specified here: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46|https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L35] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446625255 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: Thanks for looking at PR Philip. Never for Java client as we support all compression types in java client. Below is what KIP says ` The broker will return a prioritized list of supported compression types in the GetTelemetrySubscriptionsResponse.AcceptedCompressionTypes array, the client is free to pick any supported compression type but should pick the first mutually supported type in the returned list. If the AcceptedCompressionTypes array is empty the client must send metrics uncompressed. The default compression types list as returned from the broker should be: ZStd, LZ4, GZip, Snappy.` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
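Read literally, the KIP text quoted above maps to something like this hedged Java sketch (the enum is a stand-in for Kafka's CompressionType, and the "supported" set is whatever the client can actually encode): take the first mutually supported entry in broker-preference order, and fall back to no compression when the list is empty or nothing matches.

{code:java}
import java.util.List;
import java.util.Set;

public final class PreferredCompression {
    enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD } // stand-in for Kafka's enum

    static CompressionType preferred(List<CompressionType> accepted, Set<CompressionType> supported) {
        if (accepted == null || accepted.isEmpty()) {
            return CompressionType.NONE; // empty list: metrics must go uncompressed
        }
        return accepted.stream()
                .filter(supported::contains) // broker order encodes broker preference
                .findFirst()
                .orElse(CompressionType.NONE);
    }

    public static void main(String[] args) {
        System.out.println(preferred(
            List.of(CompressionType.ZSTD, CompressionType.LZ4, CompressionType.GZIP, CompressionType.SNAPPY),
            Set.of(CompressionType.LZ4, CompressionType.GZIP))); // LZ4: first mutual match
    }
}
{code}

Since the Java client supports all of the default types, as noted in the comment above, taking the element at index 0 is equivalent for it.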
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
philipnee commented on code in PR #15148: URL: https://github.com/apache/kafka/pull/15148#discussion_r1446620286 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java: ## @@ -175,16 +182,40 @@ public static boolean validateRequiredResourceLabels(Map metadat } public static CompressionType preferredCompressionType(List acceptedCompressionTypes) { -// TODO: Support compression in client telemetry. +if (acceptedCompressionTypes != null && !acceptedCompressionTypes.isEmpty()) { +// Broker is providing the compression types in order of preference. Grab the +// first one. +return acceptedCompressionTypes.get(0); Review Comment: out of curiousity - when would we not use the first one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15807: Added support for compression of metrics (KIP-714) [kafka]
apoorvmittal10 commented on PR #15148: URL: https://github.com/apache/kafka/pull/15148#issuecomment-1883799949 Build passed on all environments with unrelated tests failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804880#comment-17804880 ] Proven Provenzano commented on KAFKA-16082: --- [~gnarula] added an improvement for the handling of case 3 above: https://github.com/apache/kafka/pull/15136 > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804879#comment-17804879 ] Proven Provenzano edited comment on KAFKA-16082 at 1/9/24 8:49 PM: --- For the case of 3: If I understand this correctly, the scenario is that the broker restarts and sees that `dir2` is supposed to own `tp0` from the metadata log replay, however it doesn't see the log in `dir2` because the failed future replica hasn't been renamed and so it will create a new replica for `tp0` in `dir2` and populate it with data from other replicas. Can we create a unit test to validate this? It may also be possible to reuse the current future replica so long as the broker at restart went through a stage where the leader of the partition was moved to a different broker. Now it can treat the partition as an out of sync replica and do the rename and catch up immediately. Note it cannot do the rename until after the partition leadership has been moved away from the broker in case the broker again restarts. was (Author: JIRAUSER298332): For the case of 3: If I understand this correctly, the scenario is that the broker restarts and sees that `dir2` is supposed to own `tp0` from the metadata log replay, however it doesn't see the log in `dir2` because the failed future replica hasn't been renamed and so it will create a new replica for `tp0` in `dir2` and populate it with data from other replicas. Can we create a unit test to validate this? It may also be possible to reuse the current future replica so long as the broker at restart went through a stage where the leader of the partition was moved to a different broker. Now it can treat the partition as an out of sync replica and do the rename and catch up immediately. Note it cannot do the rename until after the partition leadership has been moved away from the broker in case the broker again restarts. {quote} {quote} > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Add reviewers GitHub action [kafka]
mumrah commented on PR #15115: URL: https://github.com/apache/kafka/pull/15115#issuecomment-1883771689 > Although that way we won't be able to merge it via browser That's a non-starter IMO. I wonder if we could write our own bot for these kinds of automations. https://probot.github.io/ looks interesting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804879#comment-17804879 ] Proven Provenzano commented on KAFKA-16082: --- For the case of 3: If I understand this correctly, the scenario is that the broker restarts and sees that `dir2` is supposed to own `tp0` from the metadata log replay, however it doesn't see the log in `dir2` because the failed future replica hasn't been renamed and so it will create a new replica for `tp0` in `dir2` and populate it with data from other replicas. Can we create a unit test to validate this? It may also be possible to reuse the current future replica so long as the broker at restart went through a stage where the leader of the partition was moved to a different broker. Now it can treat the partition as an out of sync replica and do the rename and catch up immediately. Note it cannot do the rename until after the partition leadership has been moved away from the broker in case the broker again restarts. {quote} {quote} > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Blocker > Fix For: 3.7.0 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15853: Move AuthorizerUtils and its dependencies to server module [kafka]
OmniaGM opened a new pull request, #15167: URL: https://github.com/apache/kafka/pull/15167 Blocker for #15103 - Moving AuthorizerUtils and Session into server module ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
mimaison commented on code in PR #14595: URL: https://github.com/apache/kafka/pull/14595#discussion_r1446558104 ## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala: ## @@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString) overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString) + def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly distributed in kraft mode, see KAFKA-15354 Review Comment: Do we need to fix KAFKA-15354 first? Or can this ticket be resolved? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446538966 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java: ## @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + +/** + * Verify the transaction. + * + * @param tpThe partition to write records to. + * @param transactionalId The transactional id. + * @param producerIdThe producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. Review Comment: There is one case where I think we still throw an error even if the partition is already verified. When we look up the partition to see if it needs verification, we could throw an error if the partition isn't on the broker. Not a huge deal though and we probably don't need to include. But an alternate description could be something like it returns any error encountered or the verification guard if it needed verification and the sentinel if it did not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1446536290 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Right right, I remember this now. I'm wondering though is that ok with the async verification callback. Is it the case we don't evaluate which one to use until we execute the callback. As an aside, did we decide that the callback is ok to execute on the request handler thread? I didn't go through and trace the threads being used yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16106) group size counters do not reflect the actual sizes when operations fail
Jeff Kim created KAFKA-16106: Summary: group size counters do not reflect the actual sizes when operations fail Key: KAFKA-16106 URL: https://issues.apache.org/jira/browse/KAFKA-16106 Project: Kafka Issue Type: Sub-task Reporter: Jeff Kim Assignee: Jeff Kim An expire-group-metadata operation generates tombstone records, updates the `groups` state and decrements group size counters, then performs a write to the log. If there is a __consumer_offsets partition reassignment, this operation fails. The `groups` state is reverted to an earlier snapshot but classic group size counters are not. This introduces an inconsistency between the metrics and the actual group sizes. This applies to all unsuccessful write operations that alter the `groups` state. The issue is exacerbated because the expire group metadata operation is retried possibly indefinitely. The solution to this is to make the counters also a timeline data structure (TimelineLong) so that in the event of a failed write operation we revert the counters as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
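A minimal sketch of the proposed fix, assuming the org.apache.kafka.timeline classes from Kafka's server-common module on the classpath (the snapshot epoch and counter name are illustrative): when the counter shares its SnapshotRegistry with the `groups` state, reverting a failed write restores both together.

{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;

public final class GroupSizeCounterSketch {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        TimelineLong classicGroupSize = new TimelineLong(registry);

        classicGroupSize.increment();       // a classic group is created
        registry.getOrCreateSnapshot(10L);  // snapshot taken before the expire operation

        classicGroupSize.decrement();       // expiration decrements the counter...
        registry.revertToSnapshot(10L);     // ...the write fails, so everything reverts

        System.out.println(classicGroupSize.get()); // 1: counter agrees with `groups` again
    }
}
{code}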
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Fix Version/s: 3.8.0 > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Priority: Major > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16100) Consistent handling of timeouts and responses for new consumer ApplicationEvents
[ https://issues.apache.org/jira/browse/KAFKA-16100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16100: -- Labels: consumer-threading-refactor (was: ) > Consistent handling of timeouts and responses for new consumer > ApplicationEvents > > > Key: KAFKA-16100 > URL: https://issues.apache.org/jira/browse/KAFKA-16100 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The handling of timeouts and responses for the various kinds of > ApplicationEvents in the new consumer is not consistent. A small amount of > refactoring would make the code more maintainable and give consistent > behaviour for the different requests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16104: -- Fix Version/s: 3.8.0 > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Priority: Minor > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16104) Enable additional PlaintextConsumerTest tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16104: -- Labels: consumer-threading-refactor (was: ) > Enable additional PlaintextConsumerTest tests for new consumer > -- > > Key: KAFKA-16104 > URL: https://issues.apache.org/jira/browse/KAFKA-16104 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Andrew Schofield >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > It should be possible to enable: > * testAutoCommitOnClose > * testAutoCommitOnCloseAfterWakeup > * testExpandingTopicSubscriptions > * testShrinkingTopicSubscriptions > * testMultiConsumerSessionTimeoutOnClose (KAFKA-16011 has been fixed) > * testAutoCommitOnRebalance > * testPerPartitionLeadMetricsCleanUpWithSubscribe > * testPerPartitionLagMetricsCleanUpWithSubscribe > * testStaticConsumerDetectsNewPartitionCreatedAfterRestart -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16083: Exclude throttle time when expiring inflight requests on a connection [kafka]
jolshan commented on PR #15130: URL: https://github.com/apache/kafka/pull/15130#issuecomment-1883672884 ^ Those are issues I see frequently and are likely unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15557) Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in assignFromUserNoId
[ https://issues.apache.org/jira/browse/KAFKA-15557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15557: -- Fix Version/s: 4.0.0 (was: 3.8.0) > Investigate FetcherTest's/FetchRequestManager's duplicate metadata update in > assignFromUserNoId > --- > > Key: KAFKA-15557 > URL: https://issues.apache.org/jira/browse/KAFKA-15557 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > The unit tests {{FetcherTest}} and {{FetchRequestManagerTest}} have methods > named {{assignFromUser()}} and {{assignFromUserNoId()}} that appear to > perform duplicate metadata updates: > {code:java} > private void assignFromUser(Set partitions) { > subscriptions.assignFromUser(partitions); > client.updateMetadata(initialUpdateResponse); > // A dummy metadata update to ensure valid leader epoch. > metadata.updateWithCurrentRequestVersion( > RequestTestUtils.metadataUpdateWithIds( > "dummy", > 1, > Collections.emptyMap(), > singletonMap(topicName, 4), > tp -> validLeaderEpoch, topicIds > ), > false, > 0L > ); > } > {code} > {{client.updateMetadata()}} eventually calls > {{metadata.updateWithCurrentRequestVersion()}}. Determine why the test is > updating the cluster metadata twice with different values. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804861#comment-17804861 ] Kirk True commented on KAFKA-15250: --- This is still an issue. If you enable detailed logging, it writes thousands of lines of logging within seconds. > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
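A generic, hypothetical Java sketch of the kind of fix being discussed (not the consumer's actual code): block the background loop on its event queue for as long as the next scheduled deadline allows, instead of polling with a zero timeout, which is what produces the tight loop and the log flood.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BackgroundLoopSketch implements Runnable {
    private final BlockingQueue<Runnable> events = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                // Blocks until an event arrives or the next deadline is due: no spinning.
                Runnable event = events.poll(timeUntilNextDeadlineMs(), TimeUnit.MILLISECONDS);
                if (event != null) event.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            runExpiredTimers();
        }
    }

    private long timeUntilNextDeadlineMs() { return 100L; } // placeholder scheduling logic
    private void runExpiredTimers() { }                     // placeholder timer firing

    public static void main(String[] args) throws InterruptedException {
        BackgroundLoopSketch loop = new BackgroundLoopSketch();
        Thread t = new Thread(loop, "background-thread");
        t.start();
        loop.events.put(() -> System.out.println("handled one event"));
        Thread.sleep(200);
        loop.running = false;
        t.join();
    }
}
{code}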
[jira] [Reopened] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reopened KAFKA-15250: --- Assignee: Kirk True (was: Philip Nee) > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15250: -- Fix Version/s: 3.8.0 > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout pass to networkclientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-15941. Resolution: Cannot Reproduce > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: <true> but was: <false> > Stacktrace org.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: <true> but was: <false> at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy closed KAFKA-15941. -- Assignee: Lucas Brutschy > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Assignee: Lucas Brutschy >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804856#comment-17804856 ] Lucas Brutschy commented on KAFKA-15941: Test hasn't failed in the last month so I'm closing this https://ge.apache.org/scans/tests?search.names=Git%20branch&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&search.values=trunk&tests.container=org.apache.kafka.streams.integration.RestoreIntegrationTest&tests.sortField=FLAKY&tests.test=shouldRestoreNullRecord() > Flaky test: shouldRestoreNullRecord() – > org.apache.kafka.streams.integration.RestoreIntegrationTest > --- > > Key: KAFKA-15941 > URL: https://issues.apache.org/jira/browse/KAFKA-15941 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/ > {code:java} > org.opentest4j.AssertionFailedError: Condition not met within timeout 6. > Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output > (got []) ==> expected: but was: > Stacktraceorg.opentest4j.AssertionFailedError: Condition not met > within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records > from topic output (got []) ==> expected: but was: at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) >at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790) > at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
C0urante commented on code in PR #15149: URL: https://github.com/apache/kafka/pull/15149#discussion_r1446506255 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java: ## @@ -226,20 +235,20 @@ protected final void initializeResources() { if (adminListeners == null) { log.info("Adding admin resources to main listener"); adminResourceConfig = resourceConfig; -Collection adminResources = adminResources(); -resources.addAll(adminResources); +Collection> adminResources = adminResources(); adminResources.forEach(adminResourceConfig::register); configureAdminResources(adminResourceConfig); } else if (adminListeners.size() > 0) { // TODO: we need to check if these listeners are same as 'listeners' // TODO: the following code assumes that they are different log.info("Adding admin resources to admin listener"); adminResourceConfig = new ResourceConfig(); +adminResourceConfig.register(requestTimeout.binder()); adminResourceConfig.register(new JacksonJsonProvider()); -Collection adminResources = adminResources(); -resources.addAll(adminResources); +Collection> adminResources = adminResources(); adminResources.forEach(adminResourceConfig::register); adminResourceConfig.register(ConnectExceptionMapper.class); + adminResourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true); Review Comment: Good call, done. I've also cleaned up the configuration of the `adminResourceConfig` to hopefully prevent other kinds of duplication-related issues in the future; LMKWYT -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
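One plausible shape of the cleanup mentioned above (a sketch under assumed names such as `newBaseResourceConfig`, not the actual PR code): route the registrations shared by the main and admin listeners through a single factory, so the two `ResourceConfig` instances cannot drift apart or register the same feature twice.

```java
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;

// Sketch: one place for the registrations common to both listeners.
final class ResourceConfigFactorySketch {
    static ResourceConfig newBaseResourceConfig(Object requestTimeoutBinder,
                                                Object jsonProvider) {
        ResourceConfig config = new ResourceConfig();
        config.register(requestTimeoutBinder); // shared DI bindings
        config.register(jsonProvider);         // e.g. a Jackson JSON provider
        config.property(ServerProperties.WADL_FEATURE_DISABLE, true);
        return config;
    }
}
```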
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
mumrah commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1446495993 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { Review Comment: Can you move this code into a new class? KafkaApis is already much too large. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +var topics = scala.collection.mutable.Set[String]() +describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name())) + +val cursor = describeTopicPartitionsRequest.cursor() +val fetchAllTopics = topics.isEmpty +if (fetchAllTopics) { + metadataCache.getAllTopics().foreach(topic => topics.add(topic)) Review Comment: If we're paginating through all topics and have a cursor, we can gather only the desired topics during this O(n) loop through all topics. That would let us avoid another O(n) operation below to filter the undesired topics -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
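A sketch of the single-pass suggestion (written in Java for uniformity with the other sketches here, although the code under review is Scala; all names are illustrative): apply the cursor filter while walking the topic names, and bound the page size in the same pass, instead of collecting every topic and filtering afterwards.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: honor the cursor and the page limit in one O(n) walk.
final class TopicPageSketch {
    static List<String> page(Stream<String> allTopicNames, String cursorTopic, int limit) {
        return allTopicNames
                // Filtering before sorting keeps only candidates in memory.
                .filter(name -> cursorTopic == null || name.compareTo(cursorTopic) >= 0)
                .sorted()
                .limit(limit)
                .collect(Collectors.toList());
    }
}
```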
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
C0urante commented on code in PR #15149: URL: https://github.com/apache/kafka/pull/15149#discussion_r1446490147 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java: ## @@ -159,7 +163,8 @@ public class ConnectorsResourceTest { public void setUp() throws NoSuchMethodException { when(serverConfig.topicTrackingEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); -connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); +RestRequestTimeout requestTimeout = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; Review Comment: Good catch, thanks 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
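The lambda in that test compiles because the timeout type has a single abstract method. A minimal sketch of that shape (the constant below is an assumed value, and the real interface lives in `org.apache.kafka.connect.runtime.rest`):

```java
// Sketch of the single-abstract-method shape that makes the lambda legal.
@FunctionalInterface
interface RestRequestTimeoutSketch {
    long timeoutMs();
}

class RestRequestTimeoutDemo {
    static final long DEFAULT_TIMEOUT_MS = 90_000L; // illustrative default

    public static void main(String[] args) {
        RestRequestTimeoutSketch requestTimeout = () -> DEFAULT_TIMEOUT_MS;
        System.out.println(requestTimeout.timeoutMs()); // prints 90000
    }
}
```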
Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #15139: URL: https://github.com/apache/kafka/pull/15139#discussion_r1446488616 ## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java: ## @@ -27,21 +27,24 @@ public final class LocalReplicaChanges { private final Set deletes; -private final Map leaders; +private final Map electedLeaders; +private final Map updatedLeaders; Review Comment: I included these comments btw: ``` * 1. partitions for which the broker is not a replica anymore * 2. partitions for which the broker is now a leader (leader epoch bump on the leader) * 3. partitions for which the isr or replicas change if the broker is a leader (partition epoch bump on the leader) * 4. partitions for which the broker is now a follower or follower with isr or replica updates (partition epoch bump on follower) ``` I really struggled with the name "leaders" when I was reading the code, which is why I didn't want to leave it blank. But I will think on it again and update the comments to be even more clear. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
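To make the enumerated cases concrete, a sketch of how the two leader maps differ (types and field names are stand-ins, not the real `LocalReplicaChanges`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the distinction the review comment enumerates, as two maps.
final class ReplicaChangesSketch {
    static final class PartitionInfoLike { } // stand-in for the real info type

    // Case 2: this broker just became leader (the leader epoch was bumped).
    final Map<String, PartitionInfoLike> electedLeaders = new HashMap<>();

    // Case 3: this broker was already leader, but the ISR or replica set
    // changed (only the partition epoch was bumped).
    final Map<String, PartitionInfoLike> updatedLeaders = new HashMap<>();
}
```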
Re: [PR] KAFKA-14683 Migrate #testStartPaused to Mockito [kafka]
gharris1727 commented on PR #14663: URL: https://github.com/apache/kafka/pull/14663#issuecomment-1883616431 Hi @hgeraldino, thanks for taking on the migration! I understand the idea behind your refactor-then-deduplicate strategy, but I think the excessive duplication is making it difficult (at least for me) to review the change. What do you think about starting a new test class, and moving the migrated tests into that new class? This would allow you to use synonymous variable names, annotation mocks, and method names. At the end we can delete the original class and move the new class back to the original class name. This will separate the added and removed parts in the diff, where they are currently inline. But the mocking libraries are so substantially different that the inline parts of the diff are not very helpful anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
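A sketch of what the suggested fresh test class could look like (names are illustrative, and `mockito-junit-jupiter` on the classpath is assumed); the point is the annotation-driven mocks that are hard to introduce cleanly in a side-by-side migration:

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

// Hypothetical new class; migrated tests accumulate here, then the class is
// renamed back once the original is deleted.
@ExtendWith(MockitoExtension.class)
class WorkerTaskMockitoTest {

    interface TaskLike { // stand-in for the mocked collaborator
        boolean startPaused();
    }

    @Mock
    private TaskLike task; // created by MockitoExtension, no manual setup

    @Test
    void shouldStartPaused() {
        when(task.startPaused()).thenReturn(true);
        assertTrue(task.startPaused());
    }
}
```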
[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804846#comment-17804846 ] Lianet Magrans commented on KAFKA-15475: Heads up: the TopicMetadataManager and CommitRequestManager already solved this in a similar way. It still needs to be verified/fixed in other requests, if applicable. > Timeout request might retry forever even if the user API times out in > PrototypeAsyncConsumer > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If the request times out in the background thread, it will be completed with > TimeoutException, which is Retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when they expire. > # Pass the future to the application layer and continue to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
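A sketch of the first proposed fix (all names are illustrative): attach the caller's deadline to the inflight request, so a retriable failure is only re-enqueued while time remains, and expired requests can be swept from the manager.

```java
// Sketch: a request that carries its caller's deadline into the background thread.
final class TimedRequestSketch {
    private final long deadlineMs;

    TimedRequestSketch(long nowMs, long timeoutMs) {
        this.deadlineMs = nowMs + timeoutMs;
    }

    // On a retriable error, retry only while the caller's timeout holds.
    boolean shouldRetry(long nowMs) {
        return nowMs < deadlineMs;
    }

    // The manager can sweep expired requests out of its inflight set.
    boolean expired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}
```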
[jira] [Assigned] (KAFKA-15588) Purge the unsent offset commits/fetches when the member is fenced/failed
[ https://issues.apache.org/jira/browse/KAFKA-15588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-15588: -- Assignee: Lianet Magrans (was: Philip Nee) > Purge the unsent offset commits/fetches when the member is fenced/failed > > > Key: KAFKA-15588 > URL: https://issues.apache.org/jira/browse/KAFKA-15588 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When the member is fenced/failed, we should purge the inflight offset commits > and fetches. HeartbeatRequestManager should be able to handle this -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16099) Handle timeouts for AsyncKafkaConsumer.commitSync
[ https://issues.apache.org/jira/browse/KAFKA-16099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16099. Fix Version/s: 3.7.0 Resolution: Fixed > Handle timeouts for AsyncKafkaConsumer.commitSync > - > > Key: KAFKA-16099 > URL: https://issues.apache.org/jira/browse/KAFKA-16099 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Priority: Major > Fix For: 3.7.0 > > > The handling of synchronous offset commits in the background thread does not > observe the caller's timeout. In the situation that a commit request needs to > be retried, the retries should not extend beyond the caller's timeout. The > CommitApplicationEvent should contain the timeout and not continue beyond > that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
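The fix described here follows the same deadline pattern; a sketch (field and type names are assumptions, not the real event class) of an application event that carries the caller's timeout across the thread boundary:

```java
import java.util.Map;

// Sketch: the event hands the caller's deadline to the background thread,
// which checks it before every retry of the commit request.
final class CommitEventSketch {
    final Map<String, Long> offsets; // stand-in for TopicPartition -> offset
    final long deadlineMs;

    CommitEventSketch(Map<String, Long> offsets, long nowMs, long timeoutMs) {
        this.offsets = offsets;
        this.deadlineMs = nowMs + timeoutMs;
    }

    boolean mayRetry(long nowMs) {
        return nowMs < deadlineMs; // never retry past the caller's timeout
    }
}
```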
[PR] KAFKA-15853: Move ProcessRole to server module [kafka]
OmniaGM opened a new pull request, #15166: URL: https://github.com/apache/kafka/pull/15166 prepare to move KafkaConfig ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
[ https://issues.apache.org/jira/browse/KAFKA-16105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anatolii Popov updated KAFKA-16105: --- Description: When partition reassignment is happening for a tiered topic, in most cases it gets stuck with RemoteStorageExceptions on follower nodes saying that they cannot construct the remote log auxiliary state: {code:java} [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread) org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) {code} Scenario: A cluster of 3 nodes with a single topic with 30 partitions. All partitions have tiered segments. Adding 3 more nodes to the cluster and making a reassignment to move all the data to new nodes. Behavior: For most of the partitions, reassignment happens smoothly. For some of the partitions, when a new node starts to get assignments, it reads the __remote_log_metadata topic and tries to initialize the metadata cache on records with COPY_SEGMENT_STARTED. If it reads such a message for a partition before that partition was assigned to this specific node, it ignores the message, skipping the cache initialization, and marks the partition as assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly processed. Expected behavior: The partitions should not be marked as assigned before the cache is initialized, to be able to re-read the COPY_SEGMENT_STARTED message and initialize the cache.
Some notes: This is most probably happening when there are messages in a single metadata partition and the order of the messages does not correspond to the order of assignment. So the follower reads the COPY_SEGMENT_STARTED message, sees that the user partition is not yet assigned to this node, skips the message, and marks the user partition as assigned. On the next iteration, it resets to beginning ONLY
[jira] [Created] (KAFKA-16105) Reassignment of tiered topics is failing due to RemoteStorageException
Anatolii Popov created KAFKA-16105: -- Summary: Reassignment of tiered topics is failing due to RemoteStorageException Key: KAFKA-16105 URL: https://issues.apache.org/jira/browse/KAFKA-16105 Project: Kafka Issue Type: Bug Components: Tiered-Storage Reporter: Anatolii Popov When partition reassignment is happening for a tiered topic, in most cases it gets stuck with RemoteStorageExceptions on follower nodes saying that they cannot construct the remote log auxiliary state: {code:java} [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread) org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) {code} Scenario: A cluster of 3 nodes with a single topic with 30 partitions. All partitions have tiered segments. Adding 3 more nodes to the cluster and making a reassignment to move all the data to new nodes. Behavior: For most of the partitions, reassignment happens smoothly. For some of the partitions, when a new node starts to get assignments, it reads the __remote_log_metadata topic and tries to initialize the metadata cache on records with COPY_SEGMENT_STARTED. If it reads such a message for a partition before that partition was assigned to this specific node, it ignores the message, skipping the cache initialization, and marks the partition as assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly processed. 
Expected behavior: The partitions should not be marked as assigned before the cache is initialized, to be able to re-read the COPY_SEGMENT_STARTED message and initialize the cache. Some notes: This is most probably happening when there are messages in a single metadata partition and the order of the messages does not correspond to the order of assignment. So the follower reads the COPY_SEGMENT_STARTED m
[PR] Reassignment fix [kafka]
AnatolyPopov opened a new pull request, #15165: URL: https://github.com/apache/kafka/pull/15165 When partition reassignment is happening for a tiered topic, in most cases it gets stuck with RemoteStorageExceptions on follower nodes saying that they cannot construct the remote log auxiliary state: ``` [2024-01-09 08:34:02,899] ERROR [ReplicaFetcher replicaId=7, leaderId=6, fetcherId=2] Error building remote log auxiliary state for test-24 (kafka.server.ReplicaFetcherThread) org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: test-24, currentLeaderEpoch: 8, leaderLocalLogStartOffset: 209, leaderLogStartOffset: 0, epoch: 0 as the previous remote log segment metadata was not found at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:259) at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:106) at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:762) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:413) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332) at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129) at scala.Option.foreach(Option.scala:437) at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130) ``` Scenario: A cluster of 3 nodes with a single topic with 30 partitions. All partitions have tiered segments. Adding 3 more nodes to the cluster and making a reassignment to move all the data to new nodes. Behavior: For most of the partitions, reassignment happens smoothly. For some of the partitions, when a new node starts to get assignments, it reads the __remote_log_metadata topic and tries to initialize the metadata cache on records with COPY_SEGMENT_STARTED. If it reads such a message for a partition before that partition was assigned to this specific node, it ignores the message, skipping the cache initialization, and marks the partition as assigned. So reassignment is stuck since COPY_SEGMENT_STARTED is never properly processed. Expected behavior: The partitions should not be marked as assigned before the cache is initialized, to be able to re-read the COPY_SEGMENT_STARTED message and initialize the cache. 
Some notes: This is most probably happening when there are messages in a single metadata partition and the order of the messages does not correspond to the order of assignment. So the follower reads the COPY_SEGMENT_STARTED message, sees that the user partition is not yet assigned to this node, skips the message, and marks the user partition as assigned. On the next iteration, it resets to beginning ONLY t
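A sketch of the invariant this PR enforces (method and field names are illustrative, not the actual implementation): a user partition only counts as assigned once a COPY_SEGMENT_STARTED record has seeded its metadata cache, so a record read before the assignment arrives is not lost.

```java
// Sketch: never flip a partition to "assigned" before its metadata cache
// has been initialized from a COPY_SEGMENT_STARTED record.
final class RemoteLogAssignmentSketch {
    private boolean cacheInitialized = false;
    private boolean markedAssigned = false;

    void onCopySegmentStarted() {
        cacheInitialized = true;
        maybeMarkAssigned();
    }

    void onPartitionAssignedToThisNode() {
        maybeMarkAssigned();
    }

    private void maybeMarkAssigned() {
        if (cacheInitialized && !markedAssigned) {
            markedAssigned = true; // safe: the cache already holds the state
        }
    }
}
```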
[PR] KAFKA-15853: Move PasswordEncoder to server module [kafka]
OmniaGM opened a new pull request, #15164: URL: https://github.com/apache/kafka/pull/15164 blocked on #15158 - Tests will fail ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15721: KRaft support in DeleteTopicsRequestWithDeletionDisabledTest [kafka]
jolshan commented on PR #15124: URL: https://github.com/apache/kafka/pull/15124#issuecomment-1883534800 Sorry I don't seem to get notified for tags until the PR gets merged. I need to look at my notification settings 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15284: -- Fix Version/s: 4.0.0 (was: 3.8.0) > Implement ConsumerGroupProtocolVersionResolver to determine consumer group > protocol > --- > > Key: KAFKA-15284 > URL: https://issues.apache.org/jira/browse/KAFKA-15284 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 4.0.0 > > > At client initialization, we need to determine which of the > {{ConsumerDelegate}} implementations to use: > # {{LegacyKafkaConsumerDelegate}} > # {{AsyncKafkaConsumerDelegate}} > There are conditions defined by KIP-848 that determine client eligibility to > use the new protocol. This will be modeled by the—deep > breath—{{ConsumerGroupProtocolVersionResolver}}. > Known tasks: > * Determine at what point in the {{Consumer}} initialization the network > communication should happen > * Determine what RPCs to invoke in order to determine eligibility (API > versions, IBP version, etc.) > * Implement the network client lifecycle (startup, communication, shutdown, > etc.) > * Determine the fallback path in case the client is not eligible to use the > protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
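A sketch of the resolver shape the ticket describes (the delegate names and eligibility inputs below are placeholders; the real checks involve the RPCs listed under "Known tasks"):

```java
// Sketch: pick a delegate implementation from eligibility inputs.
interface ConsumerDelegateSketch { }

final class LegacyConsumerDelegateSketch implements ConsumerDelegateSketch { }
final class AsyncConsumerDelegateSketch implements ConsumerDelegateSketch { }

final class ProtocolResolverSketch {
    ConsumerDelegateSketch resolve(boolean brokerSupportsNewProtocol,
                                   String groupProtocolConfig) {
        // Both inputs stand in for the network- and config-based eligibility
        // checks that the ticket lists as open tasks.
        if (brokerSupportsNewProtocol && "consumer".equalsIgnoreCase(groupProtocolConfig)) {
            return new AsyncConsumerDelegateSketch();
        }
        return new LegacyConsumerDelegateSketch(); // the fallback path
    }
}
```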
[PR] KAFKA-16097: Add suspended tasks back to the state updater when reassigned [kafka]
lucasbru opened a new pull request, #15163: URL: https://github.com/apache/kafka/pull/15163 When a partition is revoked, the corresponding task gets a pending action "SUSPEND". This pending action may overwrite a previous pending action. If the task was previously removed from the state updater, e.g. because we were fenced, the pending action is overwritten with suspend, and in handleAssigned, upon reassignment of that task, the SUSPEND action is removed. Then, once the state updater executes the removal, no pending action is registered anymore, and we run into an IllegalStateException. This commit solves the problem by adding back reassigned tasks to the state updater, since they may have been removed from the state updater for a reason other than being restored completely. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
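A sketch of the fix as described (collection names and types are illustrative, not the real task-manager internals): reassignment clears the stale pending action and re-adds the task, rather than assuming the task is still inside the state updater.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: on reassignment, clear the stale SUSPEND and re-add the task.
final class TaskReassignmentSketch {
    private final Map<String, String> pendingActions = new HashMap<>();    // taskId -> action
    private final Map<String, Object> stateUpdaterTasks = new HashMap<>(); // taskId -> task

    void handleReassigned(String taskId, Object task) {
        // Dropping the pending SUSPEND alone is not enough: if the task was
        // removed from the updater for another reason (e.g. being fenced),
        // the updater's later removal would find no pending action and throw.
        // Re-adding the task keeps the two structures consistent.
        pendingActions.remove(taskId);
        stateUpdaterTasks.putIfAbsent(taskId, task);
    }
}
```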
[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams
[ https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ayoub Omari updated KAFKA-14404: External issue URL: (was: https://github.com/apache/kafka/pull/15162) > Fix & update docs on client configs controlled by Streams > - > > Key: KAFKA-14404 > URL: https://issues.apache.org/jira/browse/KAFKA-14404 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ayoub Omari >Priority: Major > Labels: docs, newbie > > There are a handful of client configs that can't be set by Streams users for > various reasons, such as the group id, but we seem to have missed a few of > them in the documentation > [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: > the partition assignor (Consumer) and partitioner (Producer). > This section of the docs also just needs to be cleaned up in general as there > is overlap between the [Default > Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] > and [Parameters controlled by Kafka > Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] > sections, and the table of contents is messed up presumably due to an issue > with the section headers. > We should separate these with one section covering (only) configs where > Streams sets a different default but this can still be overridden by the > user, and the other section covering the configs that Streams hardcodes and > users can never override. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams
[ https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ayoub Omari updated KAFKA-14404: External issue URL: https://github.com/apache/kafka/pull/15162 > Fix & update docs on client configs controlled by Streams > - > > Key: KAFKA-14404 > URL: https://issues.apache.org/jira/browse/KAFKA-14404 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Ayoub Omari >Priority: Major > Labels: docs, newbie > > There are a handful of client configs that can't be set by Streams users for > various reasons, such as the group id, but we seem to have missed a few of > them in the documentation > [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: > the partition assignor (Consumer) and partitioner (Producer). > This section of the docs also just needs to be cleaned up in general as there > is overlap between the [Default > Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] > and [Parameters controlled by Kafka > Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] > sections, and the table of contents is messed up presumably due to an issue > with the section headers. > We should separate these with one section covering (only) configs where > Streams sets a different default but this can still be overridden by the > user, and the other section covering the configs that Streams hardcodes and > users can never override. -- This message was sent by Atlassian Jira (v8.20.10#820010)