Re: [PR] MINOR: Fix GroupCoordinatorShardTest stubbing [kafka]
dajac merged PR #14637: URL: https://github.com/apache/kafka/pull/14637 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786206#comment-17786206 ] Henry Cai commented on KAFKA-15653: --- Is this Jira resolved? I see that all three linked PRs have already been merged. > NPE in ChunkedByteStream > > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. >Reporter: Travis Bischel >Assignee: Justine Olshan >Priority: Major > Attachments: repro.sh > > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15809) Adding TS to broker's features list and updating broker's metadata schema to include TS enable status
[ https://issues.apache.org/jira/browse/KAFKA-15809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786204#comment-17786204 ] Phuc Hong Tran commented on KAFKA-15809: Never mind, I got the answer. > Adding TS to broker's features list and updating broker's metadata schema to > include TS enable status > - > > Key: KAFKA-15809 > URL: https://issues.apache.org/jira/browse/KAFKA-15809 > Project: Kafka > Issue Type: Sub-task >Reporter: Phuc Hong Tran >Assignee: Phuc Hong Tran >Priority: Minor > Labels: KIP-405 > Fix For: 3.7.0 > > > Currently the controller doesn't have visibility into all brokers' TS enable > status. As mentioned in KAFKA-15341, we need to add metadata about the TS enable > status of brokers so that the controller can check these statuses before > enabling TS per topic -- This message was sent by Atlassian Jira (v8.20.10#820010)
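The gating the Jira describes — the controller verifying every broker's TS enable status before allowing tiered storage on a topic — boils down to an all-brokers check. A minimal sketch, using hypothetical types and names (`BrokerMetadata`, `canEnableTieredStorageForTopic`) rather than Kafka's actual controller or metadata-record classes:

```java
import java.util.Collection;

// Hypothetical illustration only: the real controller-side check for
// KAFKA-15809 would use Kafka's internal metadata records, not these types.
final class TieredStorageGate {

    // Stand-in for the per-broker metadata the Jira proposes to add.
    record BrokerMetadata(int brokerId, boolean tieredStorageEnabled) { }

    /**
     * Returns true only if every registered broker reports tiered storage (TS)
     * as enabled, i.e. the condition the controller would verify before
     * enabling TS for a topic.
     */
    static boolean canEnableTieredStorageForTopic(Collection<BrokerMetadata> brokers) {
        return !brokers.isEmpty()
                && brokers.stream().allMatch(BrokerMetadata::tieredStorageEnabled);
    }
}
```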
Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]
yashmayya merged PR #14704: URL: https://github.com/apache/kafka/pull/14704 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]
yashmayya commented on PR #14704: URL: https://github.com/apache/kafka/pull/14704#issuecomment-1811862334 Thanks Chris! Test failures appear unrelated, merging to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15470: Allow creating connectors in a stopped state [kafka]
yashmayya commented on code in PR #14704: URL: https://github.com/apache/kafka/pull/14704#discussion_r1393673421 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ## @@ -108,6 +108,19 @@ public interface Herder { */ void putConnectorConfig(String connName, Map config, boolean allowReplace, Callback> callback); +/** + * Set the configuration for a connector, along with a target state optionally. This supports creation and updating. + * @param connName name of the connector + * @param config the connector's configuration + * @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default + *target state is {@link TargetState#STARTED} if no target state exists previously + * @param allowReplace if true, allow overwriting previous configs; if false, throw {@link AlreadyExistsException} + * if a connector with the same name already exists + * @param callback callback to invoke when the configuration has been written + */ +void putConnectorConfig(String connName, Map config, TargetState targetState, boolean allowReplace, Review Comment: I agree, that makes sense to me 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
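For reference, a sketch of how a caller might use the new overload discussed in this thread. The generic types are stripped by the mailing-list rendering; this assumes they are `Map<String, String>` and `Callback<Herder.Created<ConnectorInfo>>`, matching the existing `putConnectorConfig` overload, and assumes a `STOPPED` constant on `TargetState` per the PR's purpose. The surrounding class is invented for illustration:

```java
import java.util.Map;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.util.Callback;

public class StoppedConnectorExample {

    // Create (or update) a connector whose tasks stay stopped until it is resumed.
    static void createStopped(Herder herder, String name, Map<String, String> config) {
        Callback<Herder.Created<ConnectorInfo>> callback = (error, created) ->
                System.out.println(error != null
                        ? "Creating connector " + name + " failed: " + error
                        : "Connector " + name + " created in STOPPED state");
        // allowReplace = false: fail if a connector with this name already exists.
        herder.putConnectorConfig(name, config, TargetState.STOPPED, false, callback);
    }
}
```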
Re: [PR] KAFKA-15445: Add JVM Docker image [kafka]
VedarthConfluent commented on code in PR #14552: URL: https://github.com/apache/kafka/pull/14552#discussion_r1393649057 ## docker/docker_release.py: ## @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python script to build and push docker image +Usage: docker_release.py + +Interactive utility to push the docker image to dockerhub +""" + +import subprocess +from distutils.dir_util import copy_tree +from datetime import date +import shutil + +def push_jvm(image, kafka_url): +copy_tree("resources", "jvm/resources") +subprocess.run(["docker", "buildx", "build", "-f", "jvm/Dockerfile", "--build-arg", f"kafka_url={kafka_url}", "--build-arg", f"build_date={date.today()}", +"--push", +"--platform", "linux/amd64,linux/arm64", +"--tag", image, "jvm"]) +shutil.rmtree("jvm/resources") + +def login(): +status = subprocess.run(["docker", "login"]) +if status.returncode != 0: +print("Docker login failed, aborting the docker release") +raise PermissionError + +def create_builder(): +subprocess.run(["docker", "buildx", "create", "--name", "kafka-builder", "--use"]) + +def remove_builder(): +subprocess.run(["docker", "buildx", "rm", "kafka-builder"]) + +if __name__ == "__main__": Review Comment: We have decided to keep the release and promotion process to be driven by local scripts for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393640277 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1355,6 +1402,60 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicsRequest(request: RequestChannel.Request): Unit = { +val describeTopicsRequest = request.body[DescribeTopicsRequest] + +val topics = scala.collection.mutable.Map[String, Int]() +describeTopicsRequest.data.topics.forEach { topic => + if (topic.name == null || topic.firstPartitionId() < 0) { +throw new InvalidRequestException(s"Topic name and first partition id must be set.") + } + topics.put(topic.name(), topic.firstPartitionId()) +} + +val fetchAllTopics = topics.isEmpty Review Comment: If there is no topic in the request, it means the client wants all the topics. firstPartitionIndex is a topic level argument. It only works in the case where topics are provided. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
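A small illustration of the dispatch rule described in the comment: an empty topic list means "describe everything", and the per-topic first partition id only applies when topics are listed explicitly. The types here (`TopicRequest`, `topicsToDescribe`) are stand-ins, not the actual handler in `KafkaApis.scala`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DescribeTopicsDispatch {

    // Stand-in for one topic entry in the request.
    record TopicRequest(String name, int firstPartitionId) { }

    /** Maps each topic to describe to the partition id to start from. */
    static Map<String, Integer> topicsToDescribe(List<TopicRequest> requested, List<String> allTopicNames) {
        Map<String, Integer> result = new LinkedHashMap<>();
        if (requested.isEmpty()) {
            // No topics in the request: the client wants every topic, from partition 0.
            allTopicNames.forEach(name -> result.put(name, 0));
        } else {
            // The first-partition argument is honoured only for explicitly listed topics.
            requested.forEach(t -> result.put(t.name(), t.firstPartitionId()));
        }
        return result;
    }
}
```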
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393639171 ## clients/src/main/resources/common/message/DescribeTopicsRequest.json: ## @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "request", + "listeners": ["broker"], + "name": "DescribeTopicsRequest", Review Comment: Updated the name. Thanks for the advice! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393638945 ## clients/src/main/resources/common/message/DescribeTopicsResponse.json: ## @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "response", + "name": "DescribeTopicsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "Topics", "type": "[]DescribeTopicsResponseTopic", "versions": "0+", + "about": "Each topic in the response.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", +"about": "The topic error, or 0 if there was no error." }, + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+", +"about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." }, + { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, +"about": "True if the topic is internal." }, + { "name": "Partitions", "type": "[]DescribeTopicsResponsePartition", "versions": "0+", +"about": "Each partition in the topic.", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error, or 0 if there was no error." }, +{ "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, +{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the leader broker." }, +{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true, + "about": "The leader epoch of this partition." }, +{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of all nodes that host this partition." }, +{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of nodes that are in sync with the leader for this partition." }, +{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The new eligible leader replicas otherwise." }, +{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The last known ELR." 
}, +{ "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", "ignorable": true, "entityType": "brokerId", + "about": "The set of offline replicas of this partition." }]}, + { "name": "TopicAuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", +"about": "32-bit bitfield to represent authorized operations for this topic." }, + { "name": "NextPartition", "type": "int32", "versions": "0+", "default": "-1", +"about": "The first partition that exceed the request limit. " }]} Review Comment: Correct, it should be empty in most of the cases. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
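The `NextPartition` field discussed above is effectively a pagination cursor: empty in the common case, and otherwise telling the client where the response was cut off. A generic client-side sketch of consuming such a cursor — `Page` and `PageFetcher` are invented for this illustration and are not part of the Kafka clients API:

```java
final class DescribeTopicsPager {

    // One response page, as seen by the client.
    interface Page {
        boolean hasMore();      // true only if the broker hit its partition limit
        String nextTopic();     // topic at which the previous response stopped
        int nextPartition();    // first partition that exceeded the limit
    }

    // Issues one request starting at the given topic/partition cursor.
    interface PageFetcher {
        Page fetchPage(String startTopic, int startPartition);
    }

    static void describeAll(PageFetcher fetcher) {
        String topic = "";      // empty cursor: start from the beginning
        int partition = 0;
        while (true) {
            Page page = fetcher.fetchPage(topic, partition);
            if (!page.hasMore()) {
                break;          // the usual case: everything fit in one response
            }
            topic = page.nextTopic();
            partition = page.nextPartition();
        }
    }
}
```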
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393636842 ## clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicsRequest.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DescribeTopicsRequestData; +import org.apache.kafka.common.message.DescribeTopicsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class DescribeTopicsRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +private final DescribeTopicsRequestData data; + +public Builder(DescribeTopicsRequestData data) { +super(ApiKeys.DESCRIBE_TOPICS); +this.data = data; +} + +public Builder(List topics) { +super(ApiKeys.DESCRIBE_TOPICS, ApiKeys.DESCRIBE_TOPICS.oldestVersion(), ApiKeys.DESCRIBE_TOPICS.latestVersion()); +DescribeTopicsRequestData data = new DescribeTopicsRequestData(); +topics.forEach(topicName -> data.topics().add(new DescribeTopicsRequestData.TopicRequest().setName(topicName))); +this.data = data; +} + +@Override +public DescribeTopicsRequest build(short version) { +return new DescribeTopicsRequest(data, version); +} + +@Override +public String toString() { +return data.toString(); +} + +} + +private final DescribeTopicsRequestData data; + +public DescribeTopicsRequest(DescribeTopicsRequestData data) { +super(ApiKeys.DESCRIBE_TOPICS, (short) 0); +this.data = data; +} + +public DescribeTopicsRequest(DescribeTopicsRequestData data, short version) { +super(ApiKeys.DESCRIBE_TOPICS, version); +this.data = data; +} + +@Override +public DescribeTopicsRequestData data() { +return data; +} + +@Override +public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { +Errors error = Errors.forException(e); +DescribeTopicsResponseData responseData = new DescribeTopicsResponseData(); +for (DescribeTopicsRequestData.TopicRequest topic : data.topics()) { Review Comment: Can you help elaborate here? what kind of top-level error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393636378 ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -386,7 +387,8 @@ public enum Errors { STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new), MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new), UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), -UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new); +UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), +REQUEST_LIMIT_REACHED(117, "The request has reached the limit.", RequestLimitReachedException::new); Review Comment: Yes. Updated the comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393634656 ## clients/src/main/java/org/apache/kafka/common/errors/RequestLimitReachedException.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class RequestLimitReachedException extends ApiException { Review Comment: Yes. Comments added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393633978 ## clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java: ## @@ -114,6 +114,7 @@ public enum ApiKeys { CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT), CONSUMER_GROUP_DESCRIBE(ApiMessageType.CONSUMER_GROUP_DESCRIBE), CONTROLLER_REGISTRATION(ApiMessageType.CONTROLLER_REGISTRATION), +DESCRIBE_TOPICS(ApiMessageType.DESCRIBE_TOPICS), Review Comment: Sorry, it is misaligned in what way? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393633762 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -116,6 +117,7 @@ class KafkaApis(val requestChannel: RequestChannel, val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time) val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", config) val configManager = new ConfigAdminManager(brokerId, config, configRepository) + val partitionRequestLimit = 2000 Review Comment: Sure, added the config. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] WIP: initial simple and incomplete implementation [kafka]
github-actions[bot] commented on PR #14144: URL: https://github.com/apache/kafka/pull/14144#issuecomment-1811755757 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [DRAFT] EXAMPLE: dual write recipe example for KIP-939 [kafka]
github-actions[bot] commented on PR #14231: URL: https://github.com/apache/kafka/pull/14231#issuecomment-1811755708 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15829) How to build Kafka 2.7 with maven instead of gradle?
rain.liang created KAFKA-15829: -- Summary: How to build Kafka 2.7 with maven instead of gradle? Key: KAFKA-15829 URL: https://issues.apache.org/jira/browse/KAFKA-15829 Project: Kafka Issue Type: Wish Affects Versions: 2.7.2 Reporter: rain.liang It's difficult to upgrade the Gradle version used to build Kafka. Is there a way to build Kafka 2.7 with Maven instead? -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15346: add support for 'single key single timestamp' IQs with versioned state stores (KIP-960) [kafka]
mjsax commented on PR #14596: URL: https://github.com/apache/kafka/pull/14596#issuecomment-1811669805 Jenkins crashed, so I restarted the build. We can merge after we get a green run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
kirktrue commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1811620894 @lucasbru As @philipnee said, the two mechanisms appear similar and do have overlap. However, not only is it "awkward" to mix them, but it raises correctness questions. `poll()` processes _all_ of the events in the queue from the background thread. However, when the user calls `commit`*, we want to execute only the commit callbacks. As a result, we would have to _filter and remove_ the relevant events from the background thread. The questions that arose: * What if, when processing the commit callbacks, we see error events in the queue? Do we skip them? Execute them? Or something else? * Will removing the events from the background event queue cause unforeseen issues later when calling `poll()`? The answer may be that it's totally fine. We kind of "punted" by doing it this way, not only because of those questions, but because it seemed a bit awkward 🤷♂️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
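The "filter and remove" approach debated above can be pictured with a plain queue: pull out only the commit-callback events and leave everything else (errors included) for the next `poll()`. The event classes below are invented for the sketch and are not the consumer's actual background-event types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class CommitCallbackDrain {

    interface BackgroundEvent { }
    record CommitCallbackEvent(Runnable callback) implements BackgroundEvent { }
    record ErrorEvent(RuntimeException error) implements BackgroundEvent { }

    /** Removes and returns only the commit callbacks; other events stay queued. */
    static List<CommitCallbackEvent> drainCommitCallbacks(Queue<BackgroundEvent> queue) {
        List<CommitCallbackEvent> drained = new ArrayList<>();
        queue.removeIf(event -> {
            if (event instanceof CommitCallbackEvent cce) {
                drained.add(cce);
                return true;    // taken out of the queue for immediate execution
            }
            return false;       // e.g. ErrorEvent: left for the next poll() to handle
        });
        return drained;
    }
}
```

Whether skipping the error events here is safe is exactly the correctness question raised in the comment.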
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
hachikuji commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1393479827 ## clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicsRequest.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.DescribeTopicsRequestData; +import org.apache.kafka.common.message.DescribeTopicsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class DescribeTopicsRequest extends AbstractRequest { +public static class Builder extends AbstractRequest.Builder { +private final DescribeTopicsRequestData data; + +public Builder(DescribeTopicsRequestData data) { +super(ApiKeys.DESCRIBE_TOPICS); +this.data = data; +} + +public Builder(List topics) { +super(ApiKeys.DESCRIBE_TOPICS, ApiKeys.DESCRIBE_TOPICS.oldestVersion(), ApiKeys.DESCRIBE_TOPICS.latestVersion()); +DescribeTopicsRequestData data = new DescribeTopicsRequestData(); +topics.forEach(topicName -> data.topics().add(new DescribeTopicsRequestData.TopicRequest().setName(topicName))); +this.data = data; +} + +@Override +public DescribeTopicsRequest build(short version) { +return new DescribeTopicsRequest(data, version); +} + +@Override +public String toString() { +return data.toString(); +} + +} + +private final DescribeTopicsRequestData data; + +public DescribeTopicsRequest(DescribeTopicsRequestData data) { +super(ApiKeys.DESCRIBE_TOPICS, (short) 0); +this.data = data; +} + +public DescribeTopicsRequest(DescribeTopicsRequestData data, short version) { +super(ApiKeys.DESCRIBE_TOPICS, version); +this.data = data; +} + +@Override +public DescribeTopicsRequestData data() { +return data; +} + +@Override +public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { +Errors error = Errors.forException(e); +DescribeTopicsResponseData responseData = new DescribeTopicsResponseData(); +for (DescribeTopicsRequestData.TopicRequest topic : data.topics()) { Review Comment: Perhaps this is telling us that we need a top-level error in the response. ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -386,7 +387,8 @@ public enum Errors { STALE_MEMBER_EPOCH(113, "The member epoch is stale. 
The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new), MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new), UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), -UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new); +UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), +REQUEST_LIMIT_REACHED(117, "The request has reached the limit.", RequestLimitReachedException::new); Review Comment: The phrasing is very vague. I guess it is a request _size_ limit? ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1355,6 +1402,60 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicsRequest(request: RequestChannel.Request): Unit = { +val describeTopicsRequest = request.body[DescribeTopicsRequest] + +val topics = scala.collection.mutable.Map[String, Int]() +describeTopicsRequest.data.topics.forEach { topic => + if (topic.name == null || topic.firstPartitionId() < 0) { +throw new InvalidRequestException(s"Topic name and first partition id must be set.") + } + topics.put(topic.name()
[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse
[ https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15828: Labels: needs-kip (was: ) > Protect clients from broker hostname reuse > -- > > Key: KAFKA-15828 > URL: https://issues.apache.org/jira/browse/KAFKA-15828 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, producer >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > > In some environments such as k8s, brokers may be assigned to nodes > dynamically from an available pool. When a cluster is rolling, it is possible > for the client to see the same node advertised for different broker IDs in a > short period of time. For example, kafka-1 might be initially assigned to > node1. Before the client is able to establish a connection, it could be that > kafka-3 is now on node1 instead. Currently there is no protection in the > client or in the protocol for this scenario. If the connection succeeds, the > client will assume it has a good connection to kafka-1. Until something > disrupts the connection, it will continue under this assumption even if the > hostname for kafka-1 changes. > We have observed this scenario in practice. The client connected to the wrong > broker through stale hostname information. It was unable to produce data > because of persistent NOT_LEADER errors. The only way to recover in the end > was by restarting the client to force a reconnection. > We have discussed a couple potential solutions to this problem: > # Let the client be smarter managing the connection/hostname mapping. When > it detects that a hostname has changed, it should force a disconnect to > ensure it connects to the right node. > # We can modify the protocol to verify that the client has connected to the > intended broker. For example, we can add a field to ApiVersions to indicate > the intended broker ID. The broker receiving the request can return an error > if its ID does not match that in the request. > Are there alternatives? > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
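Option 1 from the description — have the client notice that a broker id's advertised endpoint changed and force a reconnect — can be sketched as follows. `AdvertisedEndpointTracker` and `ConnectionManager` are invented for illustration; the real fix would live inside the client's metadata and connection handling:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Remember the last advertised host:port per broker ID and force a reconnect
// when it changes, so a connection cannot silently point at the wrong broker.
final class AdvertisedEndpointTracker {

    interface ConnectionManager {
        void disconnect(int brokerId);   // drop any existing connection to this broker
    }

    private final Map<Integer, String> lastSeenEndpoint = new ConcurrentHashMap<>();
    private final ConnectionManager connections;

    AdvertisedEndpointTracker(ConnectionManager connections) {
        this.connections = connections;
    }

    /** Called whenever fresh metadata advertises an endpoint for a broker. */
    void onMetadataUpdate(int brokerId, String host, int port) {
        String endpoint = host + ":" + port;
        String previous = lastSeenEndpoint.put(brokerId, endpoint);
        if (previous != null && !Objects.equals(previous, endpoint)) {
            // The broker ID moved to a different node; any live connection may
            // actually reach a different broker now, so tear it down.
            connections.disconnect(brokerId);
        }
    }
}
```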
[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse
[ https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15828: Component/s: clients consumer producer > Protect clients from broker hostname reuse > -- > > Key: KAFKA-15828 > URL: https://issues.apache.org/jira/browse/KAFKA-15828 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, producer >Reporter: Jason Gustafson >Priority: Major > > In some environments such as k8s, brokers may be assigned to nodes > dynamically from an available pool. When a cluster is rolling, it is possible > for the client to see the same node advertised for different broker IDs in a > short period of time. For example, kafka-1 might be initially assigned to > node1. Before the client is able to establish a connection, it could be that > kafka-3 is now on node1 instead. Currently there is no protection in the > client or in the protocol for this scenario. If the connection succeeds, the > client will assume it has a good connection to kafka-1. Until something > disrupts the connection, it will continue under this assumption even if the > hostname for kafka-1 changes. > We have observed this scenario in practice. The client connected to the wrong > broker through stale hostname information. It was unable to produce data > because of persistent NOT_LEADER errors. The only way to recover in the end > was by restarting the client to force a reconnection. > We have discussed a couple potential solutions to this problem: > # Let the client be smarter managing the connection/hostname mapping. When > it detects that a hostname has changed, it should force a disconnect to > ensure it connects to the right node. > # We can modify the protocol to verify that the client has connected to the > intended broker. For example, we can add a field to ApiVersions to indicate > the intended broker ID. The broker receiving the request can return an error > if its ID does not match that in the request. > Are there alternatives? > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
kirktrue commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1811599069 @philipnee can you tag with `ctr` and `KIP-848` 🥺 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15276) Implement event plumbing for ConsumerRebalanceListener callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15276: -- Summary: Implement event plumbing for ConsumerRebalanceListener callbacks (was: Implement partition assignment reconciliation callbacks) > Implement event plumbing for ConsumerRebalanceListener callbacks > > > Key: KAFKA-15276 > URL: https://issues.apache.org/jira/browse/KAFKA-15276 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Provide the Java client support for the consumer group partition assignment > logic, including: > * Calculate the difference between the current partition assignment and that > returned in the {{ConsumerGroupHeartbeatResponse}} RPC response > * Ensure we handle the case where changes to the assignment take multiple > passes of {{RequestManager.poll()}} > * Integrate the mechanism to invoke the user’s rebalance callback > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
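The "calculate the difference" step in the description is essentially two set subtractions between the currently owned partitions and the target assignment from the `ConsumerGroupHeartbeatResponse`. A minimal sketch using plain string partition names such as "topicA-0" (the real code works with topic-id/partition structures):

```java
import java.util.HashSet;
import java.util.Set;

final class AssignmentDiff {

    record Reconciliation(Set<String> toRevoke, Set<String> toAssign) { }

    static Reconciliation reconcile(Set<String> current, Set<String> target) {
        Set<String> toRevoke = new HashSet<>(current);
        toRevoke.removeAll(target);      // owned now but absent from the target
        Set<String> toAssign = new HashSet<>(target);
        toAssign.removeAll(current);     // in the target but not owned yet
        return new Reconciliation(toRevoke, toAssign);
    }
}
```

`toRevoke` would drive `onPartitionsRevoked` and `toAssign` would drive `onPartitionsAssigned`, which is where the `ConsumerRebalanceListener` plumbing in this ticket comes in.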
[jira] [Updated] (KAFKA-15276) Implement partition assignment reconciliation callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15276: -- Summary: Implement partition assignment reconciliation callbacks (was: Complete partition assignment reconciliation callbacks) > Implement partition assignment reconciliation callbacks > --- > > Key: KAFKA-15276 > URL: https://issues.apache.org/jira/browse/KAFKA-15276 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Provide the Java client support for the consumer group partition assignment > logic, including: > * Calculate the difference between the current partition assignment and that > returned in the {{ConsumerGroupHeartbeatResponse}} RPC response > * Ensure we handle the case where changes to the assignment take multiple > passes of {{RequestManager.poll()}} > * Integrate the mechanism to invoke the user’s rebalance callback > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji commented on PR #14697: URL: https://github.com/apache/kafka/pull/14697#issuecomment-1811585213 @ex172000 @junrao Thanks for reviewing. Please take another look when you have time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan merged PR #14753: URL: https://github.com/apache/kafka/pull/14753 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue closed pull request #14670: KAFKA-15277: Design & implement support for internal Consumer delegates URL: https://github.com/apache/kafka/pull/14670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on PR #14670: URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811579631 Update on test failures: - `kafka.api.ClientIdQuotaTest` - `testQuotaOverrideDelete`: ran 50 times locally with no errors - `testThrottledProducerConsumer`: ran 75 times locally with no errors - `org.apache.kafka.connect.integration.StartAndStopLatchTest` - `shouldReturnFalseWhenAwaitingForStopToNeverComplete`: ran 400 times locally with no errors Closing and re-opening to trigger build... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on PR #14753: URL: https://github.com/apache/kafka/pull/14753#issuecomment-1811575702 Given the scope of the change, these test failures are not related. Will follow up elsewhere. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji commented on code in PR #14697: URL: https://github.com/apache/kafka/pull/14697#discussion_r1393453106 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -818,9 +818,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs) appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs) appendInfo.setLastOffset(offset.value - 1) - appendInfo.setRecordConversionStats(validateAndOffsetAssignResult.recordConversionStats) -if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) - appendInfo.setLogAppendTime(validateAndOffsetAssignResult.logAppendTimeMs) Review Comment: Yes, this was a mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15824: SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]
wcarlson5 merged PR #14757: URL: https://github.com/apache/kafka/pull/14757 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15824: SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]
wcarlson5 commented on PR #14757: URL: https://github.com/apache/kafka/pull/14757#issuecomment-1811568924 The failure in task ':streams:upgrade-system-tests-0101:test' is unrelated to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Small LogValidator clean ups [kafka]
hachikuji commented on code in PR #14697: URL: https://github.com/apache/kafka/pull/14697#discussion_r1393447344 ## core/src/test/scala/unit/kafka/log/LogValidatorTest.scala: ## @@ -399,12 +402,20 @@ class LogValidatorTest { assertEquals(i, offsetCounter.value); assertEquals(now + 1, validatingResults.maxTimestampMs, s"Max timestamp should be ${now + 1}") -assertEquals(1, validatingResults.shallowOffsetOfMaxTimestampMs, + +val expectedShallowOffsetOfMaxTimestamp = if (magic >= RecordVersion.V2.value) { Review Comment: If the behavior changes, breaking the test seems correct? Very unlikely we would ever go back to a non-batching format in any case, so this seems like a reasonable default assertion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]
gharris1727 opened a new pull request, #14764: URL: https://github.com/apache/kafka/pull/14764 These tests contain typos that leak sockets, mostly through clients that are left open. The WorkerTest had rather involved resource leaks, which arose because the typical lifetime of the WorkerTask was partially or totally mocked. The WorkerTask subclass constructors accept clients as arguments, and take ownership of those clients, and the partial mocking prevented those clients from being closed appropriately. Now, all of (WorkerSourceTask, ExactlyOnceWorkerSourceTask, WorkerSinkTask) have their constructors mocked, and those constructors close the passed-in resources immediately. This also allows the test to avoid waiting for some cancellation timeouts to expire, making the test faster to run. The other systematic typo that was present was the OffsetStore not being configured or closed, which caused a consumer to be leaked. This is because the store is instantiated in the factory method (e.g. offsetStoreForExactlyOnceSourceTask), but the KafkaOffsetBackingStore.offsetLog field is only initialized inside of configure(WorkerConfig). Rather than making OffsetBackingStore implementations close the consumer even when configure is not called, I made the test use a more realistic lifecycle and actually call configure(WorkerConfig) and stop(). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
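The constructor-mocking pattern described above can be condensed to the following sketch. It uses Mockito's `mockConstruction` with an initializer that closes any `Closeable` constructor argument; `SomeTask` is a placeholder, not the real `WorkerSourceTask` signature:

```java
import java.io.Closeable;
import java.io.IOException;

import org.mockito.MockedConstruction;
import org.mockito.Mockito;

public class ConstructorMockingSketch {

    // Placeholder for a task class that takes ownership of a client in its constructor.
    static class SomeTask {
        SomeTask(Closeable client) { }
    }

    static void runWithMockedTask(Runnable testBody) {
        // Every SomeTask constructed inside the try block becomes a mock, and the
        // initializer immediately releases any Closeable passed to its constructor,
        // so injected clients cannot leak when the task's real lifecycle never runs.
        try (MockedConstruction<SomeTask> ignored = Mockito.mockConstruction(
                SomeTask.class,
                (mock, context) -> {
                    for (Object arg : context.arguments()) {
                        if (arg instanceof Closeable closeable) {
                            try {
                                closeable.close();
                            } catch (IOException e) {
                                // best-effort cleanup in a test helper
                            }
                        }
                    }
                })) {
            testBody.run();
        }
    }
}
```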
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } - @Test - def testConsumingWithNullGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol + def testConsumingWithNullGroupId(groupProtocol: String): Unit = { val topic = "test_topic" -val partition = 0; +val partition = 0 Review Comment: I missed one :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1391737904 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,39 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); Review Comment: This is an existing issue, but `getErrorResponse` can just be `errorResponse`. ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); Review Comment: merge with previous line? ## core/src/test/java/kafka/server/ClientMetricsManagerTest.java: ## @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server; + +import kafka.metrics.ClientMetricsConfigs; +import kafka.metrics.ClientMetricsInstance; +import kafka.metrics.ClientMetricsTestUtils; +import kafka.server.ClientMetricsManager.SubscriptionInfo; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryRequest.Builder; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.ju
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on PR #14670: URL: https://github.com/apache/kafka/pull/14670#issuecomment-1811460391 @dajac—here are the unique test failures with notes: - integration.kafka.server.FetchFromFollowerIntegrationTest: testRackAwareRangeAssignor(String).quorum=kraft - KAFKA-15020 - integration.kafka.server.FetchFromFollowerIntegrationTest: testRackAwareRangeAssignor(String).quorum=zk - KAFKA-15020, same as above - kafka.api.ClientIdQuotaTest: testQuotaOverrideDelete(String).quorum=kraft - KAFKA-8107 - **_Will investigate_** - kafka.api.ClientIdQuotaTest: testThrottledProducerConsumer(String).quorum=zk - KAFKA-8108 - **_Will investigate_** - kafka.api.GroupEndToEndAuthorizationTest: testAuthentications(String).quorum=kraft - No Jira, but [it is flaky on `trunk` and PR builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&tests.container=kafka.api.GroupEndToEndAuthorizationTest&tests.test=testAuthentications(String)%5B2%5D) - kafka.api.TransactionsTest: testBumpTransactionalEpoch(String).quorum=kraft - KAFKA-15099 - org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest: testMultiWorkerRestartOnlyConnector - KAFKA-15675 - org.apache.kafka.connect.integration.StartAndStopLatchTest: shouldReturnFalseWhenAwaitingForStopToNeverComplete - **_Will investigate_** - org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest: testReplication() - No Jira, but [it is flaky on `trunk` and PR builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&tests.container=org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest&tests.test=testReplication()) - org.apache.kafka.controller.QuorumControllerTest: testBalancePartitionLeaders() - KAFKA-15052, marked as fixed - org.apache.kafka.controller.QuorumControllerTest: testFenceMultipleBrokers() - No Jira, but [it is flaky on `trunk` and PR builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&tests.container=org.apache.kafka.controller.QuorumControllerTest&tests.test=testFenceMultipleBrokers()) - org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest: testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() - No Jira, but [it is flaky on `trunk` and PR builds](https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&tests.container=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest&tests.test=testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs()) - org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest: shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] - KAFKA-9868, marked as fixed - org.apache.kafka.streams.integration.IQv2StoreIntegrationTest; verifyStore[cache=true, log=true, supplier=TIME_ROCKS_WINDOW, kind=DSL] - KAFKA-13714, marked as fixed - org.apache.kafka.streams.integration.RestoreIntegrationTest: shouldInvokeUserDefinedGlobalStateRestoreListener() - KAFKA-15659, marked as fixed - org.apache.kafka.tools.MetadataQuorumCommandTest: [1] Type=Raft-Combined, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.7-IV1, Security=PLAINTEXT - KAFKA-15104 - org.apache.kafka.trogdor.coordinator.CoordinatorTest: testTaskRequestWithOldStartMsGetsUpdated() - KAFKA-8115 -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368519 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } - @Test - def testConsumingWithNullGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol + def testConsumingWithNullGroupId(groupProtocol: String): Unit = { val topic = "test_topic" -val partition = 0; +val partition = 0 Review Comment: I missed one :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393368202 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } - @Test - def testConsumingWithNullGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol + def testConsumingWithNullGroupId(groupProtocol: String): Unit = { val topic = "test_topic" -val partition = 0; +val partition = 0 val tp = new TopicPartition(topic, partition) createTopic(topic, 1, 1) -TestUtils.waitUntilTrue(() => { - this.zkClient.topicExists(topic) -}, "Failed to create topic") - Review Comment: I'm not convinced we ever did. Again, this is ZK-specific. The tests work on both variants without this check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393367240 ## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ## @@ -34,13 +35,15 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { - @Test - def testSimpleConsumption(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) Review Comment: Perhaps, but I want to be able to have a different array for each test to enable them to be turned on individually. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
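Tying this to the question asked further down in this digest about defining a source once: a minimal JUnit 5 sketch (hypothetical class and method names; the real tests are Scala) of the trade-off described above. A shared @MethodSource factory is declared once, while a per-test @ValueSource array is what lets each test be enabled for the new protocol individually.
```java
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

// Hypothetical test class; it only contrasts the two JUnit 5 source annotations.
class GroupProtocolParamSketch {

    // Defined once and shared by every test that references it by name.
    static Stream<String> allGroupProtocols() {
        return Stream.of("generic", "consumer");
    }

    @ParameterizedTest
    @MethodSource("allGroupProtocols")
    void runsForBothProtocols(String groupProtocol) {
        // exercises both "generic" and "consumer"
    }

    // A per-test @ValueSource keeps an individual test limited to the classic protocol
    // until the new one supports it.
    @ParameterizedTest
    @ValueSource(strings = {"generic"})
    void runsForGenericOnly(String groupProtocol) {
        // exercises only "generic"
    }
}
```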
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393365674 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(numMessages - records.count, lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be ${numMessages - records.count}") } - @Test - def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): Unit = { val numRecords = 1000 val producer = createProducer() val startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() consumer.assign(List(tp).asJava) consumer.seek(tp, 0) consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) -def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = { -val metricName = broker.metrics.metricName("throttle-time", - quotaType.toString, - "", - "user", "", - "client-id", clientId) -assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) +def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = { + val metricName = broker.metrics.metricName("throttle-time", +quotaType.toString, +"", +"user", "", +"client-id", clientId) + assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) } -servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) -servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) +brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) +brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) -servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) -servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) +brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) +brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) Review Comment: In this context, `servers` and `brokers` are interchangeable. This change makes the test work for ZK or KRaft. Previously, it was ZK-only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]
dongnuo123 commented on code in PR #14675: URL: https://github.com/apache/kafka/pull/14675#discussion_r1393317417 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -547,7 +565,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(2, parts.size) } - @Test + @Test // TODO: doesn't pass for kraft and kraft+kip848 Review Comment: `consumer.partitionsFor("non-exist-topic")` is currently not supported in the new protocol -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15828) Protect clients from broker hostname reuse
Jason Gustafson created KAFKA-15828: --- Summary: Protect clients from broker hostname reuse Key: KAFKA-15828 URL: https://issues.apache.org/jira/browse/KAFKA-15828 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In some environments such as k8s, brokers may be assigned to nodes dynamically from an available pool. When a cluster is rolling, it is possible for the client to see the same node advertised for different broker IDs in a short period of time. For example, kafka-1 might be initially assigned to node1. Before the client is able to establish a connection, it could be that kafka-3 is now on node1 instead. Currently there is no protection in the client or in the protocol for this scenario. If the connection succeeds, the client will assume it has a good connection to kafka-1. Until something disrupts the connection, it will continue under this assumption even if the hostname for kafka-1 changes. We have observed this scenario in practice. The client connected to the wrong broker through stale hostname information. It was unable to produce data because of persistent NOT_LEADER errors. The only way to recover in the end was by restarting the client to force a reconnection. We have discussed a couple potential solutions to this problem: # Let the client be smarter managing the connection/hostname mapping. When it detects that a hostname has changed, it should force a disconnect to ensure it connects to the right node. # We can modify the protocol to verify that the client has connected to the intended broker. For example, we can add a field to ApiVersions to indicate the intended broker ID. The broker receiving the request can return an error if its ID does not match that in the request. Are there alternatives? -- This message was sent by Atlassian Jira (v8.20.10#820010)
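For illustration, a minimal sketch of the first option above: the client detects that a broker id has moved to a different host and forces a disconnect. HostnameTracker and shouldDisconnect are hypothetical names, not part of the Kafka client.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Tracks the last advertised host per broker id; a change means any cached connection for
// that id may now point at a different broker and should be dropped before reuse.
class HostnameTracker {

    private final Map<Integer, String> lastKnownHost = new HashMap<>();

    synchronized boolean shouldDisconnect(int nodeId, String currentHost) {
        String previous = lastKnownHost.put(nodeId, currentHost);
        return previous != null && !Objects.equals(previous, currentHost);
    }
}
```
The second option would instead push the check to the broker side, which returns an error when the broker ID named in the request does not match its own.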
Re: [PR] KAFKA-15756: Migrate existing integration tests to run old protocol in new coordinator [kafka]
dongnuo123 commented on code in PR #14675: URL: https://github.com/apache/kafka/pull/14675#discussion_r1393317417 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -547,7 +565,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(2, parts.size) } - @Test + @Test // TODO: doesn't pass for kraft and kraft+kip848 Review Comment: `consumer.partitionsFor("non-exist-topic")` is currently not supported in kraft -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
kirktrue commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1393305110 ## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ## @@ -34,13 +35,15 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { - @Test - def testSimpleConsumption(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) Review Comment: Is it possible to have a different type of 'source' that can be defined once vs. on each test? More of a nit, but curious. ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1805,17 +1909,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { s"The current assignment is ${consumer.assignment()}") } - @Test - def testConsumingWithNullGroupId(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic")) // Null group ID only supported for generic group protocol + def testConsumingWithNullGroupId(groupProtocol: String): Unit = { val topic = "test_topic" -val partition = 0; +val partition = 0 val tp = new TopicPartition(topic, partition) createTopic(topic, 1, 1) -TestUtils.waitUntilTrue(() => { - this.zkClient.topicExists(topic) -}, "Failed to create topic") - Review Comment: Why don't we need this now? ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1657,35 +1759,37 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(numMessages - records.count, lag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be ${numMessages - records.count}") } - @Test - def testQuotaMetricsNotCreatedIfNoQuotasConfigured(): Unit = { + @ParameterizedTest + @ValueSource(strings = Array("generic", "consumer")) + def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): Unit = { val numRecords = 1000 val producer = createProducer() val startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) val consumer = createConsumer() consumer.assign(List(tp).asJava) consumer.seek(tp, 0) consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) -def assertNoMetric(broker: KafkaServer, name: String, quotaType: QuotaType, clientId: String): Unit = { -val metricName = broker.metrics.metricName("throttle-time", - quotaType.toString, - "", - "user", "", - "client-id", clientId) -assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) +def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = { + val metricName = broker.metrics.metricName("throttle-time", +quotaType.toString, +"", +"user", "", +"client-id", clientId) + assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) } -servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) -servers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Fetch, consumerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) +brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.Produce, producerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Produce, producerClientId)) +brokers.foreach(assertNoMetric(_, "byte-rate", 
QuotaType.Fetch, consumerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Fetch, consumerClientId)) -servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) -servers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) -servers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) +brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, producerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, producerClientId)) +brokers.foreach(assertNoMetric(_, "request-time", QuotaType.Request, consumerClientId)) +brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.Request, consumerClientId)) Review Comment: So this is incorrect in `trunk`, right? ## core/src/test/scala/integration/kafka/
[PR] KAFKA-15827: Prevent KafkaBasedLog subclasses from leaking passed-in clients [kafka]
gharris1727 opened a new pull request, #14763: URL: https://github.com/apache/kafka/pull/14763 The KafkaBasedLog normally creates clients during start() and closes them in stop(). Some KafkaBasedLog subclasses accept already-created clients, and close them in stop() if start() is called first. These clients should also be closed if stop() is called without first calling start(). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
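A simplified sketch of the lifecycle being fixed, assuming a hypothetical stand-in class (ExistingClientLog) rather than the actual Connect KafkaBasedLog: when the clients are passed in, stop() closes them whether or not start() was ever called.
```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

// Hypothetical stand-in for a log wrapper built around pre-created clients.
class ExistingClientLog<K, V> {

    private final Producer<K, V> producer;  // passed in, so this wrapper owns closing it
    private final Consumer<K, V> consumer;
    private volatile boolean started = false;

    ExistingClientLog(Producer<K, V> producer, Consumer<K, V> consumer) {
        this.producer = producer;
        this.consumer = consumer;
    }

    void start() {
        started = true;
        // ... read to end, start the work thread, etc. ...
    }

    void stop() {
        // Close unconditionally: gating this on `started` is exactly what leaks the
        // caller's clients when stop() is invoked without a prior start().
        producer.close();
        consumer.close();
    }
}
```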
[jira] [Created] (KAFKA-15827) KafkaBasedLog.withExistingClients leaks clients if start is not called
Greg Harris created KAFKA-15827: --- Summary: KafkaBasedLog.withExistingClients leaks clients if start is not called Key: KAFKA-15827 URL: https://issues.apache.org/jira/browse/KAFKA-15827 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.6.0 Reporter: Greg Harris Assignee: Greg Harris The KafkaBasedLog base implementation creates consumers and producers, and closes them after they are instantiated. There are subclasses of the KafkaBasedLog which accept pre-created consumers and producers, and have the responsibility for closing the clients when the KafkaBasedLog is stopped. It appears that the KafkaBasedLog subclasses do not close the clients when start() is skipped and stop() is called directly. This happens in a few tests, and causes the passed-in clients to be leaked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
eduwercamacaro commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1393218354 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: Thanks! I agree. I'm going to implement this in the `StoreChangelogReader` adding a new argument for the `#unregister ` method and cover the case for `MIGRATED` ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -701,7 +706,15 @@ private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Se } private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) { -return activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final StreamTask streamTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer); +final ProcessorStateManager stateManager = standbyTask.stateManager(); +for (final TopicPartition partition : partitions) { +final ProcessorStateManager.StateStoreMetadata storeMetadata = stateManager.storeMetadata(partition); +if (storeMetadata != null && storeMetadata.endOffset() != null) { +standbyTaskUpdateListener.onUpdateSuspended(partition, storeMetadata.store().name(), storeMetadata.offset(), storeMetadata.endOffset(), StandbyUpdateListener.SuspendReason.PROMOTED); Review Comment: Thanks! I agree. I'm going to implement this in the `StoreChangelogReader` adding a new argument for the `#unregister` method and cover the case for `MIGRATED` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
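A rough, self-contained sketch of the direction agreed above: passing the suspend reason through an unregister-style method so the listener fires for both PROMOTED and MIGRATED. The interface below only mirrors the callback used in the diff; ChangelogRegistrySketch is a hypothetical stand-in for StoreChangelogReader, not its real API.
```java
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch of an unregister path that carries the suspend reason.
class ChangelogRegistrySketch {

    enum SuspendReason { PROMOTED, MIGRATED }

    interface StandbyListener {
        void onUpdateSuspended(TopicPartition partition, String storeName,
                               long storeOffset, long endOffset, SuspendReason reason);
    }

    private final StandbyListener listener;

    ChangelogRegistrySketch(StandbyListener listener) {
        this.listener = listener;
    }

    // Callers converting a standby to an active task pass PROMOTED, while rebalance-driven
    // task migration passes MIGRATED; the listener is notified in both cases.
    void unregister(TopicPartition partition, String storeName,
                    long storeOffset, long endOffset, SuspendReason reason) {
        // ... remove changelog tracking state for the partition ...
        listener.onUpdateSuspended(partition, storeName, storeOffset, endOffset, reason);
    }
}
```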
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
agavra commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1393203579 ## streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java: ## @@ -97,21 +128,21 @@ protected Materialized(final Materialized materialized) { this.cachingEnabled = materialized.cachingEnabled; this.topicConfig = materialized.topicConfig; this.retention = materialized.retention; -this.storeType = materialized.storeType; +this.storeSuppliers = materialized.storeSuppliers; } /** - * Materialize a {@link StateStore} with the given {@link StoreType}. + * Materialize a {@link StateStore} with the given {@link DslStoreSuppliers}. * - * @param storeType the type of the state store - * @paramkey type of the store - * @paramvalue type of the store - * @paramtype of the {@link StateStore} + * @param storeSuppliers the type of the state store + * @param key type of the store + * @param value type of the store + * @param type of the {@link StateStore} * @return a new {@link Materialized} instance with the given storeName */ -public static Materialized as(final StoreType storeType) { -Objects.requireNonNull(storeType, "store type can't be null"); -return new Materialized<>(storeType); +public static Materialized as(final DslStoreSuppliers storeSuppliers) { Review Comment: yup! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
agavra commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1393194762 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig { public static final String ROCKS_DB = "rocksDB"; public static final String IN_MEMORY = "in_memory"; +/** {@code dsl.store.suppliers.class } */ +public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class"; +public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the org.apache.kafka.streams.state.DslStoreSuppliers interface."; Review Comment: kept package private so it can be used in `TopologyConfig` as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
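A hedged usage sketch for the new configuration (KIP-954) discussed in this and the previous comment. The class name com.example.MyDslStoreSuppliers is a placeholder for an implementation of org.apache.kafka.streams.state.DslStoreSuppliers, as the config documentation in the diff requires.
```java
import java.util.Properties;

public class DslStoreSuppliersConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Global default for DSL state stores; the value must name a class implementing
        // org.apache.kafka.streams.state.DslStoreSuppliers (placeholder shown here).
        props.put("dsl.store.suppliers.class", "com.example.MyDslStoreSuppliers");
        // Individual operators could still override this via the
        // Materialized.as(DslStoreSuppliers) overload added in the diff above.
        System.out.println(props);
    }
}
```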
Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
vamossagar12 commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-1811121235 Hey @sean-rossignol , thanks for explaining your use case and how this can be useful for you! I hope to push it in the upcoming 3.7 release. I haven't gotten the chance to resolve the merge conflicts. Will try to do so this week and maybe you could help with the reviews? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786046#comment-17786046 ] Proven Provenzano commented on KAFKA-15513: --- The role of "{{{}kafka-storage --add-scram{}}}" is to allow brokers to use SCRAM to talk with each other without having to use another authentication scheme to bootstrap. Using ZK the process of getting broker to broker authentication to use SCRAM involved setting up ZK first, then adding the SCRAM credentials directly to ZK, and then starting the brokers up. In KRAFT you cannot add SCRAM credentials directly to controllers so to facilitate the bootstrap we allow metadata records to be added at Controller initialization. > KRaft cluster fails with SCRAM authentication enabled for control-plane > --- > > Key: KAFKA-15513 > URL: https://issues.apache.org/jira/browse/KAFKA-15513 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0, 3.5.1 >Reporter: migruiz4 >Priority: Major > > We have observed a scenario where a KRaft cluster fails to bootstrap when > using SCRAM authentication for controller-to-controller communications. > The steps to reproduce are simple: > * Deploy (at least) 2 Kafka servers using latest version 3.5.1. > * Configure a KRaft cluster, where the controller listener uses > SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the > recommended in-line jaas config > '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}' > * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create > the SCRAM user. > When initialized, Controllers will fail to connect to each other with an > authentication error: > > {code:java} > [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: > Failed to send the following request due to authentication error: > ClientRequest(expectResponse=true, > callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075, > destination=0, correlationId=129, clientId=raft-client-1, > createdTimeMs=1690888364960, > requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', > topics=[TopicData(topicName='__cluster_metadata', > partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, > lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code} > Some additional details about the scenario that we tested out: > * Controller listener does work when configured with SASL+PLAIN > * The issue only affects the Controller listener, SCRAM users created using > the same method work for data-plane listeners and inter-broker listeners. 
> > Below you can find the exact configuration and command used to deploy: > * server.properties > {code:java} > listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093 > advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091 > listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT > num.network.threads=3 > num.io.threads=8 > socket.send.buffer.bytes=102400 > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > log.dirs=/bitnami/kafka/data > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=1 > transaction.state.log.replication.factor=1 > transaction.state.log.min.isr=1 > log.retention.hours=168 > log.retention.check.interval.ms=30 > controller.listener.names=CONTROLLER > controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093 > inter.broker.listener.name=INTERNAL > node.id=0 > process.roles=controller,broker > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 > sasl.mechanism.controller.protocol=SCRAM-SHA-512 > listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512 > listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule > required username="controller_user" password="controller_password";{code} > * kafka-storage.sh command > {code:java} > kafka-storage.sh format --config /path/to/server.properties > --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram > SCRAM-SHA-512=[name=controller_user,password=controller_password] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15825) KRaft controller writes empty state to ZK after migration
[ https://issues.apache.org/jira/browse/KAFKA-15825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-15825: - Component/s: kraft > KRaft controller writes empty state to ZK after migration > - > > Key: KAFKA-15825 > URL: https://issues.apache.org/jira/browse/KAFKA-15825 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Immediately following the ZK migration, there is a race condition where the > KRaftMigrationDriver can use an empty MetadataImage when performing the full > "SYNC_KRAFT_TO_ZK" reconciliation. > After the next controller failover, or when the controller loads a metadata > snapshot, the correct state will be written to ZK. > The symptom of this bug is that we see the migration complete, and then all > the metadata removed from ZK. For example, > {code} > [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper > to KRaft. 573 records were generated in 2204 ms across 51 batches. The record > types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, > PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an > epoch of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5]. > {code} > immediately followed by: > {code} > [KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling > with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41} > {code} > If affected by this, a quick workaround is to cause the controller to > failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15818) Implement max poll interval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15818: --- Labels: consumer consumer-threading-refactor (was: ) > Implement max poll interval > > > Key: KAFKA-15818 > URL: https://issues.apache.org/jira/browse/KAFKA-15818 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Blocker > Labels: consumer, consumer-threading-refactor > > The consumer needs to be polled at a cadence lower than > MAX_POLL_INTERVAL_MAX otherwise the consumer should try to leave the group. > Currently, we send an acknowledgment event to the network thread per poll. > The event only triggers update on autocommit state; we need to implement > updating the poll timer so that the consumer can leave the group when the > timer expires. > > The current logic looks like this: > {code:java} > if (heartbeat.pollTimeoutExpired(now)) { > // the poll timeout has expired, which means that the foreground thread > has stalled > // in between calls to poll(). > log.warn("consumer poll timeout has expired. This means the time between > subsequent calls to poll() " + > "was longer than the configured max.poll.interval.ms, which typically > implies that " + > "the poll loop is spending too much time processing messages. You can > address this " + > "either by increasing max.poll.interval.ms or by reducing the maximum > size of batches " + > "returned in poll() with max.poll.records."); > maybeLeaveGroup("consumer poll timeout has expired."); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
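A minimal sketch of the poll-timer idea from the ticket, with hypothetical names (PollIntervalTracker, onPoll) rather than the actual consumer internals: the per-poll event resets the timestamp, and the background thread can leave the group once max.poll.interval.ms elapses without a poll.
```java
import org.apache.kafka.common.utils.Time;

// Hypothetical tracker shared between the application-thread poll events and the background thread.
class PollIntervalTracker {

    private final Time time;
    private final long maxPollIntervalMs;
    private volatile long lastPollMs;

    PollIntervalTracker(Time time, long maxPollIntervalMs) {
        this.time = time;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = time.milliseconds();
    }

    // Called when the background thread receives the per-poll event from poll().
    void onPoll() {
        lastPollMs = time.milliseconds();
    }

    // Checked from the heartbeat/membership logic; when true, the member should leave the group.
    boolean pollTimeoutExpired() {
        return time.milliseconds() - lastPollMs > maxPollIntervalMs;
    }
}
```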
[PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]
gharris1727 opened a new pull request, #14762: URL: https://github.com/apache/kafka/pull/14762 Currently the only place that the consumer is closed is on the task thread, which may be blocked indefinitely by the task plugin. Similar to AbstractWorkerSourceTask, we should close the consumer here to allow for cleanup to take place. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
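A simplified sketch of the behaviour described in the PR text, with a hypothetical stand-in class (SinkTaskSketch) rather than the real WorkerSinkTask: cancel() closes the consumer so its resources are released even if the task thread stays blocked inside the plugin and never reaches its normal shutdown path.
```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;

// Hypothetical stand-in illustrating the cancel() behaviour described in the PR text above.
class SinkTaskSketch {

    private final Consumer<byte[], byte[]> consumer;
    private volatile boolean cancelled = false;

    SinkTaskSketch(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    void cancel() {
        cancelled = true;
        // Release the consumer's resources here rather than relying on the (possibly blocked)
        // task thread to do it during its own shutdown.
        consumer.close(Duration.ZERO);
    }

    boolean isCancelled() {
        return cancelled;
    }
}
```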
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1393093702 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig, } actionQueue.add { - () => -allResults.foreach { case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { -case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.NONE => -// nothing + () => allResults.foreach { case (topicPartition, result) => +val requestKey = TopicPartitionOperationKey(topicPartition) +result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => +// some delayed operations may be unblocked after HW changed +delayedProducePurgatory.checkAndComplete(requestKey) +delayedFetchPurgatory.checkAndComplete(requestKey) +delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => +// probably unblock some follower fetch requests since log end offset has been updated +delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing Review Comment: I think it was the same here, but I can move it. https://github.com/apache/kafka/commit/7d147cf2413e5d361422728e5c9306574658c78d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15816: Fix leaked sockets in mirror [kafka]
gharris1727 opened a new pull request, #14761: URL: https://github.com/apache/kafka/pull/14761 These socket leaks were caused by typos, either an extra call to start() or a missing call to close(). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15826) WorkerSinkTask leaks Consumer if plugin start or stop blocks indefinitely
Greg Harris created KAFKA-15826: --- Summary: WorkerSinkTask leaks Consumer if plugin start or stop blocks indefinitely Key: KAFKA-15826 URL: https://issues.apache.org/jira/browse/KAFKA-15826 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.6.0 Reporter: Greg Harris Assignee: Greg Harris The WorkerSourceTask cancel() method closes the Producer, releasing its resources. The WorkerSinkTask does not do the same for the Consumer, as it does not override the cancel() method. WorkerSinkTask should close the consumer if the task is cancelled, as progress for a cancelled task will be discarded anyway. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15825) KRaft controller writes empty state to ZK after migration
[ https://issues.apache.org/jira/browse/KAFKA-15825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-15825. -- Resolution: Fixed This bug was fixed as part of KAFKA-15605 > KRaft controller writes empty state to ZK after migration > - > > Key: KAFKA-15825 > URL: https://issues.apache.org/jira/browse/KAFKA-15825 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Immediately following the ZK migration, there is a race condition where the > KRaftMigrationDriver can use an empty MetadataImage when performing the full > "SYNC_KRAFT_TO_ZK" reconciliation. > After the next controller failover, or when the controller loads a metadata > snapshot, the correct state will be written to ZK. > The symptom of this bug is that we see the migration complete, and then all > the metadata removed from ZK. For example, > {code} > [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper > to KRaft. 573 records were generated in 2204 ms across 51 batches. The record > types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, > PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an > epoch of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5]. > {code} > immediately followed by: > {code} > [KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling > with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41} > {code} > If affected by this, a quick workaround is to cause the controller to > failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15825) KRaft controller writes empty state to ZK after migration
David Arthur created KAFKA-15825: Summary: KRaft controller writes empty state to ZK after migration Key: KAFKA-15825 URL: https://issues.apache.org/jira/browse/KAFKA-15825 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 3.6.0 Reporter: David Arthur Assignee: David Arthur Fix For: 3.7.0, 3.6.1 Immediately following the ZK migration, there is a race condition where the KRaftMigrationDriver can use an empty MetadataImage when performing the full "SYNC_KRAFT_TO_ZK" reconciliation. After the next controller failover, or when the controller loads a metadata snapshot, the correct state will be written to ZK. The symptom of this bug is that we see the migration complete, and then all the metadata removed from ZK. For example, {code} [KRaftMigrationDriver id=9990] Completed migration of metadata from ZooKeeper to KRaft. 573 records were generated in 2204 ms across 51 batches. The record types were {TOPIC_RECORD=41, PARTITION_RECORD=410, CONFIG_RECORD=121, PRODUCER_IDS_RECORD=1}. The current metadata offset is now 503794 with an epoch of 21. Saw 6 brokers in the migrated metadata [0, 1, 2, 3, 4, 5]. {code} immediately followed by: {code} [KRaftMigrationDriver id=9990] Made the following ZK writes when reconciling with KRaft state: {DeleteBrokerConfig=7, DeleteTopic=41, UpdateTopicConfig=41} {code} If affected by this, a quick workaround is to cause the controller to failover. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15605) Topics marked for deletion in ZK are incorrectly migrated to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-15605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-15605. -- Fix Version/s: 3.7.0 Resolution: Fixed > Topics marked for deletion in ZK are incorrectly migrated to KRaft > -- > > Key: KAFKA-15605 > URL: https://issues.apache.org/jira/browse/KAFKA-15605 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Affects Versions: 3.6.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > When migrating topics from ZooKeeper, the KRaft controller reads all the > topic and partition metadata from ZK directly. This includes topics which > have been marked for deletion by the ZK controller. After being migrated to > KRaft, the pending topic deletions are never completed, so it is as if the > delete topic request never happened. > Since the client request to delete these topics has already been returned as > successful, it would be confusing to the client that the topic still existed. > An operator or application would need to issue another topic deletion to > remove these topics once the controller had moved to KRaft. If they tried to > create a new topic with the same name, they would receive a > TOPIC_ALREADY_EXISTS error. > The migration logic should carry over pending topic deletions and resolve > them either as part of the migration or shortly after. > *Note to operators:* > To determine if a migration was affected by this, an operator can check the > contents of {{/admin/delete_topics}} after the KRaft controller has migrated > the metadata. If any topics are listed under this ZNode, they were not > deleted and will still be present in KRaft. At this point the operator can > make a determination if the topics should be re-deleted (using > "kafka-topics.sh --delete") or left in place. In either case, the topics > should be removed from {{/admin/delete_topics}} to prevent unexpected topic > deletion in the event of a fallback to ZK. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15823: disconnect from controller on AuthenticationException [kafka]
gaurav-narula opened a new pull request, #14760: URL: https://github.com/apache/kafka/pull/14760 This PR changes the handling of authenticationException on a request from the node to the controller. We disconnect controller connection and invalidate the cache so that the next run of the thread will establish a connection with the (potentially) updated controller. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
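A rough sketch of the handling described above, using hypothetical interfaces (ControllerCache, Channel) rather than the real broker-to-controller channel classes: on an authentication error the connection is dropped and the cached controller forgotten, so the next run of the thread re-resolves and reconnects.
```java
import org.apache.kafka.common.errors.AuthenticationException;

// Hypothetical sketch of the error-handling path; names are illustrative only.
class ControllerRequestSketch {

    interface ControllerCache {
        void invalidate();           // forget the cached controller node
    }

    interface Channel {
        void disconnectController(); // tear down the current connection
    }

    private final ControllerCache cache;
    private final Channel channel;

    ControllerRequestSketch(ControllerCache cache, Channel channel) {
        this.cache = cache;
        this.channel = channel;
    }

    void onResponse(AuthenticationException authException) {
        if (authException != null) {
            // The (potentially new) controller will be looked up on the next run of the thread.
            channel.disconnectController();
            cache.invalidate();
        }
    }
}
```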
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
junrao commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1810855040 @apoorvmittal10 : Thanks for looking into the test failures. There is an ongoing discussion on requiring a green build before merging the PR. I will need to wait for the result of that discussion before merging the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810853645 Basically, the future.get() API only returns three types of exceptions per its documentation: ExecutionException, InterruptedException, and CancellationException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
philipnee commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1810851585 Hi @cadonna - When the consumer is woken up, the WakeupTrigger should complete the future exceptionally with WakeupException. To rethrow that exception during future.get(), you will need to examine the ExecutionException kind of like this: ``` catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof WakeupException) throw new WakeupException(); else if (t instanceof KafkaException) throw (KafkaException) t; else throw new KafkaException(t); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]
jeqo commented on PR #14759: URL: https://github.com/apache/kafka/pull/14759#issuecomment-1810780948 cc @satishd -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
philipnee commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1810775229 Hi @lucasbru - Yes. It is not quite feasible to use the backgroundEventQueue because of the behavior of the current API, i.e., the callback needs to be invoked outside of the poll call. If we use the backgroundEventQueue, then the operation can become quite complicated, i.e. we will need to scan through the queue, execute the callback, and remove the callback from the queue. @kirktrue - mind commenting here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
hachikuji commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392979091 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -898,20 +900,19 @@ class ReplicaManager(val config: KafkaConfig, } actionQueue.add { - () => -allResults.foreach { case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { -case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.NONE => -// nothing + () => allResults.foreach { case (topicPartition, result) => +val requestKey = TopicPartitionOperationKey(topicPartition) +result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => +// some delayed operations may be unblocked after HW changed +delayedProducePurgatory.checkAndComplete(requestKey) +delayedFetchPurgatory.checkAndComplete(requestKey) +delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => +// probably unblock some follower fetch requests since log end offset has been updated +delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing Review Comment: nit: looks misaligned -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]
jeqo opened a new pull request, #14759: URL: https://github.com/apache/kafka/pull/14759 Cherrypick from https://github.com/apache/kafka/pull/14727 into trunk ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392978203 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: added the new name to the class value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1810767025 Hi @kirktrue, thanks for taking the time to clean up the task. I left some comments there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392968241 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. + * + * + * If there is already an pending wakeup from a previous call to {@link Consumer#wakeup()}, do nothing. + * We keep the internal state as is so that the future calls to {@link #setActiveTask(CompletableFuture)} + * will fail as expected. + * + * */ -public void wakeup() { -pendingTask.getAndUpdate(task -> { -if (task == null) { -return new WakeupFuture(); -} else if (task instanceof ActiveFuture) { -ActiveFuture active = (ActiveFuture) task; -active.future().completeExceptionally(new WakeupException()); +void wakeup() { +activeTask.getAndUpdate(existingTask -> { +if (existingTask == null) { +// If there isn't an existing task, return our marker, so that the subsequent call will +// know wakeup was previously called. +return FAIL_NEXT_MARKER; +} else if (existingTask instanceof CompletableFuture) { +// If there is an existing "active" task, complete it with WakeupException. +CompletableFuture active = (CompletableFuture) existingTask; +active.completeExceptionally(new WakeupException()); + +// We return a null here to effectively unset the "active" task. return null; } else { -return task; +// This is the case where the existing task is the wakeup marker. So the user has apparently +// called Consumer.wakeup() more than once. +return existingTask; } }); } /** - * If there is no pending task, set the pending task active. - * If wakeup was called before setting an active task, the current task will complete exceptionally with - * WakeupException right - * away. - * if there is an active task, throw exception. - * @param currentTask - * @param - * @return + * This method should be called before execution a blocking operation in the {@link Consumer}. 
This will + * store an internal reference to the given active {@link CompletableFuture task} that can be + * {@link CompletableFuture#completeExceptionally(Throwable) forcibly failed} if the user invokes the + * {@link Consumer#wakeup()} call before or during its execution. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask} and no + * previous calls to {@link #wakeup()}, set the given {@link CompletableFuture} as the + * active task. + * + * + * If there was a previous call to {@link #wakeup()}, the given {@link CompletableFuture task} will fail + * via {@link CompletableFuture#completeExceptionally(Throwable)} and the active task will be Review Comment: similarly - should we ment
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392964486 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. Review Comment: should we mention about WakeupException - something like active task compelted exceptionally via WakeupException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392962733 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. + * + * + * If there is already an pending wakeup from a previous call to {@link Consumer#wakeup()}, do nothing. Review Comment: "a pending" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
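For readers following this review thread, the pattern under discussion can be boiled down to an AtomicReference that holds either a "wakeup pending" marker or the future of the active blocking call. The sketch below is a simplified stand-in for WakeupTrigger (the class name and the IllegalStateException on double registration are illustrative choices, not the actual implementation):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.errors.WakeupException;

// Illustrative sketch only; not the real WakeupTrigger.
final class WakeupTriggerSketch {
    private static final Object FAIL_NEXT_MARKER = new Object();
    private final AtomicReference<Object> activeTask = new AtomicReference<>();

    // Called from Consumer.wakeup(): fail the active task, or remember the wakeup for the next one.
    void wakeup() {
        activeTask.getAndUpdate(existing -> {
            if (existing == null) {
                return FAIL_NEXT_MARKER;                       // no active task: fail the next one
            } else if (existing instanceof CompletableFuture) {
                ((CompletableFuture<?>) existing).completeExceptionally(new WakeupException());
                return null;                                   // active task woken up: clear it
            }
            return existing;                                   // wakeup already pending: keep it
        });
    }

    // Called before a blocking operation starts; the returned future may already be failed
    // if wakeup() was invoked earlier.
    <T> CompletableFuture<T> setActiveTask(CompletableFuture<T> task) {
        activeTask.getAndUpdate(existing -> {
            if (existing == FAIL_NEXT_MARKER) {
                task.completeExceptionally(new WakeupException());
                return null;                                   // consume the pending wakeup
            } else if (existing != null) {
                throw new IllegalStateException("another blocking call is already registered");
            }
            return task;
        });
        return task;
    }
}
{code}
The key property is that both methods mutate the same single reference atomically, so a wakeup cannot be lost between the check for a pending wakeup and the registration of a new blocking call.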
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392954694 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. 
+ * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
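Distilling the control flow under review: invoke the user callback to release the assignment, and only in whenComplete (whether the callback succeeded or failed) clear the local assignment and transition. A minimal stand-alone sketch of that pattern follows; ReleaseAssignmentSketch and its members are simplified stand-ins for MembershipManagerImpl, not the actual client code:
{code:java}
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Sketch of "invoke callback, then always clear assignment and transition"; not the real client code.
final class ReleaseAssignmentSketch {
    private Set<String> assignedPartitions = Set.of("topic-0", "topic-1");

    // Stand-in for handing partitions to the user's ConsumerRebalanceListener.onPartitionsLost().
    private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<String> lost) {
        return CompletableFuture.completedFuture(null);
    }

    private void transitionToJoining() {
        // reset the epoch and rejoin the group
    }

    void onFenced() {
        CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(assignedPartitions);
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                // Log and keep going: the member rejoins even if the callback failed.
                System.err.println("onPartitionsLost failed while releasing assignment: " + error);
            }
            assignedPartitions = Collections.emptySet();  // drop the assignment either way
            transitionToJoining();
        });
    }
}
{code}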
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
jolshan commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392953096 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: Yeah, I think it could be useful to rename the action queue so that the compiler catches such a change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR Re-add action queue parameter removed from appendRecords [kafka]
artemlivshits commented on code in PR #14753: URL: https://github.com/apache/kafka/pull/14753#discussion_r1392946436 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -813,7 +813,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback, timeout, responseCallback, - delayedProduceLock + delayedProduceLock, + actionQueue Review Comment: Wow, this is so subtle; I stared at the change for some time to understand what it actually does. It's almost impossible to spot in code review and the compiler cannot help either. What can we do to catch these issues in the future? I think we should rename the member variable to something like `defaultActionQueue` so that if we don't pass the `actionQueue` the compiler would catch it. Another question (probably for @dajac) -- is passing the `actionQueue` in the argument just an optimization, or a correctness issue? If it's just an optimization, I wonder what the effects would be of removing it and reducing the overall system complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
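The pitfall artemlivshits describes can be shown in a few lines of plain Java as well: when a field and a parameter share the same name, dropping the parameter silently rebinds references to the field, and the code still compiles. A hypothetical illustration (the real ReplicaManager is Scala, and ActionQueue is reduced here to a queue of Runnables):
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration of the naming pitfall; not Kafka code.
final class AppendSketch {
    // Renaming this field to defaultActionQueue would turn the silent fallback below
    // into a compile error whenever the explicit parameter is accidentally dropped.
    private final Queue<Runnable> actionQueue = new ArrayDeque<>();

    // Callers are expected to pass the request-scoped queue explicitly.
    void appendRecords(Runnable completion, Queue<Runnable> actionQueue) {
        actionQueue.add(completion);
    }

    void handleProduce(Runnable completion, Queue<Runnable> requestLocalQueue) {
        // Correct: the request-local queue is threaded through.
        appendRecords(completion, requestLocalQueue);
    }

    void appendRecordsUsingField(Runnable completion) {
        // If a parameter named actionQueue is removed from the signature, this line still
        // compiles because "actionQueue" now resolves to the field of the same name -- exactly
        // the kind of change that is hard to spot in review.
        actionQueue.add(completion);
    }
}
{code}
With the field renamed to `defaultActionQueue`, the body of `appendRecordsUsingField` would no longer compile once the explicit parameter is gone, which is the safety net suggested above.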
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
soarez commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1810719729 > avoids calling exit(1) in junit tests, which will kill Jenkins dead (even after 3 decades of Java, we don't have the technology to intercept exit() in unit tests >:( ) @cmccabe I'm confused; I thought that was the whole point of `kafka.utils.Exit` https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/Exit.scala#L21-L24 https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java The `FaultHandler` concept in `QuorumController` seems great, but I'm not sure how it fits here: afaict the fault is always fatal and we can intercept it in tests - what am I missing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
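For context on the interception soarez is pointing at: tests normally swap in an exit procedure before exercising the code and reset it afterwards. A minimal JUnit 5 sketch against the clients-module `org.apache.kafka.common.utils.Exit` follows; the method names match my recollection of that utility, so double-check them against the class before relying on this:
{code:java}
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;

// Sketch: intercept Exit.exit() in a test instead of letting it kill the JVM (and Jenkins with it).
public class ExitInterceptionSketchTest {

    @Test
    public void testExitIsIntercepted() {
        AtomicInteger observedStatus = new AtomicInteger(-1);
        Exit.setExitProcedure((statusCode, message) -> observedStatus.set(statusCode));
        try {
            Exit.exit(1);                      // code under test would call this instead of System.exit(1)
            assertEquals(1, observedStatus.get());
        } finally {
            Exit.resetExitProcedure();         // don't leak the stub into other tests
        }
    }
}
{code}
Resetting in a finally block matters because the exit procedure is a static, JVM-wide hook.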
[PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield opened a new pull request, #14758: URL: https://github.com/apache/kafka/pull/14758 This PR parameterizes the consumer integration tests so they can be run against the existing "generic" group protocol and the new "consumer" group protocol introduced in KIP-848. The KIP-848 client code is under construction so some of the tests do not run on both variants to start with, but the idea is that the tests can be enabled as the gaps in functionality are closed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
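As a rough sketch of the parameterization idea in the PR description: a JUnit 5 test that runs once per group protocol. The `group.protocol` consumer config key and the protocol names are assumptions here; the actual tests in the PR are Scala and use Kafka's integration-test harness:
{code:java}
import java.util.Properties;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// Sketch: run the same test body once per group protocol.
public class GroupProtocolParameterizationSketch {

    @ParameterizedTest
    @ValueSource(strings = {"generic", "consumer"})
    public void testBasicConsumption(String groupProtocol) {
        Properties props = new Properties();
        props.put("group.protocol", groupProtocol);   // assumed config key; check the PR for the real constant
        props.put("group.id", "test-group");
        // ... create a consumer with these properties and run the shared assertions ...
    }
}
{code}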
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Reviewer: (was: Walker Carlson) > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't > check if partition is subscribed by checking TopicPartitionState cached is > null or not, as done by > [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. > So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check writing thread-safe code w.r.t SubscriptionState class is > awkward. This can be seen from the example code below. For example, at line 1 > partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could > be removed from subscribed partitions(in a separate thread). So this forces > the user of this class to handle IllegalStateException which is awkward. > {code:java} > // Following is example code for the user of > SubscriptionState::maybeValidatePositionForCurrentLeader > Set allCurrentlySubscribedTopics = > subscriptionState.assignedPartitions(); // line 1 > if(allCurrentlySubscribedTopics.contains(tp)) { > ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = > metadata.currentLeader(tp); > try() { > subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, > leaderAndEpoch); // line 2 > } catch (IllegalStateException e) { >// recover from it. // line 3 > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
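For readers of this ticket, here is the description's snippet with the generics that Jira stripped restored and wrapped so it compiles. The signatures follow my reading of SubscriptionState and ConsumerMetadata on trunk at the time, so treat this as a best-effort sketch of the awkward caller-side pattern rather than authoritative code:
{code:java}
import java.util.Set;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;

// Even after the contains() check on line 1, another thread may unassign the partition before
// line 2 runs, so the caller is still forced to catch IllegalStateException on line 3.
final class ValidatePositionSketch {
    static void maybeValidate(SubscriptionState subscriptionState,
                              ConsumerMetadata metadata,
                              ApiVersions apiVersions,
                              TopicPartition tp) {
        Set<TopicPartition> assigned = subscriptionState.assignedPartitions();              // line 1
        if (assigned.contains(tp)) {
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
            try {
                subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch);  // line 2
            } catch (IllegalStateException e) {
                // line 3: recover, e.g. skip validation for a partition that is no longer assigned
            }
        }
    }
}
{code}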
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Description: As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check writing thread-safe code w.r.t SubscriptionState class is awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. // line 3 } }{code} was: As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check writing thread-safe code w.r.t SubscriptionState class is awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. 
// line 3 } }{code} > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be seen [here|#L453], maybeValidatePositionForCurrentLeader doesn't > check if partition is subscribed by checking TopicPartitionState cached is > null or not, as done by > [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. > So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check writing thread-safe code w.r.t SubscriptionState class is > awkward. This can be seen from the example code below. For example, at line 1 > partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could > be removed from subscribed partitions(in a separate thread). So this forces > the user of this class to handle IllegalStateException which is awkward. > {code:java} > // Following is example code for the user of > SubscriptionState::maybeValidatePositionForCurrentLeader > Set allCurrentlySubscribedTopics = > subscriptionState.assignedPartitions(); // line 1 > if(allCurrentlySubscribedTopics.contains(tp)) { > ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = > metadata.currentLeader(tp); > try() { > subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, > leaderAndEpoch); // line 2 > } catch (IllegalStateException e)
[PR] Kip 951 SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet [kafka]
msn-tldr opened a new pull request, #14757: URL: https://github.com/apache/kafka/pull/14757 See the motivation in the Jira description: https://issues.apache.org/jira/browse/KAFKA-15824 This was discovered when `ReassignReplicaShrinkTest` started to fail with the KIP-951 changes. The KIP-951 changes have since been reverted ([PR](https://github.com/apache/kafka/pull/1)) and would be put back once this is in. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Description: As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check writing thread-safe code w.r.t SubscriptionState class is awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegalStateException e) { // recover from it. // line 3 } }{code} was: As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check prevents writing thread-safe code w.r.t SubscriptionState class, this can be seen from the example code below. For example, at line 1 partA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegatonStateException e) { // recover from it. // line 3 } }{code} > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't > check if partition is subscribed by checking TopicPartitionState cached is > null or not, as done by > [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. 
> So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check writing thread-safe code w.r.t SubscriptionState class is > awkward. This can be seen from the example code below. For example, at line 1 > partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could > be removed from subscribed partitions(in a separate thread). So this forces > the user of this class to handle IllegalStateException which is awkward. > {code:java} > // Following is example code for the user of > SubscriptionState::maybeValidatePositionForCurrentLeader > Set allCurrentlySubscribedTopics = > subscriptionState.assignedPartitions(); // line 1 > if(allCurrentlySubscribedTopics.contains(tp)) { > ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = > metadata.currentLeader(tp); > try() { > subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, > leaderAndEpoch); // line 2 > } catch (IllegalStateException e) { >// recover from it. // line 3 > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Description: As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done by [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. So it throws IllegalStateException for a partition that is yet not subscribed. Lack of this check prevents writing thread-safe code w.r.t SubscriptionState class, this can be seen from the example code below. For example, at line 1 partA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions. {code:java} // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader Set allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1 if(allCurrentlySubscribedTopics.contains(tp)) { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp); try() { subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2 } catch (IllegatonStateException e) { // recover from it. // line 3 } }{code} was:As can be seen [here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]], it doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done here by maybe > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be seen [here|#L453]], maybeValidatePositionForCurrentLeader doesn't > check if partition is subscribed by checking TopicPartitionState cached is > null or not, as done by > [maybeCompleteValidation|[http://example.com|https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L477]]. > So it throws IllegalStateException for a partition that is yet not > subscribed. > Lack of this check prevents writing thread-safe code w.r.t SubscriptionState > class, this can be seen from the example code below. For example, at line 1 > partA would be in allCurrentlySubscribedTopics, but at line 2 it could be > removed from subscribed partitions. > {code:java} > // Following is example code for the user of > SubscriptionState::maybeValidatePositionForCurrentLeader > Set allCurrentlySubscribedTopics = > subscriptionState.assignedPartitions(); // line 1 > if(allCurrentlySubscribedTopics.contains(tp)) { > ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = > metadata.currentLeader(tp); > try() { > subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, > leaderAndEpoch); // line 2 > } catch (IllegatonStateException e) { >// recover from it. // line 3 > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785956#comment-17785956 ] Kamal Chandraprakash edited comment on KAFKA-15376 at 11/14/23 4:29 PM: [~divijvaidya] The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] provided in the discussion is misleading. Let's divide the example into two to navigate it easier: Assume that there are two replicas Broker A and Broker B for partition tp0: *Case-1* Both the replicas A and B are insync on startup and they hold the leader-epoch 0. Then, the brokers started to go down in ping-pong fashion. Each broker will hold the following epoch in it's leader-epoch-checkpoint file: A: 0, 2, 4, 6, 8 B: 0, 1, 3, 5, 7 Since this is unclean-leader-election, the logs of Broker A and B might be diverged. As long as anyone of them is online, they continue to serve all the records according to the leader-epoch-checkpoint file. Once both the brokers becomes online, the follower truncates itself up-to the largest common log prefix offset so that the logs won't be diverged between the leader and follower. In this case, we continue to serve the data from the remote storage as no segments will be removed due to leader-epoch-cache truncation since both of them holds the LE0. Note that the approach taken here is similar to local-log where the broker will serve the log that they have until they sync with each other. *Case-2* Both the replicas A and B are out-of-sync on startup and the follower doesn't hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & doesn't hold any data about the partition (empty-disk). When the Broker A goes down, there will be offline partition and B will be elected as unclean leader, the log-end-offset of the partition will be reset back to 0. >From the example provided in the discussion: At T1, Broker A {code:java} - leader-epoch | start-offset | - 0 0 1 180 2 400 - {code} At T2, Broker B, the start-offset will be reset back to 0: (Note that the leader does not interact with remote storage to find the next offset trade-off b/w availability and durabilty) {code:java} - leader-epoch | start-offset | - 3 0 4 780 6 900 7 990 - {code} Now, if we hold the data for both the lineage and ping-pong the brokers, we will be serving the diverged data back to the client for the same fetch-offset depends on the broker which is online. Once, the replicas start to interact with each other, they truncate the remote data themselves based on the current leader epoch lineage. The example provided in the discussion is applicable only for case-2 where the replicas never interacted among themselves at-least once. was (Author: ckamal): [~divijvaidya] The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] provided in the discussion is misleading. Let's divide the example into two to navigate it easier: Assume that there are two replicas Broker A and Broker B for partition tp0: *Case-1* Both the replicas A and B are insync on startup and they hold the leader-epoch 0. Then, the brokers started to go down in ping-pong fashion. Each broker will hold the following epoch in it's leader-epoch-checkpoint file: A: 0, 2, 4, 6, 8 B: 0, 1, 3, 5, 7 Since this is unclean-leader-election, the logs of Broker A and B might be diverged. As long as anyone of them is online, they continue to serve all the records according to the leader-epoch-checkpoint file. 
Once both the brokers becomes online, the follower truncates itself up-to the largest common log prefix offset so that the logs won't be diverged between the leader and follower. In this case, we continue to serve the data from the remote storage as no segments will be removed due to leader-epoch-cache truncation since both of them holds the LE0. Note that the approach taken here is similar to local-log where the broker will serve the log that they have until they sync with each other. *Case-2* Both the replicas A and B are out-of-sync on startup and the follower doesn't hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & doesn't hold any data about the partition (empty-disk). When the Broker A goes down, there will be offline partition and B will be elected as unclean leader, the log-end-offset of the partition will be reset back to 0. >From the example provided in the discussion: At T1, Broker A {code:java} - leader-epoch | start-offset | -
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
satishd merged PR #14727: URL: https://github.com/apache/kafka/pull/14727 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785962#comment-17785962 ] Kamal Chandraprakash commented on KAFKA-15376: -- With unclean-leader-election enabled, there can be log divergence and log loss, and exactly-once delivery is not applicable. We are trying to extend the same contract that holds for local storage to remote storage when this feature is enabled. There are pros and cons to this feature: *Pros* 1. The replica will serve the data that it has seen so far back to the client, even if it never interacts with any other replica. *Cons* 1. RemoteStorageManager / RemoteLogManager will have additional work to maintain the unreferenced segments and clean them up. > Explore options of removing data earlier to the current leader's leader epoch > lineage for topics enabled with tiered storage. > - > > Key: KAFKA-15376 > URL: https://issues.apache.org/jira/browse/KAFKA-15376 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > > Followup on the discussion thread: > [https://github.com/apache/kafka/pull/13561#discussion_r1288778006] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785958#comment-17785958 ] Christian Lefebvre commented on KAFKA-15513: Hi [~pprovenzano] , isn't it the role of "{{{}kafka-storage --add-scram{}}}" command to create users than may be locally recognized by controller ? > KRaft cluster fails with SCRAM authentication enabled for control-plane > --- > > Key: KAFKA-15513 > URL: https://issues.apache.org/jira/browse/KAFKA-15513 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0, 3.5.1 >Reporter: migruiz4 >Priority: Major > > We have observed a scenario where a KRaft cluster fails to bootstrap when > using SCRAM authentication for controller-to-controller communications. > The steps to reproduce are simple: > * Deploy (at least) 2 Kafka servers using latest version 3.5.1. > * Configure a KRaft cluster, where the controller listener uses > SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the > recommended in-line jaas config > '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}' > * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create > the SCRAM user. > When initialized, Controllers will fail to connect to each other with an > authentication error: > > {code:java} > [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: > Failed to send the following request due to authentication error: > ClientRequest(expectResponse=true, > callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075, > destination=0, correlationId=129, clientId=raft-client-1, > createdTimeMs=1690888364960, > requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', > topics=[TopicData(topicName='__cluster_metadata', > partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, > lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code} > Some additional details about the scenario that we tested out: > * Controller listener does work when configured with SASL+PLAIN > * The issue only affects the Controller listener, SCRAM users created using > the same method work for data-plane listeners and inter-broker listeners. 
> > Below you can find the exact configuration and command used to deploy: > * server.properties > {code:java} > listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093 > advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091 > listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT > num.network.threads=3 > num.io.threads=8 > socket.send.buffer.bytes=102400 > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > log.dirs=/bitnami/kafka/data > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=1 > transaction.state.log.replication.factor=1 > transaction.state.log.min.isr=1 > log.retention.hours=168 > log.retention.check.interval.ms=30 > controller.listener.names=CONTROLLER > controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093 > inter.broker.listener.name=INTERNAL > node.id=0 > process.roles=controller,broker > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 > sasl.mechanism.controller.protocol=SCRAM-SHA-512 > listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512 > listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule > required username="controller_user" password="controller_password";{code} > * kafka-storage.sh command > {code:java} > kafka-storage.sh format --config /path/to/server.properties > --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram > SCRAM-SHA-512=[name=controller_user,password=controller_password] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Description: As can be seen [here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]], it doesn't check if partition is subscribed by checking TopicPartitionState cached is null or not, as done here by maybe (was: Right now in java-client, producer-batches backoff upto retry.backoff.ms(100ms by default). This Jira proposes that backoff should be skipped if client knows of a newer-leader for the partition in a sub-sequent retry(typically through refresh of parition-metadata via the Metadata RPC). This would help improve the latency of the produce-request around when partition leadership changes.) > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > As can be seen > [here|[https://github.com/apache/kafka/blob/832627fc78484fdc7c8d6da8a2d20e7691dbf882/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L453]], > it doesn't check if partition is subscribed by checking TopicPartitionState > cached is null or not, as done here by maybe -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15802: validate remote segment state before fetching index [kafka]
satishd commented on PR #14727: URL: https://github.com/apache/kafka/pull/14727#issuecomment-1810604177 There are a few unrelated test failures; merging the changes to trunk and 3.6 branches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Fix Version/s: (was: 3.6.1) > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0 > > > Right now in java-client, producer-batches backoff upto > retry.backoff.ms(100ms by default). This Jira proposes that backoff should be > skipped if client knows of a newer-leader for the partition in a sub-sequent > retry(typically through refresh of parition-metadata via the Metadata RPC). > This would help improve the latency of the produce-request around when > partition leadership changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
[ https://issues.apache.org/jira/browse/KAFKA-15824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayank Shekhar Narula updated KAFKA-15824: -- Issue Type: Bug (was: Improvement) > SubscriptionState's maybeValidatePositionForCurrentLeader should handle > partition which isn't subscribed yet > > > Key: KAFKA-15824 > URL: https://issues.apache.org/jira/browse/KAFKA-15824 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Mayank Shekhar Narula >Assignee: Mayank Shekhar Narula >Priority: Major > Fix For: 3.7.0, 3.6.1 > > > Right now in java-client, producer-batches backoff upto > retry.backoff.ms(100ms by default). This Jira proposes that backoff should be > skipped if client knows of a newer-leader for the partition in a sub-sequent > retry(typically through refresh of parition-metadata via the Metadata RPC). > This would help improve the latency of the produce-request around when > partition leadership changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15824) SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet
Mayank Shekhar Narula created KAFKA-15824: - Summary: SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet Key: KAFKA-15824 URL: https://issues.apache.org/jira/browse/KAFKA-15824 Project: Kafka Issue Type: Improvement Components: clients Reporter: Mayank Shekhar Narula Assignee: Mayank Shekhar Narula Fix For: 3.7.0, 3.6.1 Right now in java-client, producer-batches backoff upto retry.backoff.ms(100ms by default). This Jira proposes that backoff should be skipped if client knows of a newer-leader for the partition in a sub-sequent retry(typically through refresh of parition-metadata via the Metadata RPC). This would help improve the latency of the produce-request around when partition leadership changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15376) Explore options of removing data earlier to the current leader's leader epoch lineage for topics enabled with tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785956#comment-17785956 ] Kamal Chandraprakash commented on KAFKA-15376: -- [~divijvaidya] The [example|https://github.com/apache/kafka/pull/13561#discussion_r1293286722] provided in the discussion is misleading. Let's divide the example into two to navigate it easier: Assume that there are two replicas Broker A and Broker B for partition tp0: *Case-1* Both the replicas A and B are insync on startup and they hold the leader-epoch 0. Then, the brokers started to go down in ping-pong fashion. Each broker will hold the following epoch in it's leader-epoch-checkpoint file: A: 0, 2, 4, 6, 8 B: 0, 1, 3, 5, 7 Since this is unclean-leader-election, the logs of Broker A and B might be diverged. As long as anyone of them is online, they continue to serve all the records according to the leader-epoch-checkpoint file. Once both the brokers becomes online, the follower truncates itself up-to the largest common log prefix offset so that the logs won't be diverged between the leader and follower. In this case, we continue to serve the data from the remote storage as no segments will be removed due to leader-epoch-cache truncation since both of them holds the LE0. Note that the approach taken here is similar to local-log where the broker will serve the log that they have until they sync with each other. *Case-2* Both the replicas A and B are out-of-sync on startup and the follower doesn't hold leader-epoch 0. Assume that Broker A is the leader and B is the follower & doesn't hold any data about the partition (empty-disk). When the Broker A goes down, there will be offline partition and B will be elected as unclean leader, the log-end-offset of the partition will be reset back to 0. >From the example provided in the discussion: At T1, Broker A {code:java} - leader-epoch | start-offset | - 0 0 1 180 2 400 - {code} At T2, Broker B, the start-offset will be reset back to 0: (Note that the leader does not interact with remote storage to find the next offset trade-off b/w availability and durabilty) {code:java} - leader-epoch | start-offset | - 3 0 4 780 6 900 7 990 - {code} Now, if we hold the data for both the lineage and ping-pong the brokers, we will be serving the diverged data back to the client for the same fetch-offset depends on the broker which is online. Once, the replicas start to interact with each other, they truncate the remote data themselves based on the current leader epoch lineage. The example provided in the discussion is applicable only when the replicas never interacted among themselves at-least once. > Explore options of removing data earlier to the current leader's leader epoch > lineage for topics enabled with tiered storage. > - > > Key: KAFKA-15376 > URL: https://issues.apache.org/jira/browse/KAFKA-15376 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.7.0 > > > Followup on the discussion thread: > [https://github.com/apache/kafka/pull/13561#discussion_r1288778006] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
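To make the "follower truncates itself up-to the largest common log prefix offset" step concrete, the following is a deliberately simplified model in plain Java, not Kafka code: each replica's leader-epoch-checkpoint is treated as a map from epoch to start offset, and the divergence point is found by walking the follower's epochs from newest to oldest. The class and method names are invented for illustration, and the real protocol uses the OffsetsForLeaderEpoch RPC rather than comparing maps directly.
{code:java}
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified model: epoch -> start offset of that epoch, plus the log end offset.
final class EpochLineageSketch {

    // End offset of `epoch` in this lineage = start offset of the next higher epoch,
    // or the log end offset if `epoch` is the latest one. Returns -1 if the epoch is unknown.
    static long endOffsetFor(NavigableMap<Integer, Long> lineage, long logEndOffset, int epoch) {
        if (!lineage.containsKey(epoch)) return -1L;
        Integer next = lineage.higherKey(epoch);
        return next == null ? logEndOffset : lineage.get(next);
    }

    // Walk the follower's epochs from newest to oldest; the first epoch the leader also knows
    // bounds the common prefix, and the follower truncates to the smaller of the two end offsets.
    static long truncationOffset(NavigableMap<Integer, Long> follower, long followerEndOffset,
                                 NavigableMap<Integer, Long> leader, long leaderEndOffset) {
        for (int epoch : follower.descendingKeySet()) {
            long leaderEnd = endOffsetFor(leader, leaderEndOffset, epoch);
            if (leaderEnd >= 0) {
                return Math.min(endOffsetFor(follower, followerEndOffset, epoch), leaderEnd);
            }
        }
        return 0L;  // no common epoch at all: truncate everything
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> a = new TreeMap<>();  // Broker A at T1: epochs 0, 1, 2
        a.put(0, 0L); a.put(1, 180L); a.put(2, 400L);
        NavigableMap<Integer, Long> b = new TreeMap<>();  // Broker B after unclean election: epochs 3, 4, 6, 7
        b.put(3, 0L); b.put(4, 780L); b.put(6, 900L); b.put(7, 990L);
        System.out.println(truncationOffset(a, 500L, b, 1000L));  // no common epoch -> 0
    }
}
{code}
For the case-2 example above there is no common epoch at all, so the model truncates to offset 0; for case-1, where both lineages share epoch 0, the common prefix bounds how much data is kept.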
Re: [PR] KAFKA-15448: Streams Standby Update Listener (KIP-988) [kafka]
coltmcnealy-lh commented on code in PR #14735: URL: https://github.com/apache/kafka/pull/14735#discussion_r1392849939 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -1012,6 +1022,8 @@ private void prepareChangelogs(final Map tasks, // no records to restore; in this case we just initialize the sensor to zero final long recordsToRestore = Math.max(changelogMetadata.restoreEndOffset - startOffset, 0L); task.recordRestoration(time, recordsToRestore, true); +} else if (changelogMetadata.stateManager.taskType() == TaskType.STANDBY && storeMetadata.endOffset() != null) { Review Comment: I think the `endOffset == null` could happen when the consumer hasn't yet made a `poll()` for a certain partition. Which means that I think it will be null in most cases, honestly. So this is actually a bit of a dilemma. Since in most cases we won't know the end offset until we have made our first call to `poll()` (and then `onBatchLoaded()`), the way the code is currently written makes me think we will almost never have a call to `onUpdateStart()`, which kind of defeats the purpose of the `onUpdateStart()` callback. I see two options here: 1. Pass in some sentinel value when we don't know the `endOffset` upon initialization of the Standby Task. Sentinel value could be `-1` or `null` or `Optional.Empty`...apparently my team thinks I am a really crappy programmer, so I don't have the right to opine on which one :joy: 2. Remove the `endOffset` parameter from the `onUpdateStart()` method signature. This might require changing the KIP but I don't think it would take a vote. Personally, I prefer option 2). In the most common case, we won't have the end offset, so I wouldn't want to "mislead" someone reading the javadoc into thinking that they might get some info that we probably don't have. And the `onUpdateStart()` in practice is normally followed by `onBatchLoaded()` just a few hundred milliseconds later. The `onBatchLoaded()` will contain the `endOffset`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
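To make the two options easier to compare, here are hypothetical interface shapes for them; these are illustrative only and deliberately not the exact KIP-988 `StandbyUpdateListener` API (parameter names and the `onBatchLoaded` signature are assumptions):
{code:java}
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;

// Option 1: keep endOffset, but make "not known yet" explicit instead of passing a misleading value.
interface StandbyUpdateListenerOption1 {
    void onUpdateStart(TopicPartition topicPartition, String storeName, Optional<Long> endOffset);
}

// Option 2: drop endOffset entirely; the first onBatchLoaded() call carries it instead.
interface StandbyUpdateListenerOption2 {
    void onUpdateStart(TopicPartition topicPartition, String storeName);

    void onBatchLoaded(TopicPartition topicPartition, String storeName,
                       long batchEndOffset, long numRestored, long currentEndOffset);
}
{code}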
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392830639 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,110 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -private Optional targetAssignment; +private final SubscriptionState subscriptions; + +/** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ +private final ConsumerMetadata metadata; + +/** + * TopicPartition comparator based on topic name and partition id. + */ +private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; -public MembershipManagerImpl(String groupId, LogContext logContext) { -this(groupId, null, null, logContext); +/** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ +private final CommitRequestManager commitRequestManager; + +/** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present the Metadata cache at that moment. + * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the + * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}). + */ +private final Map assignedTopicNamesCache; + +/** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this set every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ +private final Map> assignmentUnresolved; + +/** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignment ready to reconcile, even though the reconciliation might need to wait if there + * is already another on in process. + */ +private final SortedSet assignmentReadyToReconcile; + +/** + * Epoch that a member must include a heartbeat request to indicate that it want to join or + * re-join a group. + */ +public static final int JOIN_GROUP_EPOCH = 0; + +/** + * If there is a reconciliation running (triggering commit, callbacks) for the + * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered + * after receiving a heartbeat response, or a metadata update. 
+ */ +private boolean reconciliationInProgress; + +/** + * ID the member had when the reconciliation in progress started. This is used to identify if + * the member has rejoined while it was reconciling an assignment (in which case the result + * of the reconciliation is not applied.) + */ +private String memberIdOnReconciliationStart; + + Review Comment: Those kinds of things tend to jump out in _other people's_ code, but I frequently miss them in my own 😄 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,110 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -privat
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392829031 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -25,7 +25,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, -LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA +LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, Review Comment: I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so 👍 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -25,7 +25,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, -LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA +LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, Review Comment: I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392827512 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals.events; + +/** + * Application event triggered when a user calls the unsubscribe API. This will make the consumer + * release all its assignments and send a heartbeat request to leave the consumer group. + * This event holds a future that will complete when the invocation of callbacks to release + * complete and the heartbeat to leave the group is sent out (minimal effort to send the + * leave group heartbeat, without waiting for any response or considering timeouts). + */ +public class UnsubscribeApplicationEvent extends CompletableApplicationEvent { Review Comment: Where does it block? I didn't see a call to `Future.get()` when I looked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org