Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1448416633

## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
     ))
   }
+  /**
+   * Verify the transaction.
+   *
+   * @param tp              The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId      The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
done.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Phuc Hong Tran reopened KAFKA-15538:

> Client support for java regex based subscription
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Phuc Hong Tran
> Priority: Major
> Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
> When subscribing with a Java regex (`Pattern`), we need to resolve it on the client side and send the broker the resulting list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google RE2/J|https://github.com/google/re2j] for regular expressions and introduces new methods in the consumer API to subscribe using a `SubscriptionPattern`. Subscribing with a Java `Pattern` will still be supported for a while but will eventually be removed.
> * When subscribing with a `SubscriptionPattern`, the client should just send the regex to the broker, which resolves it on the server side.
> * When subscribing with a `Pattern`, the regex should be resolved on the client side.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
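For illustration, client-side resolution of a Java `Pattern` can be sketched as filtering the topic names the client already knows and sending the matches as an explicit subscription list. This is a minimal sketch, not the consumer's implementation: the class `ClientSideRegexResolver` and its `resolve` method are hypothetical, and the sketch assumes the full list of topic names is already available on the client (for example from a metadata response).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka consumer API.
final class ClientSideRegexResolver {
    private ClientSideRegexResolver() {}

    /** Returns the known topic names fully matched by the pattern, sorted for a stable request. */
    static List<String> resolve(Pattern pattern, Collection<String> knownTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : knownTopics) {
            // matches() anchors the regex to the whole topic name, not a substring
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        Collections.sort(matched);
        return matched;
    }
}
```

`Matcher.matches()` requires the entire topic name to match; `find()` would also accept substring matches such as `xorders-1` for `orders-.*`, which is usually not what a subscription pattern intends.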
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805397#comment-17805397 ]

Phuc Hong Tran commented on KAFKA-15538:

hm, nvm, let me double-check.
[jira] [Comment Edited] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805393#comment-17805393 ]

Phuc Hong Tran edited comment on KAFKA-15538 at 1/11/24 6:51 AM:

Looks like this has already been done in [https://github.com/apache/kafka/pull/14638] [~lianetm]

was (Author: JIRAUSER301295):
Looks like this has already been done in [https://github.com/apache/kafka/pull/14638.] [~lianetm]
[jira] [Resolved] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Phuc Hong Tran resolved KAFKA-15538.
Resolution: Fixed
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805393#comment-17805393 ]

Phuc Hong Tran commented on KAFKA-15538:

Looks like this has already been done in [https://github.com/apache/kafka/pull/14638]. [~lianetm]
[jira] [Updated] (KAFKA-16114) Fix partition not retention after cancel alter intra broker log dir task
[ https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wangliucheng updated KAFKA-16114:

Description:
The deletion thread no longer processes a partition after an intra-broker log dir alteration task is cancelled.

The steps to reproduce are as follows:

1. Create a reassignment.json file. Partition test01-1 is in the /data01/kafka/log01 directory of broker 1003 and is moved to /data01/kafka/log02:
{
  "version": 1,
  "partitions": [
    {
      "topic": "test01",
      "partition": 1,
      "replicas": [1001, 1003],
      "log_dirs": ["any", "/data01/kafka/log02"]
    }
  ]
}

2. Kick off the reassignment:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute

3. Cancel the reassignment:
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --cancel

4. Result: partition test01-1 on broker 1003 is not deleted.

The cause is that the partition is filtered out here:
{code:java}
val deletableLogs = logs.filter {
  case (_, log) => !log.config.compact // pick non-compacted logs
}.filterNot {
  case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
} {code}

> Fix partition not retention after cancel alter intra broker log dir task
>
> Key: KAFKA-16114
> URL: https://issues.apache.org/jira/browse/KAFKA-16114
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 3.3.2, 3.6.1
> Reporter: wangliucheng
> Priority: Major
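The quoted filter can be modelled in isolation to show why the partition is skipped: a log is only considered if it is non-compacted and not in `inProgress`. This sketch assumes, as the report implies, that cancelling the reassignment leaves `test01-1` in `inProgress`. `DeletableLogsModel` is hypothetical, and plain strings and booleans stand in for Kafka's `TopicPartition` and log config types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the quoted Scala filter; partitions are plain strings here.
final class DeletableLogsModel {
    private DeletableLogsModel() {}

    /**
     * @param compactByPartition partition name -> true if the log is compacted
     * @param inProgress         partitions currently marked as in progress
     * @return partitions the deletion pass would look at, in the map's iteration order
     */
    static List<String> deletableLogs(Map<String, Boolean> compactByPartition, Set<String> inProgress) {
        List<String> deletable = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : compactByPartition.entrySet()) {
            boolean nonCompacted = !entry.getValue();                     // logs.filter { !log.config.compact }
            boolean notInProgress = !inProgress.contains(entry.getKey()); // .filterNot { inProgress.contains(tp) }
            if (nonCompacted && notInProgress) {
                deletable.add(entry.getKey());
            }
        }
        return deletable;
    }
}
```

In this model a partition that stays in `inProgress` is never returned, regardless of its retention settings, which matches the symptom in step 4.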
[jira] [Updated] (KAFKA-16114) Fix partition not retention after cancel alter intra broker log dir task
[ https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wangliucheng updated KAFKA-16114:

Description:
The deletion thread no longer processes a partition after an intra-broker log dir alteration task is cancelled.

The steps to reproduce are as follows:

1. Create a reassignment.json file. Partition test01-1 is in the /data01/kafka/log01 directory of broker 1003 and is moved to /data01/kafka/log02:
{code:java}
{
  "version": 1,
  "partitions": [
    {
      "topic": "test01",
      "partition": 1,
      "replicas": [1001, 1003],
      "log_dirs": ["any", "/data01/kafka/log02"]
    }
  ]
}{code}

2. Kick off the reassignment:
{code:java}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute {code}

3. Cancel the reassignment:
{code:java}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --cancel {code}

4. Result: partition test01-1 on broker 1003 is not deleted.

The cause is that the partition is filtered out here:
{code:java}
val deletableLogs = logs.filter {
  case (_, log) => !log.config.compact // pick non-compacted logs
}.filterNot {
  case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
} {code}
[jira] [Created] (KAFKA-16114) Fix partition not retention after cancel alter intra broker log dir task
wangliucheng created KAFKA-16114:

Summary: Fix partition not retention after cancel alter intra broker log dir task
Key: KAFKA-16114
URL: https://issues.apache.org/jira/browse/KAFKA-16114
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 3.6.1, 3.3.2
Reporter: wangliucheng
Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]
dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
      */
     val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, enableControlledShutdown = false)

Review Comment:
Why not use `createTestTopicAndCluster` directly?

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._
 class DeleteTopicTest extends QuorumTestHarness {
+  var brokers: Seq[KafkaBroker] = Seq()
+  var servers: Seq[KafkaServer] = Seq()

Review Comment:
I think it's unnecessary to keep both `servers` and `brokers`. We can use `KafkaBroker` in most cases, or cast it to `KafkaServer` when necessary.

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
-  private def createTestTopicAndCluster(t
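The `waitUntilTrue` block in the diff keeps re-issuing the delete until it fails with `UnknownTopicOrPartitionException`, i.e. until the topic is already gone. A rough Java equivalent of that polling pattern is sketched below. `RetryUntilDeletedSketch` and its helpers are hypothetical, and `CompletableFuture` stands in for the `KafkaFuture` returned by `Admin.deleteTopics(...).all()`; this is not the test utility used in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the retry-until-failure pattern; not Kafka's TestUtils.
final class RetryUntilDeletedSketch {
    private RetryUntilDeletedSketch() {}

    /** Polls the condition until it returns true or the timeout elapses. */
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pauseMs);
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    /** True once the delete call fails with a cause of exactly the expected class. */
    static boolean failsWith(Supplier<CompletableFuture<Void>> deleteCall, Class<? extends Throwable> expectedCause) {
        try {
            deleteCall.get().get();
            return false; // delete was accepted, so keep retrying
        } catch (ExecutionException e) {
            return e.getCause() != null && expectedCause.equals(e.getCause().getClass());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Like the diff, `failsWith` compares the cause's class exactly; `expectedCause.isInstance(e.getCause())` would also accept subclasses.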
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1886258250

@mumrah @artemlivshits Refactored the code with the following major changes:
1. Extracted the request handling out of KafkaApis.scala.
2. Performance optimization: it now iterates over the topic list twice and avoids unnecessary copies.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300820

## core/src/main/scala/kafka/server/KafkaApis.scala: ##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
     ))
   }
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+    metadataCache match {
+      case _: ZkMetadataCache =>
+        throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+      case _ =>
+    }
+    val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+    val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+    var topics = scala.collection.mutable.Set[String]()
+    describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name()))
+
+    val cursor = describeTopicPartitionsRequest.cursor()
+    val fetchAllTopics = topics.isEmpty
+    if (fetchAllTopics) {
+      metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
Refactored with option (a).
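The handler above treats an empty topic list as a request for all topics, and the `getTopicMetadataForDescribeTopicResponse` docstring in this PR expects topics in alphabetical order. The selection step can therefore be sketched as below. How the cursor is applied is not visible in this excerpt: resuming at the cursor topic (inclusive) is an assumption, and `TopicSelectionSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical helper; the cursor handling is an assumption, not taken from the PR.
final class TopicSelectionSketch {
    private TopicSelectionSketch() {}

    static List<String> topicsToDescribe(Collection<String> requested, Collection<String> allTopics, String cursorTopic) {
        // An empty request means "describe every topic", as with fetchAllTopics in the diff.
        NavigableSet<String> sorted = new TreeSet<>(requested.isEmpty() ? allTopics : requested);
        // Resume at the cursor topic (inclusive) so a partially returned topic can continue.
        NavigableSet<String> selected = cursorTopic == null ? sorted : sorted.tailSet(cursorTopic, true);
        return new ArrayList<>(selected);
    }
}
```

Sorting into a `TreeSet` also gives a deterministic order, which a `mutable.Set` like the one in the diff does not guarantee.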
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300548

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param maximumNumberOfPartitions     The max number of partitions to return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>
+      if (remaining > 0) {
+        val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+        partitionResponse.map( partitions => {
+          val upperIndex = startIndex + remaining
+          val response = new DescribeTopicPartitionsResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topicName)
+            .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topicName))
+            .setPartitions(partitions.filter(partition => {
+              partition.partitionIndex() >= startIndex && partition.partitionIndex() < upperIndex
+            }).asJava)
+          remaining -= response.partitions().size()
+          result.topics().add(response)
+
+          if (upperIndex < partitions.size) {
+            result.setNextCursor(new Cursor()
+              .setTopicName(topicName)
+              .setPartitionIndex(upperIndex)
+            )
+            remaining = -1
+          }
+        })
+
+        // start index only applies to the first topic. Reset it here.
+        startIndex = 0
+
+        if (!partitionResponse.isDefined) {
+          val error = try {
+            Topic.validate(topicName)
+            Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
Yes, the cursor topic can be deleted.
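The partition quota and cursor logic in `getTopicMetadataForDescribeTopicResponse` can be sketched in Java as follows. It follows the excerpt: `upperIndex = startIndex + remaining`, the start index applies only to the first topic, and a cursor pointing at `upperIndex` is emitted when a topic is cut short. It also stops once the quota is used up and, as an assumption not shown in the excerpt, sets a cursor when the quota runs out exactly at a topic boundary. Partition indices are assumed to be contiguous from 0; all names are hypothetical, and the records require Java 16+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the partition-quota pagination; needs Java 16+ for records.
final class PartitionPaginationSketch {
    private PartitionPaginationSketch() {}

    record Slice(String topic, int fromInclusive, int toExclusive) {}
    record Cursor(String topic, int partitionIndex) {}
    record Page(List<Slice> slices, Cursor next) {}

    /**
     * @param partitionCounts topic -> partition count, iterated in alphabetical order (e.g. a TreeMap);
     *                        partitions are assumed to be numbered 0..count-1
     */
    static Page paginate(Map<String, Integer> partitionCounts, int firstTopicStartIndex, int maxPartitions) {
        List<Slice> slices = new ArrayList<>();
        int remaining = maxPartitions;
        int startIndex = firstTopicStartIndex; // only applies to the first topic
        for (Map.Entry<String, Integer> entry : partitionCounts.entrySet()) {
            String topic = entry.getKey();
            if (remaining <= 0) {
                // Quota used up at a topic boundary: the next page starts at this topic.
                return new Page(slices, new Cursor(topic, startIndex));
            }
            int count = entry.getValue();
            int upperIndex = startIndex + remaining;
            int end = Math.min(count, upperIndex);
            if (end > startIndex) {
                slices.add(new Slice(topic, startIndex, end));
                remaining -= end - startIndex;
            }
            if (upperIndex < count) {
                // Topic cut short: resume at the first partition that did not fit.
                return new Page(slices, new Cursor(topic, upperIndex));
            }
            startIndex = 0;
        }
        return new Page(slices, null);
    }
}
```

Returning as soon as a cursor is known is the "break early" behaviour; the excerpt instead sets `remaining = -1` and lets the `foreach` run to the end.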
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300157

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param maximumNumberOfPartitions     The max number of partitions to return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>

Review Comment:
Refactored; it now breaks early once the quota is reached.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448299281

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName

Review Comment:
Added the suggested logic.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448299150

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
+  private def getPartitionMetadataForDescribeTopicResponse(
+    image: MetadataImage,
+    topicName: String,
+    listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    Option(image.topics().getTopic(topicName)) match {
+      case None => None
+      case Some(topic) => {
+        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+          val partitionId = entry.getKey
+          val partition = entry.getValue
+          val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+            listenerName, false)
+          val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+            false)
+          val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+          val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+          maybeLeader match {
+            case None =>
+              val error = if (!image.cluster().brokers.containsKey(partition.leader)) {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available")
+                Errors.LEADER_NOT_AVAILABLE
+              } else {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: listener $listenerName " +
+                  s"not found on leader ${partition.leader}")
+                Errors.LISTENER_NOT_FOUND
+              }
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+            case Some(leader) =>
+              val error = if (filteredReplicas.size < partition.replicas.length) {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: replica information not available for " +
+                  s"following brokers ${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else if (filteredIsr.size < partition.isr.length) {
+                debug(s"Error while fetching metadata for $topicName-$partitionId: in sync replica information not available for " +
+                  s"following brokers ${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
+                Errors.REPLICA_NOT_AVAILABLE
+              } else {
+                Errors.NONE
+              }
+
+              new DescribeTopicPartitionsResponsePartition()
+                .setErrorCode(error.code)
+                .setPartitionIndex(partitionId)
+                .setLeaderId(leader.id())
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownELR(Replicas.toList(partition.lastKnownElr))
+          }
+        }.toList)
+        partitions

Review Comment:
Done.
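The per-partition error selection in the diff reduces to a small decision table, sketched here with plain counts instead of Kafka's metadata image. `PartitionErrorSketch` and its parameters are hypothetical; only the order of the checks and the resulting error names follow the diff.

```java
// Hypothetical sketch of the error-code decision; not Kafka's KRaftMetadataCache.
final class PartitionErrorSketch {
    private PartitionErrorSketch() {}

    enum PartitionError { NONE, LEADER_NOT_AVAILABLE, LISTENER_NOT_FOUND, REPLICA_NOT_AVAILABLE }

    /**
     * @param leaderAliveOnListener the leader has a live endpoint on the requested listener
     * @param leaderRegistered      the leader id is present in the cluster image
     * @param replicas              number of assigned replicas
     * @param aliveReplicas         replicas that survived the alive/listener filter
     * @param isr                   number of in-sync replicas
     * @param aliveIsr              in-sync replicas that survived the filter
     */
    static PartitionError partitionError(boolean leaderAliveOnListener, boolean leaderRegistered,
                                         int replicas, int aliveReplicas, int isr, int aliveIsr) {
        if (!leaderAliveOnListener) {
            // No usable leader endpoint: distinguish an unknown leader from a missing listener.
            return leaderRegistered ? PartitionError.LISTENER_NOT_FOUND : PartitionError.LEADER_NOT_AVAILABLE;
        }
        if (aliveReplicas < replicas) {
            return PartitionError.REPLICA_NOT_AVAILABLE; // some replica info was filtered out
        }
        if (aliveIsr < isr) {
            return PartitionError.REPLICA_NOT_AVAILABLE; // some ISR info was filtered out
        }
        return PartitionError.NONE;
    }
}
```

In the diff, only the branch with a live leader also fills in the ELR fields; the sketch models the error code alone.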
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1448298954 ## clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json: ## @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 75, + "type": "response", + "name": "DescribeTopicPartitionsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", "versions": "0+", + "about": "Each topic in the response.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", +"about": "The topic error, or 0 if there was no error." }, + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName", "nullableVersions": "0+", +"about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The topic id." 
}, + { "name": "IsInternal", "type": "bool", "versions": "0+", "default": "false", "ignorable": true, +"about": "True if the topic is internal." }, + { "name": "Partitions", "type": "[]DescribeTopicPartitionsResponsePartition", "versions": "0+", +"about": "Each partition in the topic.", "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The partition error, or 0 if there was no error." }, +{ "name": "PartitionIndex", "type": "int32", "versions": "0+", + "about": "The partition index." }, +{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the leader broker." }, +{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1", "ignorable": true, + "about": "The leader epoch of this partition." }, +{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of all nodes that host this partition." }, +{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId", + "about": "The set of nodes that are in sync with the leader for this partition." }, +{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId", + "versions": "0+", "nullableVersions": "0+", + "about": "The new eligible leader replicas otherwise." }, +{ "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", Review Comment: Corrected the mistake.
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1448298858 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { +metadataCache match { + case _: ZkMetadataCache => +throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request") + case _ => +} +val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache] + +val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data() +var topics = scala.collection.mutable.Set[String]() +describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name())) + +val cursor = describeTopicPartitionsRequest.cursor() +val fetchAllTopics = topics.isEmpty +if (fetchAllTopics) { + metadataCache.getAllTopics().foreach(topic => topics.add(topic)) Review Comment: Now we loop twice. In the first pass, we gather the topics that are lexicographically larger than the cursor. In the second pass, we iterate over that topic list and query the partitions of each topic.
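The two-pass approach described in the reply above can be sketched as follows. This is a hypothetical Java illustration, not the KafkaApis code. Topics live in a `SortedMap` from topic name to partition count, the cursor is a (topic, partition) pair, and `limit` caps the number of partitions returned. One assumption about cursor semantics: the sketch keeps the cursor topic itself (an inclusive `tailMap`), so that a partially returned topic can resume at the cursor partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

final class CursorPaging {
    /**
     * Returns up to `limit` "topic-partition" names, starting at (cursorTopic, cursorPartition).
     * A null cursorTopic means "start from the first topic".
     */
    static List<String> page(SortedMap<String, Integer> partitionCounts,
                             String cursorTopic, int cursorPartition, int limit) {
        // Pass 1: collect the topic names at or after the cursor, in lexicographic order.
        List<String> topics = new ArrayList<>(cursorTopic == null
            ? partitionCounts.keySet()
            : partitionCounts.tailMap(cursorTopic).keySet());

        // Pass 2: walk the selected topics and emit partitions until the limit is reached.
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            int first = topic.equals(cursorTopic) ? cursorPartition : 0;
            for (int p = first; p < partitionCounts.get(topic); p++) {
                if (result.size() == limit) {
                    return result;
                }
                result.add(topic + "-" + p);
            }
        }
        return result;
    }
}
```

Using a sorted map makes the first pass a cheap range view. The second pass only touches the topics that can appear on the current page.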
Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]
CalvinConfluent commented on code in PR #14612: URL: https://github.com/apache/kafka/pull/14612#discussion_r1448297437 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel, )) } + def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = { Review Comment: Done.
[jira] [Commented] (KAFKA-16060) Some questions about tiered storage capabilities
[ https://issues.apache.org/jira/browse/KAFKA-16060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805362#comment-17805362 ] Jianbin Chen commented on KAFKA-16060: -- Thank you for your replies. > Some questions about tiered storage capabilities > > > Key: KAFKA-16060 > URL: https://issues.apache.org/jira/browse/KAFKA-16060 > Project: Kafka > Issue Type: Wish > Components: core >Affects Versions: 3.6.1 >Reporter: Jianbin Chen >Priority: Major > > # If a topic has 3 replicas, when the local expiration time is reached, will > all 3 replicas trigger the log transfer to the remote storage, or will only > the leader in the isr transfer the log to the remote storage (hdfs, s3) > # Topics that do not support compression, do you mean topics that > log.cleanup.policy=compact?
Re: [PR] MINOR: Upgrade Gradle wrapper to version 8.4 [kafka]
github-actions[bot] commented on PR #14502: URL: https://github.com/apache/kafka/pull/14502#issuecomment-1886169673 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]
dengziming commented on code in PR #14595: URL: https://github.com/apache/kafka/pull/14595#discussion_r1448252557 ## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala: ## @@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString) overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString) + def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly distributed in kraft mode, see KAFKA-15354 Review Comment: @mimaison I reverted this change for now since we are moving this to KAFKA-15354.
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448205757 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE /** * Get the Group List. * - * @param statesFilter The states of the groups we want to list. - * If empty all groups are returned with their state. - * @param committedOffset A specified committed offset corresponding to this shard + * @param statesFilter The states of the groups we want to list. + * If empty, all groups are returned with their state. + * @param typesFilter The types of the groups we want to list. + * If empty, all groups are returned with their type. + * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the ListGroupsResponseData.ListedGroup */ +public List listGroups( +List statesFilter, +List typesFilter, Review Comment: The filters arrive as lists in the ListGroupsRequestData, though. Did you want to convert them to sets only here, or throughout the whole pipeline?
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448203801 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE /** * Get the Group List. * - * @param statesFilter The states of the groups we want to list. - * If empty all groups are returned with their state. - * @param committedOffset A specified committed offset corresponding to this shard + * @param statesFilter The states of the groups we want to list. + * If empty, all groups are returned with their state. + * @param typesFilter The types of the groups we want to list. + * If empty, all groups are returned with their type. + * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the ListGroupsResponseData.ListedGroup */ +public List listGroups( +List statesFilter, +List typesFilter, Review Comment: Sure, I can do that. I just left it as it was before.
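Converting the list filters to sets once, as discussed in this thread, might look like the following. This is a hypothetical sketch: `GroupInfo` and `ListGroupsSketch` stand in for the coordinator's `Group` objects and its `listGroups` method.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, minimal view of a group: id, state, and type.
final class GroupInfo {
    final String id;
    final String state;
    final String type;

    GroupInfo(String id, String state, String type) {
        this.id = id;
        this.state = state;
        this.type = type;
    }
}

final class ListGroupsSketch {
    static List<String> listGroups(List<GroupInfo> groups, List<String> statesFilter, List<String> typesFilter) {
        // Build the sets once so each membership check is O(1) rather than a list scan.
        Set<String> states = new HashSet<>(statesFilter);
        Set<String> types = new HashSet<>(typesFilter);
        return groups.stream()
            // An empty filter means "do not filter on this criterion".
            .filter(g -> states.isEmpty() || states.contains(g.state))
            .filter(g -> types.isEmpty() || types.contains(g.type))
            .map(g -> g.id)
            .collect(Collectors.toList());
    }
}
```

Converting at this boundary keeps the request's list types unchanged, while the per-group checks still get set lookups.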
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140837 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() { ), context.fetchAllOffsets("group", Long.MAX_VALUE)); } +@Test +public void testFetchAllOffsetsWithPendingTransactionalOffsets() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + +context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + +context.commitOffset("group", "foo", 0, 100L, 1); +context.commitOffset("group", "foo", 1, 110L, 1); +context.commitOffset("group", "bar", 0, 200L, 1); + +context.commit(); + +assertEquals(3, context.lastWrittenOffset); +assertEquals(3, context.lastCommittedOffset); + +context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds()); +context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds()); + +// Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT +// errors for foo-1 and bar-0. Review Comment: Ditto: these comments should also state the expectation for foo-0.
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16113: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total
[jira] [Created] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
Philip Nee created KAFKA-16113: -- Summary: AsyncKafkaConsumer: Add missing offset commit metrics Key: KAFKA-16113 URL: https://issues.apache.org/jira/browse/KAFKA-16113 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing from the AsyncKafkaConsumer: commit-latency-avg commit-latency-max commit-rate commit-total committed-time-ns-total
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140646 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() { ), context.fetchAllOffsets("group", Long.MAX_VALUE)); } +@Test +public void testFetchAllOffsetsWithPendingTransactionalOffsets() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + +context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + +context.commitOffset("group", "foo", 0, 100L, 1); +context.commitOffset("group", "foo", 1, 110L, 1); +context.commitOffset("group", "bar", 0, 200L, 1); + +context.commit(); + +assertEquals(3, context.lastWrittenOffset); +assertEquals(3, context.lastCommittedOffset); + +context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds()); +context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds()); Review Comment: Should we include bar-1 here as well, to test that case too?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1448139696 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ## @@ -1856,6 +1888,91 @@ public void testFetchOffsetsAtDifferentCommittedOffset() { ), context.fetchOffsets("group", request, Long.MAX_VALUE)); } +@Test +public void testFetchOffsetsWithPendingTransactionalOffsets() { +OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + +context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true); + +context.commitOffset("group", "foo", 0, 100L, 1); +context.commitOffset("group", "foo", 1, 110L, 1); +context.commitOffset("group", "bar", 0, 200L, 1); + +context.commit(); + +assertEquals(3, context.lastWrittenOffset); +assertEquals(3, context.lastCommittedOffset); + +context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds()); +context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds()); +// Note that bar-1 does not exist in the initial commits. UNSTABLE_OFFSET_COMMIT errors +// must be returned in this case too. +context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds()); + +// Always use the same request. +List request = Arrays.asList( +new OffsetFetchRequestData.OffsetFetchRequestTopics() +.setName("foo") +.setPartitionIndexes(Arrays.asList(0, 1)), +new OffsetFetchRequestData.OffsetFetchRequestTopics() +.setName("bar") +.setPartitionIndexes(Arrays.asList(0, 1)) +); + +// Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT +// errors for foo-1, bar-0 and bar-1. Review Comment: Should we mention in this comment, and in the one below, that foo-0 will be returned?
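The expected results under discussion for the "require stable" fetch can be summarized with a toy model. This is a hypothetical sketch, not OffsetMetadataManager. Partitions are "topic-partition" strings, `pending` holds the partitions with an open transactional offset commit, and results are strings to keep them readable. It mirrors the test data above: with "require stable", foo-0 returns its committed offset while foo-1, bar-0 and bar-1 return UNSTABLE_OFFSET_COMMIT. The assumption that a non-stable read reports -1 for bar-1 belongs to this simplified model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class StableFetchSketch {
    static final String UNSTABLE = "UNSTABLE_OFFSET_COMMIT";

    /**
     * Hypothetical model: for each requested partition, returns either the committed
     * offset as a string ("-1" if none) or UNSTABLE when requireStable is set and a
     * transactional offset commit is still pending for that partition.
     */
    static Map<String, String> fetch(Map<String, Long> committed,
                                     Set<String> pending,
                                     List<String> requested,
                                     boolean requireStable) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String tp : requested) {
            if (requireStable && pending.contains(tp)) {
                result.put(tp, UNSTABLE);
            } else {
                // Pending (not yet committed) transactional offsets are not visible here.
                result.put(tp, String.valueOf(committed.getOrDefault(tp, -1L)));
            }
        }
        return result;
    }
}
```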
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1448132968 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -195,6 +196,11 @@ public OffsetMetadataManager build() { */ private final TimelineHashMap pendingTransactionalOffsets; +/** + * The open transactions (producer ids) keyed by group. + */ +private final TimelineHashMap> openTransactionsByGroup; Review Comment: Ah, it's the snapshot registry and the revertLastWrittenOffset method that do this. I had forgotten, but it makes sense now.
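For context on the reply above: the snapshot registry lets coordinator state roll back to how it was at an earlier log offset when a write fails. The following is a deliberately naive, hypothetical Java illustration of that idea. It copies the full map at every snapshot. It is not Kafka's `SnapshotRegistry`/`TimelineHashMap` API, which avoids copying whole maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Naive, hypothetical illustration of snapshot-and-revert state keyed by log offset.
final class RevertibleMap<K, V> {
    private Map<K, V> current = new HashMap<>();
    // Snapshots ordered by the log offset at which they were taken.
    private final TreeMap<Long, Map<K, V>> snapshots = new TreeMap<>();

    void put(K key, V value) {
        current.put(key, value);
    }

    V get(K key) {
        return current.get(key);
    }

    // Record the current state as of `offset` (a full copy, for simplicity).
    void snapshot(long offset) {
        snapshots.put(offset, new HashMap<>(current));
    }

    // Restore the state captured at `offset` and discard any later snapshots.
    void revertTo(long offset) {
        Map<K, V> saved = snapshots.get(offset);
        if (saved == null) {
            throw new IllegalArgumentException("No snapshot at offset " + offset);
        }
        current = new HashMap<>(saved);
        snapshots.tailMap(offset, false).clear();
    }
}
```

Because state such as `openTransactionsByGroup` lives in a structure like this, entries added by a failed write disappear on revert, and no explicit cleanup code is needed.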
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
artemlivshits commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r144830 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: My understanding is that by the time we come to this function it already 
runs on a group coordinator (GC) thread (it has to, because that is how we guarantee atomicity), so there is no request-local here anyway, and we must use NoCaching (NoCaching is always safe to use, just not as efficient as a thread-local). The future's callback will be called on the thread that completed it, but as I said, that doesn't matter for this function because it gets rescheduled on the GC thread pool. I suggested hoisting the wrapping to the caller so that the new GC doesn't have to double-schedule: https://github.com/apache/kafka/pull/14774#discussion_r1420931474.
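`maybeStartTransactionVerification` adapts a callback-based call into a `CompletableFuture`, which is a common pattern. Here is a hypothetical Java sketch of it. `Verifier` is an illustrative callback-style interface, not the `ReplicaManager` API. As the reply notes, a non-async stage (for example one added with `thenApply`) that is attached before completion runs on the thread that completes the future. That is why the coordinator reschedules follow-up work onto its own thread pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackToFuture {
    // Hypothetical callback-style API: the callback receives (error or null, result).
    interface Verifier {
        void verify(String transactionalId, BiConsumer<Exception, String> callback);
    }

    static CompletableFuture<String> verifyAsync(Verifier verifier, String transactionalId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        verifier.verify(transactionalId, (error, guard) -> {
            // Completes the future on whichever thread invokes the callback.
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(guard);
            }
        });
        return future;
    }
}
```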
[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805327#comment-17805327 ] Phuc Hong Tran commented on KAFKA-15561: I see. I was wondering about this because this ticket requires me to change the ConsumerGroupHeartbeatRequestData schema, so I wanted to check whether I need to make changes anywhere else to ensure that records in the new format can be read correctly. Thanks [~lianetm]. > Client support for new SubscriptionPattern based subscription > - > > Key: KAFKA-15561 > URL: https://issues.apache.org/jira/browse/KAFKA-15561 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.8.0 > > > New consumer should support subscribe with the new SubscriptionPattern > introduced in the new consumer group protocol. When subscribing with this > regex, the client should provide the regex in the HB request on the > SubscribedTopicRegex field, delegating the resolution to the server.
[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805326#comment-17805326 ] Phuc Hong Tran commented on KAFKA-15538: I understand. I will read tickets more carefully from now on. Thanks [~lianetm] > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscribePattern`. The > subscribe using a java `Pattern` will be still supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side.
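The ticket asks the client to resolve a `java.util.regex.Pattern` subscription itself. In practice this means matching the pattern against the topic names known from metadata. Below is a minimal sketch, assuming those names are already available as a collection (`PatternResolver` is a hypothetical name). It uses full-string `matches()`, so `orders-.*` does not match a topic named `orders`.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class PatternResolver {
    // Returns the sorted topic names that fully match the subscription pattern.
    static List<String> resolve(Pattern pattern, Collection<String> topicsFromMetadata) {
        return topicsFromMetadata.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The resolved list is then what the client would send to the broker as explicit topic names. With a SubscriptionPattern, the regex string itself is sent and the broker resolves it.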
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448101818 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -9583,24 +9584,24 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep @Test public void testListGroups() { Review Comment: I think it's there. Did you want a different test, by any chance?
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448098432 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -1105,16 +1105,18 @@ private[group] class GroupCoordinator( } } - def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = { + def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, List[GroupOverview]) = { if (!isActive.get) { (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]()) } else { val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE - // if states is empty, return all groups - val groups = if (states.isEmpty) -groupManager.currentGroups - else -groupManager.currentGroups.filter(g => states.contains(g.summary.state)) + // Filter groups based on states and groupTypes. If either is empty, it won't filter on that criterion. + // If groupType is mentioned then no group is returned since the notion of groupTypes doesn't exist in the + // old group coordinator. + val groups = groupManager.currentGroups.filter { g => +(states.isEmpty || states.contains(g.summary.state)) && + (groupTypes.isEmpty || groupTypes.contains("classic")) Review Comment: Yes, okay, I'll do that. Thanks!
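The filter in the GroupCoordinator snippet above comes down to one observation: the old coordinator only has classic groups, so the type criterion keeps either every group or none. A hypothetical Java sketch of that logic follows. `ClassicListGroups` and its map from group id to state are illustrative only. The type check does not depend on the individual group, so it can be evaluated once, before the per-group state filter.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class ClassicListGroups {
    static List<String> list(Map<String, String> stateByGroupId, Set<String> states, Set<String> types) {
        // Every group in the old coordinator is a classic group, so the type
        // check has the same answer for all groups.
        boolean typeMatches = types.isEmpty() || types.contains("classic");
        if (!typeMatches) {
            return List.of();
        }
        return stateByGroupId.entrySet().stream()
            .filter(e -> states.isEmpty() || states.contains(e.getValue()))
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }
}
```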
Re: [PR] KAFKA-15853: Move ProcessRole to server module [kafka]
ijuma commented on PR #15166: URL: https://github.com/apache/kafka/pull/15166#issuecomment-1885894231 I re-ran the tests and failures are unrelated.
Re: [PR] KAFKA-15853: Move ProcessRole to server module [kafka]
ijuma merged PR #15166: URL: https://github.com/apache/kafka/pull/15166
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448087895 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE /** * Get the Group List. * - * @param statesFilter The states of the groups we want to list. - * If empty all groups are returned with their state. - * @param committedOffset A specified committed offset corresponding to this shard + * @param statesFilter The states of the groups we want to list. + * If empty, all groups are returned with their state. + * @param typesFilter The types of the groups we want to list. + * If empty, all groups are returned with their type. + * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the ListGroupsResponseData.ListedGroup */ +public List listGroups( +List statesFilter, +List typesFilter, +long committedOffset +) { +Predicate combinedFilter = group -> { +boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset)); +boolean typeCheck = typesFilter.isEmpty() || typesFilter.contains(group.type().toString()); Review Comment: I decided to convert the argument to lower case in the ListGroups request handler itself, so we don't need to handle case anywhere else in the API.
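The reply above normalizes the requested types once, in the request handler. That step could look like the sketch below. It is hypothetical: `GroupTypeFilters` is illustrative, and it assumes the groups' type strings are lower case (for example `classic`, as in the GroupCoordinator snippet).

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

final class GroupTypeFilters {
    // Normalize once at the request boundary so downstream code can compare with equals/contains.
    static Set<String> normalize(List<String> requestedTypes) {
        return requestedTypes.stream()
            // Locale.ROOT avoids locale-sensitive case mappings (e.g. the Turkish dotless i).
            .map(t -> t.toLowerCase(Locale.ROOT))
            .collect(Collectors.toSet());
    }
}
```

Collecting into a set also removes duplicates that differ only in case, such as `Classic` and `classic`.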
Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]
rreddy-22 commented on code in PR #15152: URL: https://github.com/apache/kafka/pull/15152#discussion_r1448087379 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) throws GroupIdNotFoundE /** * Get the Group List. * - * @param statesFilter The states of the groups we want to list. - * If empty all groups are returned with their state. - * @param committedOffset A specified committed offset corresponding to this shard + * @param statesFilter The states of the groups we want to list. + * If empty, all groups are returned with their state. + * @param typesFilter The types of the groups we want to list. + * If empty, all groups are returned with their type. + * @param committedOffset A specified committed offset corresponding to this shard. * * @return A list containing the ListGroupsResponseData.ListedGroup */ +public List listGroups( +List statesFilter, +List typesFilter, +long committedOffset +) { +Predicate combinedFilter = group -> { +boolean stateCheck = statesFilter.isEmpty() || statesFilter.contains(group.stateAsString(committedOffset)); +boolean typeCheck = typesFilter.isEmpty() || typesFilter.contains(group.type().toString()); Review Comment: I hadn't implemented it at that point; I was still waiting for confirmation from today's meeting. It is there now!
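The empty-means-all semantics of the two filters can be illustrated in isolation. This is a minimal sketch, not the coordinator's code: `SimpleGroup` and `GroupFilters` are hypothetical stand-ins for `Group`, and the state and type strings are plain fields instead of `group.stateAsString(committedOffset)` and `group.type().toString()`.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for a group: only the attributes the filters look at.
final class SimpleGroup {
    final String groupId;
    final String state;
    final String type;

    SimpleGroup(String groupId, String state, String type) {
        this.groupId = groupId;
        this.state = state;
        this.type = type;
    }
}

final class GroupFilters {
    // An empty filter list matches every group; otherwise the attribute must be listed.
    static Predicate<SimpleGroup> combined(List<String> statesFilter, List<String> typesFilter) {
        Predicate<SimpleGroup> stateCheck = g -> statesFilter.isEmpty() || statesFilter.contains(g.state);
        Predicate<SimpleGroup> typeCheck = g -> typesFilter.isEmpty() || typesFilter.contains(g.type);
        return stateCheck.and(typeCheck);
    }

    // Returns the ids of the groups that pass both filters, in input order.
    static List<String> listGroupIds(List<SimpleGroup> groups, List<String> statesFilter, List<String> typesFilter) {
        return groups.stream()
            .filter(combined(statesFilter, typesFilter))
            .map(g -> g.groupId)
            .collect(Collectors.toList());
    }
}
```

Composing the two checks with `Predicate.and` keeps each filter independent, so an empty list on one side never hides matches on the other. The state and type values used below are illustrative only.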
Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]
gharris1727 commented on code in PR #14995: URL: https://github.com/apache/kafka/pull/14995#discussion_r1448064344 ## clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java: ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.internals; + +import org.apache.kafka.common.config.ConfigException; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class AllowedPaths { +private List allowedPaths; + +private AllowedPaths(List allowedPaths) { +this.allowedPaths = allowedPaths; +} + +/** + * Constructs AllowedPaths with a list of Paths retrieved from {@code configValue}. + * @param configValue {@code allowed.paths} config value which is a string containing comma separated list of paths + * @return AllowedPaths with a list of Paths or null list if the {@code configValue} is null or empty string. 
+ */ +public static AllowedPaths configureAllowedPaths(String configValue) { +if (configValue != null && !configValue.isEmpty()) { +List allowedPaths = new ArrayList<>(); + +Arrays.stream(configValue.split(",")).forEach(b -> { +Path normalisedPath = Paths.get(b).normalize(); + +if (normalisedPath.isAbsolute() && Files.exists(normalisedPath)) { +allowedPaths.add(normalisedPath); +} else { +throw new ConfigException("Path " + normalisedPath + " is not valid. The path should be absolute and exist"); +} +}); + +return new AllowedPaths(allowedPaths); +} + +return new AllowedPaths(null); +} + +/** + * Checks if the given {@code path} resides in the configured {@code allowed.paths}. + * If {@code allowed.paths} is not configured, the given Path is returned as allowed. + * @param path the Path to check if allowed + * @return Path that can be accessed or null if the given Path does not reside in the configured {@code allowed.paths}. + */ +public Path getIfPathIsAllowed(Path path) { Review Comment: This name sounds like it should return a boolean. Also, I noticed that the call sites duplicate the Paths.get() call. ```suggestion public Path parseUntrustedPath(String untrustedPath) { ```
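A minimal sketch of the reviewer's suggestion, in which one method both parses the untrusted string and applies the allow-list. `AllowedPathsSketch`, `fromConfig`, and the body of `parseUntrustedPath` are hypothetical; unlike the PR, the sketch throws `IllegalArgumentException` instead of `ConfigException` and does not check that the configured paths exist, so it can run without touching the filesystem. A `null` allow-list means `allowed.paths` was not configured, matching the javadoc in the diff.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

final class AllowedPathsSketch {
    // null means allowed.paths was not configured: every path is accepted.
    private final List<Path> allowedPaths;

    private AllowedPathsSketch(List<Path> allowedPaths) {
        this.allowedPaths = allowedPaths;
    }

    // Parses a comma-separated list of absolute paths (existence is not checked here).
    static AllowedPathsSketch fromConfig(String configValue) {
        if (configValue == null || configValue.isEmpty()) {
            return new AllowedPathsSketch(null);
        }
        List<Path> paths = new ArrayList<>();
        for (String entry : configValue.split(",")) {
            Path normalised = Paths.get(entry).normalize();
            if (!normalised.isAbsolute()) {
                throw new IllegalArgumentException("Path " + normalised + " is not absolute");
            }
            paths.add(normalised);
        }
        return new AllowedPathsSketch(paths);
    }

    // Returns the normalised Path if it lies under an allowed path, or null otherwise.
    Path parseUntrustedPath(String untrustedPath) {
        Path parsed = Paths.get(untrustedPath).normalize();
        if (allowedPaths == null) {
            return parsed;
        }
        for (Path allowed : allowedPaths) {
            // Path.startsWith compares whole name elements, not raw string prefixes.
            if (parsed.startsWith(allowed)) {
                return parsed;
            }
        }
        return null;
    }
}
```

Normalizing before the check is what rejects `..` escapes, and `Path.startsWith` compares whole name elements, so `/var/secretsX` is not treated as being inside `/var/secrets`.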
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1448027564 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: I guess we just complete the future in the callback and do the write in the 
thenCompose. That made me realize I don't know everything about how futures are scheduled, but looking at the docs I think it is safe to say this is correct.
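The pattern under discussion (complete a `CompletableFuture` from the verification callback, then perform the write in `thenCompose`) can be sketched with the JDK alone. `VerificationSketch`, `verify`, and `append` are hypothetical stand-ins for `maybeStartTransactionVerificationForPartition` and the append path, and the `VerificationGuard` is modeled as a plain `Object`.

```java
import java.util.concurrent.CompletableFuture;

final class VerificationSketch {
    // Hypothetical callback-style API: reports either an error or a guard object.
    interface VerificationCallback {
        void onComplete(Exception error, Object guard);
    }

    // Stand-in for a callback-based verification call; it answers immediately here.
    static void verify(long producerId, VerificationCallback callback) {
        if (producerId < 0) {
            callback.onComplete(new IllegalStateException("invalid producer id"), null);
        } else {
            callback.onComplete(null, new Object());
        }
    }

    // Adapts the callback to a CompletableFuture, like the writer in the PR.
    static CompletableFuture<Object> maybeStartVerification(long producerId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        verify(producerId, (error, guard) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(guard);
            }
        });
        return future;
    }

    // Hypothetical append that requires the guard; returns a fake offset.
    static CompletableFuture<Long> append(Object guard, String record) {
        return CompletableFuture.completedFuture((long) record.length());
    }

    // Verification first, then the write; a failed verification skips the write.
    static CompletableFuture<Long> verifyThenWrite(long producerId, String record) {
        return maybeStartVerification(producerId)
            .thenCompose(guard -> append(guard, record));
    }
}
```

On scheduling: per the `CompletableFuture` documentation, a non-async dependent stage such as `thenCompose` may run on the thread that completes the future, or on the thread that registers the stage if the future is already complete. Here `verify` completes synchronously, so the write runs on the caller's thread.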
Re: [PR] KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled [kafka]
junrao commented on code in PR #14301: URL: https://github.com/apache/kafka/pull/14301#discussion_r1448007858 ## core/src/main/scala/kafka/log/LogLoader.scala: ## @@ -78,7 +78,8 @@ class LogLoader( recoveryPointCheckpoint: Long, leaderEpochCache: Option[LeaderEpochFileCache], producerStateManager: ProducerStateManager, - numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int] + numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], + isRemoteLogEnabled: Boolean = false, Review Comment: Could we add the new param to the javadoc?
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447988893 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Ok -- sorry I just meant I didn't know if it was passed through a callback 
(or a future in this case) or only invoked in the callback. For example, there were issues in the past because we used the buffer defined for the callback on a new handler thread.
[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805289#comment-17805289 ] Lianet Magrans commented on KAFKA-15475: Regarding your previous question about the retriable behaviour, the short answer is yes, we do want to retry internally, but it depends on the request. For example, TopicMetadata requests are retried internally by the manager whenever they fail with a retriable error, and so are sync OffsetCommit requests and OffsetFetch requests. However, the CommitRequestManager also supports async commits, which we do not want to retry. The CommitRequestManager is probably the most complex one in that sense, as there are many different ways of committing, each with different retry expectations; other managers, like the TopicMetadata one, might be simpler. > Timeout request might retry forever even if the user API times out in > PrototypeAsyncConsumer > > > Key: KAFKA-15475 > URL: https://issues.apache.org/jira/browse/KAFKA-15475 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, kip-848-preview > Fix For: 3.8.0 > > > If the request times out in the background thread, it will be completed with > TimeoutException, which is Retriable. In the TopicMetadataRequestManager and > possibly other managers, the request might continue to be retried forever. > > There are two ways to fix this: > # Pass a timer to the manager to remove the inflight requests when it is > expired. > # Pass the future to the application layer and continue to retry.
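The distinction drawn in the comment (retry some requests internally on retriable errors, but not async commits), bounded by a deadline as in the first fix proposed in the description, can be sketched as follows. `RetriableError`, `RetryPolicy`, and `RetrySketch.runWithRetries` are hypothetical names rather than the consumer's internal API, and the clock is injected so the behavior is deterministic.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical marker for errors that are safe to retry.
class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

// The caller's intention: e.g. sync commits and metadata lookups retry, async commits do not.
enum RetryPolicy { RETRY_UNTIL_DEADLINE, NO_RETRY }

final class RetrySketch {
    // Retries retriable failures only while the policy allows it and the deadline
    // has not passed; other exceptions propagate immediately.
    static <T> T runWithRetries(Supplier<T> attempt, RetryPolicy policy,
                                long deadlineMs, LongSupplier clockMs) {
        while (true) {
            try {
                return attempt.get();
            } catch (RetriableError e) {
                if (policy == RetryPolicy.NO_RETRY || clockMs.getAsLong() >= deadlineMs) {
                    throw e; // give up: no retries requested, or out of time
                }
                // A real client would back off here before the next attempt.
            }
        }
    }
}
```

Letting the caller state its intention through `RetryPolicy` is one way to keep a single retry loop while still giving async commits fire-and-forget behavior.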
[jira] [Updated] (KAFKA-16033) Review client retry logic of OffsetFetch and OffsetCommit responses
[ https://issues.apache.org/jira/browse/KAFKA-16033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16033: --- Description: The retry logic for OffsetFetch and OffsetCommit requests lives in the CommitRequestManager, and applies to requests issued from multiple components (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for the regular auto-commits, MembershipManager for auto-commits before rebalance, auto-commit before closing consumer). While this approach helps to avoid having the retry logic in each caller, currently the CommitManager has it in different places and it ends up being rather hard to follow. This task aims at reviewing the retry logic from a high-level perspective (multiple callers, with retry needs that have similarities and differences at the same time). So the review should assess the similarities vs differences, and then consider two options: 1. Keep retry logic centralized in the CommitManager, but fixed in a more consistent way, applied the same way for all requests, depending on the intention expressed by the caller. The advantage of this approach (current approach + improvement) is that callers that require the same retry logic could reuse it, keeping it in a single place (e.g. commitSync from the consumer retries in the same way as the auto-commit before rebalance). 2. Move retry logic to the caller. This aligns with the way it was done in the legacy coordinator, but the main challenge seems to be not duplicating the retry logic in callers that require the same.
This task will also review which exceptions are actually retried for OffsetCommit and OffsetFetch, considering that the legacy implementation only retries on some expected Retriable errors (not all). was: The retry logic for OffsetFetch and OffsetCommit requests lives in the CommitRequestManager, and applies to requests issued from multiple components (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for the regular auto-commits, MembershipManager for auto-commits before rebalance, auto-commit before closing consumer). While this approach helps to avoid having the retry logic in each caller, currently the CommitManager has it in different places and it ends up being rather hard to follow. This task aims at reviewing the retry logic from a high-level perspective (multiple callers, with retry needs that have similarities and differences at the same time). So the review should assess the similarities vs differences, and then consider two options: 1. Keep retry logic centralized in the CommitManager, but fixed in a more consistent way, applied the same way for all requests, depending on the intention expressed by the caller. The advantage of this approach (current approach + improvement) is that callers that require the same retry logic could reuse it, keeping it in a single place (e.g. commitSync from the consumer retries in the same way as the auto-commit before rebalance). 2. Move retry logic to the caller. This aligns with the way it was done in the legacy coordinator, but the main challenge seems to be not duplicating the retry logic in callers that require the same.
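The last point of the updated description, retrying only on some expected retriable errors rather than on all of them, boils down to an allow-list check kept in one central place. In this minimal sketch, the `CommitError` enum and the choice of which errors are in the allow-list are illustrative assumptions, not the set the legacy coordinator actually retries.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical error codes a commit response might carry.
enum CommitError { NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, REQUEST_TIMED_OUT, UNKNOWN_MEMBER_ID }

final class CommitRetryClassifier {
    // Illustrative allow-list: only these errors trigger an internal retry,
    // even though other errors (e.g. REQUEST_TIMED_OUT) may also be retriable in general.
    private static final Set<CommitError> RETRIED =
        EnumSet.of(CommitError.COORDINATOR_NOT_AVAILABLE, CommitError.NOT_COORDINATOR);

    static boolean shouldRetry(CommitError error) {
        return RETRIED.contains(error);
    }
}
```

Keeping the allow-list in one class corresponds to option 1 (centralized retry logic); under option 2, each caller would carry its own version of this check.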
> Review client retry logic of OffsetFetch and OffsetCommit responses > --- > > Key: KAFKA-16033 > URL: https://issues.apache.org/jira/browse/KAFKA-16033 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > The retry logic for OffsetFetch and OffsetCommit requests lives in the > CommitRequestManager, and applies to requests issued from multiple components > (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for > the regular auto-commits, MembershipManager for auto-commits before > rebalance, auto-commit before closing consumer). While this approach helps to > avoid having the retry logic in each caller, currently the CommitManager has > it in different places and it ends up being rather hard to follow. > This task aims at reviewing the retry logic from a high-level perspective > (multiple callers, with retry needs that have similarities and differences at > the same time). So the review should assess the similarities vs differences, > and then consider two options: > 1. Keep retry logic centralized in the CommitManager, but fixed in a more > consistent way, applied the same way for all requests, depending on the > intention expressed by the caller. The advantage of this approach (current > approach + improvement) is that callers that require the same retry logic > could reuse it, keeping it in a single place (e.g. commitSync from th
[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer
[ https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805287#comment-17805287 ] Lianet Magrans commented on KAFKA-15475: Sure, TopicMetadataRequestManager [here|https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L212] and CommitRequestManager [here|https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L561]. Both include similar logic to make sure that requests are retried, but only within the timeout boundaries.
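The first fix proposed in the issue (pass a timer so that expired in-flight requests are removed instead of being retried forever) can be sketched as a list of pending requests that is pruned on every poll. `PendingRequest` and `PendingRequests` are hypothetical, and the sketch uses the JDK's `java.util.concurrent.TimeoutException` rather than Kafka's `org.apache.kafka.common.errors.TimeoutException`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// A request plus the absolute deadline after which it must not be retried.
final class PendingRequest {
    final String name;
    final long deadlineMs;
    final CompletableFuture<String> future = new CompletableFuture<>();

    PendingRequest(String name, long deadlineMs) {
        this.name = name;
        this.deadlineMs = deadlineMs;
    }
}

final class PendingRequests {
    private final List<PendingRequest> inflight = new ArrayList<>();

    void add(PendingRequest request) {
        inflight.add(request);
    }

    // Fails and removes every request whose deadline has passed, so it is never
    // retried again; returns the requests that may still be (re)sent.
    List<PendingRequest> poll(long nowMs) {
        Iterator<PendingRequest> it = inflight.iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next();
            if (nowMs >= request.deadlineMs) {
                request.future.completeExceptionally(
                    new TimeoutException(request.name + " expired at " + request.deadlineMs));
                it.remove();
            }
        }
        return new ArrayList<>(inflight);
    }
}
```

Completing the future exceptionally at expiry means the application-facing call observes the timeout, while the background side stops retrying.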
[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805286#comment-17805286 ] Stanislav Kozlovski commented on KAFKA-16082: - Deeming this not a blocker as per discussions with [~pprovenzano] > JBOD: Possible dataloss when moving leader partition > > > Key: KAFKA-16082 > URL: https://issues.apache.org/jira/browse/KAFKA-16082 > Project: Kafka > Issue Type: Bug > Components: jbod >Affects Versions: 3.7.0 >Reporter: Proven Provenzano >Assignee: Gaurav Narula >Priority: Critical > Fix For: 3.7.1 > > > There is a possible dataloss scenario > when using JBOD, > when moving the partition leader log from one directory to another on the > same broker, > when after the destination log has caught up to the source log and after the > broker has sent an update to the partition assignment > if the broker accepts and commits a new record for the partition and then the > broker restarts and the original partition leader log is lost > then the destination log would not contain the new record.
[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-16082: Priority: Critical (was: Blocker)
[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition
[ https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-16082: Fix Version/s: 3.7.1 (was: 3.7.0)
Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]
dajac commented on code in PR #15017: URL: https://github.com/apache/kafka/pull/15017#discussion_r1447922892 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -53,12 +53,16 @@ public short unknownType() { class LoadSummary { private final long startTimeMs; private final long endTimeMs; +private final long totalTimeMs; Review Comment: Yeah, I would rather use a log message similar to the one you removed instead of just logging the summary's toString(). It will look better.
Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]
dajac commented on code in PR #15017: URL: https://github.com/apache/kafka/pull/15017#discussion_r1447921933 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -53,12 +53,16 @@ public short unknownType() { class LoadSummary { private final long startTimeMs; private final long endTimeMs; +private final long totalTimeMs; +private final long schedulerTimeMs; Review Comment: That works.
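A sketch of the explicit log line suggested in these comments. The PR adds `totalTimeMs` and `schedulerTimeMs` to `LoadSummary`, but their exact definitions are not shown here, so this sketch assumes a hypothetical `scheduleTimeMs` timestamp and defines scheduler time as the wait before loading starts and total time as scheduling to completion; `LoadSummarySketch` and the message wording are also invented.

```java
// Hypothetical summary of one partition load; the timestamps' meanings are assumptions.
final class LoadSummarySketch {
    private final long scheduleTimeMs; // when the load was handed to the scheduler (assumed)
    private final long startTimeMs;    // when loading actually started
    private final long endTimeMs;      // when loading finished
    private final long numRecords;

    LoadSummarySketch(long scheduleTimeMs, long startTimeMs, long endTimeMs, long numRecords) {
        this.scheduleTimeMs = scheduleTimeMs;
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.numRecords = numRecords;
    }

    // Time spent waiting before the load started.
    long schedulerTimeMs() {
        return startTimeMs - scheduleTimeMs;
    }

    // Time from scheduling to completion.
    long totalTimeMs() {
        return endTimeMs - scheduleTimeMs;
    }

    // An explicit, readable log line instead of relying on toString().
    String logMessage(String partition) {
        return String.format(
            "Finished loading %s: %d records in %d ms (%d ms waiting in the scheduler, %d ms total).",
            partition, numRecords, endTimeMs - startTimeMs, schedulerTimeMs(), totalTimeMs());
    }
}
```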
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447918075 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception if the Review Comment: Sorry, I missed it. I think I will just replace those docs with links to the interface doc in order to avoid this in the future.
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
dajac commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447916841 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Yes, we reschedule to the request thread only to push an event to the coordinator
thread. It is not optimal. The request local is not used at all, as the writer uses its own buffers.
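The hand-off described here (the callback only pushes an event to the coordinator thread) can be sketched with a single-threaded executor standing in for the coordinator thread. `CoordinatorHandoffSketch` is hypothetical; the coordinator runtime presumably uses its own event processor rather than an `ExecutorService`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CoordinatorHandoffSketch {
    // A single named thread plays the role of the coordinator thread.
    private final ExecutorService coordinatorThread =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "coordinator-thread"));

    // Called from the verification callback (e.g. on a request-handler thread).
    // It does no real work itself: it only enqueues the write for the coordinator thread.
    CompletableFuture<String> onVerified(String record) {
        return CompletableFuture.supplyAsync(
            () -> Thread.currentThread().getName() + " wrote " + record,
            coordinatorThread);
    }

    void shutdown() {
        coordinatorThread.shutdown();
    }
}
```

Because the work that needs buffers happens on the coordinator side, nothing the calling thread owns has to travel with the event, which is consistent with the comment that the request local is unused.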
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1447913870 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -195,6 +196,11 @@ public OffsetMetadataManager build() { */ private final TimelineHashMap pendingTransactionalOffsets; +/** + * The open transactions (producer ids) keyed by group. + */ +private final TimelineHashMap> openTransactionsByGroup; Review Comment: Yeah, it is implicit. The write operation (the transactional offset commit) could, for instance, fail. The state is rolled back in this case.
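The implicit rollback described here can be illustrated with a toy snapshot-and-revert map keyed by group. `SnapshotMap` is hypothetical and is not the API of Kafka's `TimelineHashMap` or `SnapshotRegistry`; it copies the whole map on each snapshot, which the real timeline structures are designed to avoid by tracking changes per snapshot epoch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model: open transactions (producer ids) keyed by group, with snapshot/revert.
final class SnapshotMap {
    private Map<String, Set<Long>> current = new HashMap<>();
    private final Deque<Map<String, Set<Long>>> snapshots = new ArrayDeque<>();

    // Records a deep copy of the current state before a speculative write.
    void snapshot() {
        Map<String, Set<Long>> copy = new HashMap<>();
        current.forEach((group, pids) -> copy.put(group, new HashSet<>(pids)));
        snapshots.push(copy);
    }

    // Restores the state captured by the most recent snapshot (the write failed).
    void revert() {
        current = snapshots.pop();
    }

    // Drops the most recent snapshot once the write is committed.
    void commit() {
        snapshots.pop();
    }

    void addOpenTransaction(String groupId, long producerId) {
        current.computeIfAbsent(groupId, g -> new HashSet<>()).add(producerId);
    }

    boolean hasOpenTransactions(String groupId) {
        Set<Long> pids = current.get(groupId);
        return pids != null && !pids.isEmpty();
    }
}
```

In the usage below, the first transactional offset commit is treated as failed and reverted, while the second is committed and its open transaction remains visible.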
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155: URL: https://github.com/apache/kafka/pull/15155#discussion_r1447907424 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -195,6 +196,11 @@ public OffsetMetadataManager build() { */ private final TimelineHashMap pendingTransactionalOffsets; +/** + * The open transactions (producer ids) keyed by group. + */ +private final TimelineHashMap> openTransactionsByGroup; Review Comment: Sorry -- I left a lot of words here. Is there a case where we take advantage of the data structure, i.e. using the epoch or rolling back? I didn't see any in this PR.
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447883700 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception if the Review Comment: nit: did we want to update this comment to be consistent with the PartitionWriter one?
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447880085 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: If it is the case that it's only an optimization question rather than a correctness one, I am good to tackle it in a follow-up.
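The `maybeStartTransactionVerification` override in the diff adapts a callback-style `ReplicaManager` call into a `CompletableFuture`: `Errors.NONE` completes the future with the `VerificationGuard`, and any other error completes it exceptionally. A minimal Java sketch of that adaptation, assuming a hypothetical `VerificationService` callback interface, a stand-in `VerifyError` enum, and a `String` in place of `VerificationGuard` (none of these are Kafka's real types or signatures):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in for Kafka's Errors enum; only what the sketch needs.
enum VerifyError {
    NONE,
    INVALID_TXN_STATE;

    RuntimeException exception() {
        return new IllegalStateException(name());
    }
}

// Hypothetical callback-style API, loosely modeled on the diff above.
// The callback receives (error, guard); the guard is a String for illustration.
interface VerificationService {
    void startVerification(String transactionalId, long producerId, short producerEpoch,
                           BiConsumer<VerifyError, String> callback);
}

final class VerificationAdapter {
    private final VerificationService service;

    VerificationAdapter(VerificationService service) {
        this.service = service;
    }

    // Bridges the callback into a future: NONE completes with the guard,
    // any other error completes the future exceptionally.
    CompletableFuture<String> verify(String transactionalId, long producerId, short producerEpoch) {
        CompletableFuture<String> future = new CompletableFuture<>();
        service.startVerification(transactionalId, producerId, producerEpoch, (error, guard) -> {
            if (error != VerifyError.NONE) {
                future.completeExceptionally(error.exception());
            } else {
                future.complete(guard);
            }
        });
        return future;
    }
}
```

Because `complete`/`completeExceptionally` run on whichever thread invokes the callback, dependent stages attached with non-async methods such as `thenApply` may also run on that thread, which is why the thread and `RequestLocal` questions in this review matter.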
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447879037 ## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( +tp: TopicPartition, +transactionalId: String, +producerId: Long, +producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { +val future = new CompletableFuture[VerificationGuard]() +replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { +if (error != Errors.NONE) { + future.completeExceptionally(error.exception) +} else { + future.complete(verificationGuard) +} + } +) +future + } + private def internalAppend( tp: TopicPartition, -memoryRecords: MemoryRecords +memoryRecords: MemoryRecords, +verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty -replicaManager.appendRecords( +replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: Sorry -- you are correct. 
I was confused because we have a check for whether we execute on the same request thread -- in that case we don't reschedule/wrap the request. (This was added for the case where we error before sending the request to the txn coordinator.) Just for my understanding, right now we reschedule to the request thread, and that works fine -- the only concern is not using the allocated coordinator threads and taking up space on the request handler threads? Just triple-checking that we won't run into an issue with request locals if the callback expects the buffer supplier from the coordinator thread.
Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]
jolshan commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1447873603 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) { * @return The Errors instance associated with the given exception. */ private static Errors normalizeException(Throwable exception) { +exception = Errors.maybeUnwrapException(exception); Review Comment: I see. It was too elegant before 😂 Thanks for looking into it and fixing.
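The added `Errors.maybeUnwrapException` call matters because failures that travel through `CompletableFuture` stages typically arrive wrapped in a `CompletionException` (or an `ExecutionException` from `Future.get`), so a mapping keyed on the exception type would miss the real cause. A sketch of the unwrap-then-map step, assuming a hypothetical `NotCoordinatorException` and string error codes in place of Kafka's `Errors` enum; it is not the actual body of `normalizeException`:

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical exception type used only for this sketch.
class NotCoordinatorException extends RuntimeException {
    NotCoordinatorException(String message) {
        super(message);
    }
}

final class ExceptionNormalizer {
    // Peels off the wrapper that future stages add around the real cause.
    static Throwable unwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException) && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Hypothetical mapping to an error code, applied to the unwrapped cause.
    static String normalize(Throwable t) {
        Throwable cause = unwrap(t);
        if (cause instanceof NotCoordinatorException) {
            return "NOT_COORDINATOR";
        }
        return "UNKNOWN_SERVER_ERROR";
    }
}
```

Without the unwrap, a `CompletionException` wrapping `NotCoordinatorException` would fall through to the generic error.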
[jira] [Commented] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805272#comment-17805272 ] Philip Nee commented on KAFKA-16110: My proposal here is: - Let's run trogdor to see what we can get out of it. If the current settings are not satisfactory, then we can add more "specs" to the repo and see if we can get to the point we want. - We also might want to monitor the performance of the head of trunk as we are putting code in. I wonder if that is easy to achieve using trogdor. > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805271#comment-17805271 ] Philip Nee commented on KAFKA-16110: Hi [~kirktrue] - Thanks for filing this JIRA. There are two paths forward for the performance testing. One is using trogdor, which does more than performance testing and also allows us to test different fault scenarios. The second is implementing our own benchmarking test; I started working on it some time ago but left it in limbo given that trogdor already does a lot of it. WDYT? FWIW, this is the repo: https://github.com/philipnee/kafka-benchmarker > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > >
[jira] [Created] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
Kirk True created KAFKA-16112: - Summary: Review JMX metrics in Async Consumer and determine the missing ones Key: KAFKA-16112 URL: https://issues.apache.org/jira/browse/KAFKA-16112 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Kirk True Assignee: Philip Nee Fix For: 3.8.0
[jira] [Created] (KAFKA-16111) Implement tests for tricky rebalance callbacks scenarios
Kirk True created KAFKA-16111: - Summary: Implement tests for tricky rebalance callbacks scenarios Key: KAFKA-16111 URL: https://issues.apache.org/jira/browse/KAFKA-16111 Project: Kafka Issue Type: Test Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0
[jira] [Created] (KAFKA-16110) Implement consumer performance tests
Kirk True created KAFKA-16110: - Summary: Implement consumer performance tests Key: KAFKA-16110 URL: https://issues.apache.org/jira/browse/KAFKA-16110 Project: Kafka Issue Type: New Feature Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0
[jira] [Created] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case
Kirk True created KAFKA-16109: - Summary: Ensure system tests cover the "simple consumer + commit" use case Key: KAFKA-16109 URL: https://issues.apache.org/jira/browse/KAFKA-16109 Project: Kafka Issue Type: Improvement Components: clients, consumer, system tests Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0
[jira] [Updated] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes
[ https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16107: -- Fix Version/s: 3.8.0 > Ensure consumer does not start fetching from added partitions until > onPartitionsAssigned completes > -- > > Key: KAFKA-16107 > URL: https://issues.apache.org/jira/browse/KAFKA-16107 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > In the new consumer implementation, when new partitions are assigned, the > subscription state is updated and then #onPartitionsAssigned is triggered. > This sequence seems sensible but we need to ensure that no data is fetched > until onPartitionsAssigned completes (where the user could be setting the > committed offsets it wants to start fetching from). > We should pause the newly added partitions until > onPartitionsAssigned completes, similar to how it's done on revocation to > avoid positions getting ahead of the committed offsets.
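One way to read the proposal: newly added partitions join the assignment immediately but stay non-fetchable until `onPartitionsAssigned` returns, so any offsets the callback commits or seeks to are in place before the first fetch. A single-threaded Java sketch, assuming a hypothetical `AssignmentState` class and string partition names (the real consumer's `SubscriptionState` and its background-thread handoff are more involved):

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified assignment tracker; not the consumer's SubscriptionState.
final class AssignmentState {
    private final Set<String> assigned = new LinkedHashSet<>();
    private final Set<String> awaitingCallback = new LinkedHashSet<>();

    // Adds partitions but keeps them non-fetchable until the user callback returns.
    // If the callback throws, the partitions stay blocked; whether that is the
    // right failure policy is a design choice this sketch does not settle.
    void assign(Collection<String> added, Consumer<Collection<String>> onPartitionsAssigned) {
        assigned.addAll(added);
        awaitingCallback.addAll(added);
        onPartitionsAssigned.accept(added);
        awaitingCallback.removeAll(added);
    }

    // What a fetcher would be allowed to fetch from right now.
    Set<String> fetchablePartitions() {
        Set<String> fetchable = new LinkedHashSet<>(assigned);
        fetchable.removeAll(awaitingCallback);
        return fetchable;
    }
}
```

This mirrors the revocation side mentioned in the ticket: partitions are blocked around the callback rather than after it.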
Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
wernerdv commented on PR #15101: URL: https://github.com/apache/kafka/pull/15101#issuecomment-1885412204 @divijvaidya hello, I reran the tests after merging #15093 and #15077. https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/ There are failed tests from the `org.apache.kafka.streams.integration` package that have unexpected threads (e.g. `kafka-scheduler`, `Controller-0-to-broker-0-send-thread`, `ReplicaFetcherThread`). https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/org.apache.kafka.streams.integration/ It's also worth looking at these test results: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/kafka.utils/ The names of the expected threads are similar to https://github.com/apache/kafka/pull/15052/files#diff-b8f9f9d1b191457cbdb332a3429f0ad65b50fa4cef5af8562abcfd1f177a2cfeR2441 Is there anything I can do to improve my PR?
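The core check behind a thread-leak extension is a before/after diff of live threads, with an allow-list for names that are expected to linger. A JDK-only sketch, assuming a hypothetical `ThreadLeakChecker` helper; wiring it into JUnit 5 (for example through `BeforeAllCallback`/`AfterAllCallback`) is left out, and this is not the extension from PR #15101:

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the snapshot-and-diff idea.
final class ThreadLeakChecker {
    private final Set<Thread> before;

    private ThreadLeakChecker(Set<Thread> before) {
        this.before = before;
    }

    // Capture the threads that are alive before the test starts.
    static ThreadLeakChecker snapshot() {
        return new ThreadLeakChecker(Set.copyOf(Thread.getAllStackTraces().keySet()));
    }

    // Names of threads alive now that were not present at snapshot time,
    // excluding any whose name starts with an allowed prefix.
    Set<String> leakedThreadNames(Set<String> allowedPrefixes) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> !before.contains(t))
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> allowedPrefixes.stream().noneMatch(name::startsWith))
            .collect(Collectors.toSet());
    }
}
```

Comparing `Thread` objects rather than names avoids false negatives when a new thread reuses the name of one that existed before the test.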
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
ijuma commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1447757987 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: Yeah, I was using the maven name. I agree that we shouldn't add a dependency unless it exists.
[jira] [Updated] (KAFKA-16106) group size counters do not reflect the actual sizes when operations fail
[ https://issues.apache.org/jira/browse/KAFKA-16106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-16106: - Description: An expire-group-metadata operation generates tombstone records, updates the `groups` state and decrements group size counters, then performs a write to the log. If there is a __consumer_offsets partition reassignment, this operation fails. The `groups` state is reverted to an earlier snapshot but classic group size counters are not. This begins an inconsistency between the metrics and the actual groups size. This applies to all unsuccessful write operations that alter the `groups` state. The issue is exacerbated because the expire group metadata operation can be retried multiple times until the partition is fully unloaded. The solution to this is to make the counters also a timeline data structure (TimelineLong) so that in the event of a failed write operation we revert the counters as well. was: An expire-group-metadata operation generates tombstone records, updates the `groups` state and decrements group size counters, then performs a write to the log. If there is a __consumer_offsets partition reassignment, this operation fails. The `groups` state is reverted to an earlier snapshot but classic group size counters are not. This begins an inconsistency between the metrics and the actual groups size. This applies to all unsuccessful write operations that alter the `groups` state. The issue is exacerbated because the expire group metadata operation is retried possibly indefinitely. The solution to this is to make the counters also a timeline data structure (TimelineLong) so that in the event of a failed write operation we revert the counters as well. 
> group size counters do not reflect the actual sizes when operations fail > > > Key: KAFKA-16106 > URL: https://issues.apache.org/jira/browse/KAFKA-16106 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > > An expire-group-metadata operation generates tombstone records, updates the > `groups` state and decrements group size counters, then performs a write to > the log. If there is a __consumer_offsets partition reassignment, this > operation fails. The `groups` state is reverted to an earlier snapshot but > classic group size counters are not. This begins an inconsistency between the > metrics and the actual groups size. This applies to all unsuccessful write > operations that alter the `groups` state. > > The issue is exacerbated because the expire group metadata operation can be > retried multiple times until the partition is fully unloaded. > > The solution to this is to make the counters also a timeline data structure > (TimelineLong) so that in the event of a failed write operation we revert the > counters as well.
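The proposed fix ties the counters to the same snapshot epochs as the `groups` state, so a failed write reverts both together. A Java sketch of that idea, assuming a hypothetical `RevertibleCounter` with its own snapshot stack; Kafka's actual `TimelineLong` is driven by a shared `SnapshotRegistry`, which this sketch does not model:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a timeline counter.
final class RevertibleCounter {
    private long value;
    // Stack of {epoch, valueAtEpoch} pairs, newest on top.
    private final Deque<long[]> snapshots = new ArrayDeque<>();

    void increment() { value++; }
    void decrement() { value--; }
    long get() { return value; }

    // Capture the current value before applying a write at the given epoch.
    void snapshot(long epoch) {
        snapshots.push(new long[] {epoch, value});
    }

    // The write at `epoch` failed: drop newer snapshots and restore the captured value.
    void revertTo(long epoch) {
        while (!snapshots.isEmpty() && snapshots.peek()[0] > epoch) {
            snapshots.pop();
        }
        if (snapshots.isEmpty() || snapshots.peek()[0] != epoch) {
            throw new IllegalArgumentException("No snapshot at epoch " + epoch);
        }
        value = snapshots.pop()[1];
    }

    // Writes up to `epoch` are committed: their snapshots are no longer needed.
    void commitUpTo(long epoch) {
        snapshots.removeIf(s -> s[0] <= epoch);
    }
}
```

Replaying the ticket's scenario: with two classic groups, a snapshot is taken, the expire operation decrements the counter, the write fails, and `revertTo` brings the counter back to 2 alongside the `groups` state.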
Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]
jeffkbkim commented on code in PR #15017: URL: https://github.com/apache/kafka/pull/15017#discussion_r1447748531 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -53,12 +53,16 @@ public short unknownType() { class LoadSummary { private final long startTimeMs; private final long endTimeMs; +private final long totalTimeMs; +private final long schedulerTimeMs; Review Comment: how's schedulerQueueTimeMs?
Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]
jeffkbkim commented on code in PR #15017: URL: https://github.com/apache/kafka/pull/15017#discussion_r1447744934 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java: ## @@ -53,12 +53,16 @@ public short unknownType() { class LoadSummary { private final long startTimeMs; private final long endTimeMs; +private final long totalTimeMs; Review Comment: This simplifies the log format in CoordinatorRuntime:
```
log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
    tp, partitionEpoch, summary
);
```
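Giving `LoadSummary` a `toString` is what lets that log call pass the whole summary as a single `{}` argument (SLF4J calls `toString` on placeholder arguments). A sketch, assuming the field names from the diff plus the suggested `schedulerQueueTimeMs` rename; the merged class may differ:

```java
// Hypothetical shape of LoadSummary based on the fields discussed in this review.
final class LoadSummary {
    private final long startTimeMs;
    private final long endTimeMs;
    private final long totalTimeMs;
    private final long schedulerQueueTimeMs;

    LoadSummary(long startTimeMs, long endTimeMs, long totalTimeMs, long schedulerQueueTimeMs) {
        this.startTimeMs = startTimeMs;
        this.endTimeMs = endTimeMs;
        this.totalTimeMs = totalTimeMs;
        this.schedulerQueueTimeMs = schedulerQueueTimeMs;
    }

    // One self-describing string, so callers log the summary with a single placeholder.
    @Override
    public String toString() {
        return "LoadSummary("
            + "startTimeMs=" + startTimeMs
            + ", endTimeMs=" + endTimeMs
            + ", totalTimeMs=" + totalTimeMs
            + ", schedulerQueueTimeMs=" + schedulerQueueTimeMs
            + ")";
    }
}
```

Adding a field later then changes only `toString`, not every log statement that reports load times.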
[PR] MINOR: fix custom retry backoff in new group coordinator [kafka]
jeffkbkim opened a new pull request, #15170: URL: https://github.com/apache/kafka/pull/15170 When a retryable write operation fails, we retry with the default 500ms backoff. If a custom retry backoff was used to originally schedule the operation, we should retry with the same custom backoff instead of the default. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
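The fix described here amounts to remembering the backoff an operation was scheduled with and reusing it on every retry. A Java sketch, assuming a hypothetical `RetryingScheduler` that records retry delays instead of sleeping; it is not `CoordinatorRuntime`'s actual scheduling code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical scheduler used to illustrate per-operation retry backoff.
final class RetryingScheduler {
    static final long DEFAULT_RETRY_BACKOFF_MS = 500L;

    private final int maxAttempts;
    private final List<Long> retryDelaysMs = new ArrayList<>();

    RetryingScheduler(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void schedule(BooleanSupplier operation) {
        schedule(operation, DEFAULT_RETRY_BACKOFF_MS);
    }

    // The operation carries its own backoff; every retry reuses it instead of
    // falling back to DEFAULT_RETRY_BACKOFF_MS. `operation` returns true on success
    // and false on a retryable failure.
    void schedule(BooleanSupplier operation, long retryBackoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (operation.getAsBoolean()) {
                return;
            }
            if (attempt < maxAttempts) {
                retryDelaysMs.add(retryBackoffMs); // a real timer would wait here
            }
        }
    }

    List<Long> retryDelaysMs() {
        return retryDelaysMs;
    }
}
```

An operation scheduled with a 100 ms backoff that succeeds on its third attempt records two 100 ms retry delays, not 500 ms ones.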
[jira] [Resolved] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore
[ https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-16098. --- Resolution: Fixed > State updater may attempt to resume a task that is not assigned anymore > --- > > Key: KAFKA-16098 > URL: https://issues.apache.org/jira/browse/KAFKA-16098 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Bruno Cadonna >Priority: Major > Attachments: streams.log.gz > > > A long-running soak test brought to light this `IllegalStateException`: > {code:java} > [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread > encountered an error processing soak test > (org.apache.kafka.streams.StreamsSoakTest) > java.lang.IllegalStateException: No current assignment for partition > network-id-repartition-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753) > at > org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963) > at > org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524) > at > org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] > stream-client [i-0637ca8609f50425f] Encountered the following exception > during processing and sent shutdown request for the entire application. > (org.apache.kafka.streams.KafkaStreams) > org.apache.kafka.streams.errors.StreamsException: > java.lang.IllegalStateException: No current assignment for partition > network-id-repartition-1 > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) > Caused by: java.lang.IllegalStateException: No current assignment for > partition network-id-repartition-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753) > at > org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963) > at > org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524) > at > org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979) > at > org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791) > at > org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) > ... 1 more {code} > Log (with some common messages filtered) attached.
Re: [PR] KAFKA-16098: Verify pending recycle action when standby is re-assigned [kafka]
cadonna merged PR #15168: URL: https://github.com/apache/kafka/pull/15168
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
jolshan commented on code in PR #14531: URL: https://github.com/apache/kafka/pull/14531#discussion_r1447664834 ## clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java: ## @@ -26,6 +26,15 @@ */ public interface Partitioner extends Configurable, Closeable { +/** + * Indicate if the given topic is handled. Returning {@code false} will cause the Producer to fallback to default partitioning. + * + * @param topic The topic name + */ +default boolean partitionsTopic(String topic) { +return true; Review Comment: I think the idea is that this would be implemented differently by different custom partitioners. By default, though, there is no change from the current behavior.
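With `partitionsTopic` defaulting to `true`, existing partitioners keep today's behavior, and only a partitioner that opts out of a topic triggers the fallback. A Java sketch of the producer-side decision, assuming a hypothetical trimmed-down `TopicAwarePartitioner` interface (the proposed method is not part of a released `Partitioner` API) and a simplified stand-in for default partitioning:

```java
import java.util.Arrays;

// Hypothetical, trimmed-down interface mirroring the proposed default method;
// the real org.apache.kafka.clients.producer.Partitioner takes more parameters.
interface TopicAwarePartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // Proposed hook: returning false hands the record to default partitioning.
    default boolean partitionsTopic(String topic) {
        return true;
    }
}

final class PartitionChooser {
    private final TopicAwarePartitioner custom;

    PartitionChooser(TopicAwarePartitioner custom) {
        this.custom = custom;
    }

    int choose(String topic, byte[] keyBytes, int numPartitions) {
        if (custom != null && custom.partitionsTopic(topic)) {
            return custom.partition(topic, keyBytes, numPartitions);
        }
        return defaultPartition(keyBytes, numPartitions);
    }

    // Illustrative stand-in only: Kafka's built-in logic hashes keys with murmur2
    // and uses sticky partitioning for null keys, neither of which is modeled here.
    private static int defaultPartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            return 0;
        }
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

A partitioner that only cares about some topics can override `partitionsTopic` and leave everything else to the producer.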
Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]
jolshan commented on PR #14531: URL: https://github.com/apache/kafka/pull/14531#issuecomment-1885235923 I think this change would require a KIP since it modifies the public API.
Re: [PR] KAFKA-15747: KRaft support in DynamicConnectionQuotaTest [kafka]
mimaison merged PR #15028: URL: https://github.com/apache/kafka/pull/15028
[jira] [Resolved] (KAFKA-15747) KRaft support in DynamicConnectionQuotaTest
[ https://issues.apache.org/jira/browse/KAFKA-15747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-15747. Fix Version/s: 3.8.0 Resolution: Fixed > KRaft support in DynamicConnectionQuotaTest > --- > > Key: KAFKA-15747 > URL: https://issues.apache.org/jira/browse/KAFKA-15747 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Sameer Tejani >Priority: Minor > Labels: kraft, kraft-test, newbie > Fix For: 3.8.0 > > > The following tests in DynamicConnectionQuotaTest in > core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala > need to be updated to support KRaft > 77 : def testDynamicConnectionQuota(): Unit = { > 104 : def testDynamicListenerConnectionQuota(): Unit = { > 175 : def testDynamicListenerConnectionCreationRateQuota(): Unit = { > 237 : def testDynamicIpConnectionRateQuota(): Unit = { > Scanned 416 lines. Found 0 KRaft tests out of 4 tests
Re: [PR] KAFKA-15747: KRaft support in DynamicConnectionQuotaTest [kafka]
mimaison commented on PR #15028: URL: https://github.com/apache/kafka/pull/15028#issuecomment-1885220999 Do you have a Jira ID, so I can assign the [ticket](https://issues.apache.org/jira/browse/KAFKA-15747) to you?
Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]
lucasbru commented on PR #15000: URL: https://github.com/apache/kafka/pull/15000#issuecomment-1885169869 In terms of implementing the auto-commit interceptor in the application thread, I see three options:
- Move auto-commit triggering to the application thread. That seems cleanest to me (all commits are triggered from the application thread), but it's the largest refactoring. Then the interceptor is trivial.
- Share the Invoker queue with the background thread, and access it when we register the auto-commit callback. It's already shared with the background thread implicitly as part of the commit request's completed future.
- Add a new background event to the background event queue. This seems unclean, since we already have a common data structure shared between the application and background threads for things to be invoked after a commit, and that is the "Invoker" (option 2).
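Option 2 can be pictured as a thread-safe queue: the background thread enqueues callbacks when a commit completes, and the application thread drains and runs them, so interceptors never execute on the background thread. A Java sketch, assuming a hypothetical `CommitCallbackInvoker` (not the consumer's actual invoker class):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a callback queue shared between the two threads.
final class CommitCallbackInvoker {
    private final ConcurrentLinkedQueue<Runnable> pending = new ConcurrentLinkedQueue<>();

    // Called from the background thread when an (auto-)commit completes.
    void enqueue(Runnable callback) {
        pending.add(callback);
    }

    // Called from the application thread (e.g. during poll); returns how many callbacks ran.
    int executeCallbacks() {
        int ran = 0;
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
            ran++;
        }
        return ran;
    }
}
```

The queue is the only shared state, so no extra background event type is needed; the trade-off is that callbacks are delayed until the application thread next drains the queue.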
Re: [PR] KAFKA-4759 Add support for IPv4 and IPv6 ranges in AclAuthorizer [kafka]
rgo commented on PR #9937: URL: https://github.com/apache/kafka/pull/9937#issuecomment-1885023082 > hi, I wonder if this feature is implemented somewhere in another PR? Last month I received a PR in my branch to fix the current conflicts. I will merge it as soon as possible. But as far as I can tell, it got blocked because of the request to remove the external library. To be frank, the response time was rather discouraging and made any discussion impossible (i.e., whether the library should be used or not). Maybe things have changed; let's see.
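Since the blocker was the external dependency, note that CIDR matching for both IPv4 and IPv6 can be done with the JDK alone. A sketch, assuming a hypothetical `CidrRange` helper for ACL host entries such as `192.168.0.0/16` or `2001:db8::/32`; it is not the code in PR #9937:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical CIDR matcher for IPv4 and IPv6 host entries.
final class CidrRange {
    private final byte[] network;
    private final int prefixLength;

    private CidrRange(byte[] network, int prefixLength) {
        this.network = network;
        this.prefixLength = prefixLength;
    }

    static CidrRange parse(String cidr) {
        int slash = cidr.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Missing prefix length: " + cidr);
        }
        byte[] address = toBytes(cidr.substring(0, slash));
        int prefix = Integer.parseInt(cidr.substring(slash + 1));
        if (prefix < 0 || prefix > address.length * 8) {
            throw new IllegalArgumentException("Bad prefix length: " + cidr);
        }
        return new CidrRange(address, prefix);
    }

    boolean contains(String host) {
        byte[] candidate = toBytes(host);
        if (candidate.length != network.length) {
            return false; // an IPv4 address never matches an IPv6 range, and vice versa
        }
        int fullBytes = prefixLength / 8;
        int remainingBits = prefixLength % 8;
        for (int i = 0; i < fullBytes; i++) {
            if (candidate[i] != network[i]) {
                return false;
            }
        }
        if (remainingBits == 0) {
            return true;
        }
        int mask = (0xFF << (8 - remainingBits)) & 0xFF; // top `remainingBits` bits set
        return (candidate[fullBytes] & mask) == (network[fullBytes] & mask);
    }

    // For IP literals, getByName parses the address without a DNS lookup;
    // a hostname would trigger a lookup, so callers should pass literals only.
    private static byte[] toBytes(String literal) {
        try {
            return InetAddress.getByName(literal).getAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not an IP address: " + literal, e);
        }
    }
}
```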
[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15839: --- Summary: Topic ID integration in consumer subscription state (was: Review topic ID integration in consumer subscription) > Topic ID integration in consumer subscription state > --- > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > > With the new consumer group protocol, assignments received by the consumer > contain topic IDs instead of topic names. Topic Ids are used in the > reconciliation path, integrated using TopicIdPartition. When reconciling, > topic names are resolved via a metadata update, but they are also kept in a > local #MembershipManager cache. This local cache serves the purpose of > keeping assigned topicId-names (that might have been deleted from metadata, > ex. topic deleted). > That's just an initial step towards spreading topic IDs internally in the > consumer code. Next step to address with this task would be to include topic > IDs in the subscription state, so that assigned topicId-names can be accessed > from other components without the need of resolving names multiple times. > Note that this task aims only at spreading topic IDs internally in the > consumer, no changes to expose them at the API level.
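The cache described here can be sketched as a map keyed by topic ID that is filled from metadata on first resolution and keeps answering after the topic disappears from metadata. A Java sketch, assuming a hypothetical `AssignedTopicNames` class and `java.util.UUID` standing in for Kafka's `Uuid`; keying by ID also means a re-created topic with the same name gets its own entry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical topic-ID-to-name cache for assigned topics.
final class AssignedTopicNames {
    private final Map<UUID, String> cache = new HashMap<>();
    private final Function<UUID, Optional<String>> metadataLookup;

    AssignedTopicNames(Function<UUID, Optional<String>> metadataLookup) {
        this.metadataLookup = metadataLookup;
    }

    // Resolve via metadata the first time, remember the name, and keep answering
    // from the cache even if the topic later disappears from metadata.
    Optional<String> resolve(UUID topicId) {
        String cached = cache.get(topicId);
        if (cached != null) {
            return Optional.of(cached);
        }
        Optional<String> fromMetadata = metadataLookup.apply(topicId);
        fromMetadata.ifPresent(name -> cache.put(topicId, name));
        return fromMetadata;
    }

    // Drop entries for topics that are no longer assigned, so the cache stays bounded.
    void retainAssigned(Set<UUID> assignedTopicIds) {
        cache.keySet().retainAll(assignedTopicIds);
    }
}
```

Moving such a map into the subscription state, as the ticket proposes, would let other components reuse resolved names instead of resolving them again.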
[jira] [Updated] (KAFKA-15839) Review topic ID integration in consumer subscription
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15839: --- Description: With the new consumer group protocol, assignments received by the consumer contain topic IDs instead of topic names. Topic IDs are used in the reconciliation path, integrated using TopicIdPartition. When reconciling, topic names are resolved via a metadata update, but they are also kept in a local #MembershipManager cache. This local cache keeps the names of assigned topic IDs, even if they have been removed from metadata (e.g., because the topic was deleted). That's just an initial step towards spreading topic IDs internally in the consumer code. The next step to address with this task is to include topic IDs in the subscription state, so that the names of assigned topic IDs can be accessed from other components without resolving them multiple times. Note that this task aims only at spreading topic IDs internally in the consumer, with no changes to expose them at the API level. was: TopicIdPartition is currently used in the reconciliation path. It could be used more, leaving topicPartitions only where necessary for the callbacks and the interaction with the subscription state, which does not fully support topic IDs yet. Ensure that we properly handle topic re-creation (same name, different topic IDs) in the reconciliation process (assignment cache, same assignment comparison, etc.) > Review topic ID integration in consumer subscription > > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > > With the new consumer group protocol, assignments received by the consumer > contain topic IDs instead of topic names.
Topic IDs are used in the > reconciliation path, integrated using TopicIdPartition. When reconciling, > topic names are resolved via a metadata update, but they are also kept in a > local #MembershipManager cache. This local cache keeps the names of assigned > topic IDs, even if they have been removed from metadata (e.g., because the > topic was deleted). > That's just an initial step towards spreading topic IDs internally in the > consumer code. The next step to address with this task is to include topic > IDs in the subscription state, so that the names of assigned topic IDs can be > accessed from other components without resolving them multiple times. > Note that this task aims only at spreading topic IDs internally in the > consumer, with no changes to expose them at the API level.
[jira] [Updated] (KAFKA-15839) Review topic ID integration in consumer subscription
[ https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-15839: --- Summary: Review topic ID integration in consumer subscription (was: Review topic ID integration in consumer reconciliation process) > Review topic ID integration in consumer subscription > > > Key: KAFKA-15839 > URL: https://issues.apache.org/jira/browse/KAFKA-15839 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.8.0 > > > TopicIdPartition is currently used in the reconciliation path. It could be used > more, leaving topicPartitions only where necessary for the callbacks and the > interaction with the subscription state, which does not fully support topic IDs > yet. > Ensure that we properly handle topic re-creation (same name, different topic > IDs) in the reconciliation process (assignment cache, same assignment > comparison, etc.)
Re: [PR] KAFKA-15853: Move OffsetConfig to group-coordinator module [kafka]
dajac commented on PR #15161: URL: https://github.com/apache/kafka/pull/15161#issuecomment-1884999821 That seems fine. We also have `GroupCoordinatorConfig`. I hope that we can eventually consolidate them, but we can look into this afterwards. We already have a Jira for reworking the group coordinator configs: https://issues.apache.org/jira/browse/KAFKA-15089.
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
dajac commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1447491926 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: Yeah, creating a new module makes sense to me. I would call it `transaction-coordinator`, and the jar would be called `kafka-transaction-coordinator` in order to follow the naming scheme of the `group-coordinator`. I am not sure whether it really needs to depend on the group-coordinator module, though; I haven't checked the details.
[jira] [Resolved] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
[ https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15866. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: (was: Lan Ding) Resolution: Fixed > Refactor OffsetFetchRequestState Error handling to be more consistent with > OffsetCommitRequestState > --- > > Key: KAFKA-15866 > URL: https://issues.apache.org/jira/browse/KAFKA-15866 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > The current OffsetFetchRequestState error handling uses nested if-else, which > is quite different, stylistically, from the OffsetCommitRequestState, which uses a > switch statement. The latter is a bit more readable, so we should refactor the > error handling using the same style to improve readability. > > A minor point: some of the error handling seems inconsistent with the commit path. > The logic was carried over from the current implementation, so we should also review all > the error handling. For example, the current logic somehow doesn't mark the > coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.
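To make the stylistic point concrete, here is a minimal sketch of switch-based error handling for an offset fetch. The enums, the handler class and the error-to-action mapping are all hypothetical and only cover the errors mentioned in this thread; they are not the consumer's actual OffsetFetchRequestState logic.

```java
// Hypothetical subset of the errors an offset request can return; Kafka's real
// type is the much larger org.apache.kafka.common.protocol.Errors enum.
enum OffsetFetchError { NONE, NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE, COORDINATOR_LOAD_IN_PROGRESS, UNKNOWN_SERVER_ERROR }

// Hypothetical actions the request manager could take.
enum FetchAction { COMPLETE, REDISCOVER_COORDINATOR_AND_RETRY, RETRY, FAIL }

final class OffsetFetchErrorHandler {
    private OffsetFetchErrorHandler() { }

    // One flat switch instead of nested if/else: each error maps to exactly one
    // action. Here COORDINATOR_NOT_AVAILABLE is grouped with NOT_COORDINATOR,
    // i.e. the coordinator is treated as unavailable and rediscovered.
    static FetchAction onError(OffsetFetchError error) {
        switch (error) {
            case NONE:
                return FetchAction.COMPLETE;
            case NOT_COORDINATOR:
            case COORDINATOR_NOT_AVAILABLE:
                return FetchAction.REDISCOVER_COORDINATOR_AND_RETRY;
            case COORDINATOR_LOAD_IN_PROGRESS:
                return FetchAction.RETRY;
            default:
                return FetchAction.FAIL;
        }
    }
}
```

Grouping cases in a single switch makes it visible at a glance which errors share a treatment, which is harder to see across nested if-else branches.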
[jira] [Updated] (KAFKA-16032) Review client inconsistent error handling of OffsetFetch and OffsetCommit responses
[ https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16032: --- Parent: KAFKA-14048 Issue Type: Sub-task (was: Bug) > Review client inconsistent error handling of OffsetFetch and OffsetCommit > responses > --- > > Key: KAFKA-16032 > URL: https://issues.apache.org/jira/browse/KAFKA-16032 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to > review around this: > - The logic is duplicated for some errors that are treated similarly (e.g., > NOT_COORDINATOR) > - Some errors are not handled the same way in both requests (e.g., > COORDINATOR_NOT_AVAILABLE is handled and retried for OffsetCommit but not for > OffsetFetch). Note that the specific errors handled by each request were kept > the same as in the legacy ConsumerCoordinator, but this should be reviewed in > an attempt to handle the same errors in the same way whenever possible. > Note that the legacy approach handles expected errors for each path (FETCH > and COMMIT), retrying on those when needed, but does not retry on unexpected > retriable errors.
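One way to address both points (duplicated logic, and the same error being handled differently by the two paths) is to classify the errors the two requests have in common in one place, leaving only request-specific errors to each path. A minimal sketch under stated assumptions: `SharedOffsetErrorHandling` and `OffsetRequestOutcome` are hypothetical names, and plain strings stand in for Kafka's `Errors` enum; this is not the consumer's API.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical outcomes; not part of any Kafka API.
enum OffsetRequestOutcome { REDISCOVER_AND_RETRY, RETRY, FAIL }

// Hypothetical helper: errors that OffsetFetch and OffsetCommit should treat the
// same way are classified once, here. Error names are plain strings standing in
// for Kafka's Errors enum.
final class SharedOffsetErrorHandling {
    private SharedOffsetErrorHandling() { }

    // Returns an outcome for errors common to both requests, or empty otherwise.
    static Optional<OffsetRequestOutcome> classifyCommon(String error) {
        switch (error) {
            case "NOT_COORDINATOR":
            case "COORDINATOR_NOT_AVAILABLE":
                return Optional.of(OffsetRequestOutcome.REDISCOVER_AND_RETRY);
            case "COORDINATOR_LOAD_IN_PROGRESS":
                return Optional.of(OffsetRequestOutcome.RETRY);
            default:
                return Optional.empty();
        }
    }

    // Each request path supplies only its request-specific handling.
    static OffsetRequestOutcome handle(String error, Function<String, OffsetRequestOutcome> requestSpecific) {
        return classifyCommon(error).orElseGet(() -> requestSpecific.apply(error));
    }
}
```

With this shape, the fetch and commit paths each pass a function covering only their own errors, so an error both need, such as COORDINATOR_NOT_AVAILABLE, can no longer be handled differently between them.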
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
C0urante commented on code in PR #15149: URL: https://github.com/apache/kafka/pull/15149#discussion_r1447487472 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java: ## @@ -41,18 +41,11 @@ public class HerderRequestHandler { private final RestClient restClient; -private volatile long requestTimeoutMs; +private final RestRequestTimeout requestTimeout; Review Comment: Ah, gotcha. The intent was to make it thread-safe, since it's likely that writes and reads to/of that value will take place on different threads.
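The thread-safety point can be illustrated with a small sketch. A field that one thread writes and another reads needs `volatile` (or a lock) for the reader to reliably see the latest write. Once the handler instead holds a `final` reference to an object that supplies the timeout, the reference itself is safely published, and the visibility of the changing value becomes that object's job. `TimeoutSource`, `MutableTimeout` and `HandlerSketch` are hypothetical names; the actual methods of the PR's `RestRequestTimeout` interface are not shown in the diff above.

```java
// Hypothetical stand-in for the PR's RestRequestTimeout interface; the real
// method name and signature are not shown in the diff above.
interface TimeoutSource {
    long timeoutMs();
}

// A value that may be changed on one thread and read on another.
// `volatile` guarantees readers see the most recent write without locking.
final class MutableTimeout implements TimeoutSource {
    private volatile long timeoutMs;

    MutableTimeout(long initialMs) {
        this.timeoutMs = initialMs;
    }

    void set(long newTimeoutMs) {
        this.timeoutMs = newTimeoutMs;
    }

    @Override
    public long timeoutMs() {
        return timeoutMs;
    }
}

// The handler only holds a final reference: final-field semantics make the
// reference safely visible to other threads after construction, while the
// freshness of the value itself is the TimeoutSource's responsibility.
final class HandlerSketch {
    private final TimeoutSource requestTimeout;

    HandlerSketch(TimeoutSource requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    long currentTimeoutMs() {
        return requestTimeout.timeoutMs();
    }
}
```

A fixed timeout can be supplied as a lambda (`() -> 5_000L`), while a mutable one can be shared between the component that updates it and the handler that reads it.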
Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]
ijuma commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1447483283 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.transaction; + +public class TransactionLogConfig { Review Comment: Let's see what @dajac says, but I think a `kafka-transactions` or `kafka-transactions-coordinator` module that depends on `kafka-group-coordinator` makes sense to me. The former option is shorter and just as clear, while the latter follows the same pattern as the group coordinator.
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
vamossagar12 commented on code in PR #15149: URL: https://github.com/apache/kafka/pull/15149#discussion_r1447480579 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest; + +public interface RestRequestTimeout { Review Comment: Ok. Makes sense.
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
vamossagar12 commented on PR #15149: URL: https://github.com/apache/kafka/pull/15149#issuecomment-1884969533 @C0urante, hmm, okay. I understand those weren't blocking comments; I called them nits.
Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]
vamossagar12 commented on code in PR #15149: URL: https://github.com/apache/kafka/pull/15149#discussion_r1447468334 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java: ## @@ -41,18 +41,11 @@ public class HerderRequestHandler { private final RestClient restClient; -private volatile long requestTimeoutMs; +private final RestRequestTimeout requestTimeout; Review Comment: I didn't intend to mark the final field as volatile. I wanted to know why the field was marked as volatile in the old PR, since you had made that change.
[jira] [Updated] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes
[ https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16107: --- Summary: Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes (was: Ensure consumer does not start fetching from added partitions until onPartitionsAssgined completes) > Ensure consumer does not start fetching from added partitions until > onPartitionsAssigned completes > -- > > Key: KAFKA-16107 > URL: https://issues.apache.org/jira/browse/KAFKA-16107 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > > In the new consumer implementation, when new partitions are assigned, the > subscription state is updated and then #onPartitionsAssigned is triggered. > This sequence seems sensible, but we need to ensure that no data is fetched > until onPartitionsAssigned completes (where the user could be setting the > committed offsets they want to start fetching from). > We should pause the newly added partitions until > onPartitionsAssigned completes, similar to how it's done on revocation, to > avoid positions getting ahead of the committed offsets.
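A minimal sketch of the proposed ordering, assuming a simplified, hypothetical subscription state (`SketchSubscriptionState`, with partitions as plain strings such as `orders-0`): newly assigned partitions are recorded right away but stay non-fetchable until the user's onPartitionsAssigned callback returns. This illustrates the idea in the ticket, not the new consumer's SubscriptionState API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical, simplified subscription state; not Kafka's SubscriptionState.
final class SketchSubscriptionState {
    // Value true = still awaiting onPartitionsAssigned, so not yet fetchable.
    private final Map<String, Boolean> pendingCallback = new LinkedHashMap<>();

    // Step 1: record the new assignment, initially blocked from fetching.
    void addAssigned(List<String> partitions) {
        for (String tp : partitions) {
            pendingCallback.put(tp, true);
        }
    }

    // Step 2: run the user callback (which may seek to committed offsets),
    // and only afterwards mark the partitions as fetchable.
    void invokeOnAssigned(List<String> partitions, Consumer<List<String>> onPartitionsAssigned) {
        onPartitionsAssigned.accept(partitions);
        for (String tp : partitions) {
            pendingCallback.replace(tp, false);
        }
    }

    // The fetcher would only consider partitions that are not pending.
    List<String> fetchablePartitions() {
        return pendingCallback.entrySet().stream()
                .filter(e -> !e.getValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

In this sketch, if the callback throws, the partitions simply stay blocked; how the real consumer should behave when the callback fails is not specified in the ticket.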
[jira] [Updated] (KAFKA-16052) OOM in Kafka test suite
[ https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16052: - Fix Version/s: 3.8.0 > OOM in Kafka test suite > --- > > Key: KAFKA-16052 > URL: https://issues.apache.org/jira/browse/KAFKA-16052 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.8.0 > > Attachments: Screenshot 2023-12-27 at 14.04.52.png, Screenshot > 2023-12-27 at 14.22.21.png, Screenshot 2023-12-27 at 14.45.20.png, Screenshot > 2023-12-27 at 15.31.09.png, Screenshot 2023-12-27 at 17.44.09.png, Screenshot > 2023-12-28 at 00.13.06.png, Screenshot 2023-12-28 at 00.18.56.png, Screenshot > 2023-12-28 at 11.26.03.png, Screenshot 2023-12-28 at 11.26.09.png, Screenshot > 2023-12-28 at 18.44.19.png, Screenshot 2024-01-10 at 14.59.47.png, newRM.patch > > > *Problem* > Our test suite is failing with frequent OOM. Discussion in the mailing list > is here: [https://lists.apache.org/thread/d5js0xpsrsvhgjb10mbzo9cwsy8087x4] > *Setup* > To find the source of leaks, I ran the :core:test build target with a single > thread (see below on how to do it) and attached a profiler to it. This Jira > tracks the list of action items identified from the analysis. > How to run tests using a single thread: > {code:java} > diff --git a/build.gradle b/build.gradle > index f7abbf4f0b..81df03f1ee 100644 > --- a/build.gradle > +++ b/build.gradle > @@ -74,9 +74,8 @@ ext { > "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED" > )- maxTestForks = project.hasProperty('maxParallelForks') ? > maxParallelForks.toInteger() : Runtime.runtime.availableProcessors() > - maxScalacThreads = project.hasProperty('maxScalacThreads') ? > maxScalacThreads.toInteger() : > - Math.min(Runtime.runtime.availableProcessors(), 8) > + maxTestForks = 1 > + maxScalacThreads = 1 > userIgnoreFailures = project.hasProperty('ignoreFailures') ? 
> ignoreFailures : false userMaxTestRetries = > project.hasProperty('maxTestRetries') ? maxTestRetries.toInteger() : 0 > diff --git a/gradle.properties b/gradle.properties > index 4880248cac..ee4b6e3bc1 100644 > --- a/gradle.properties > +++ b/gradle.properties > @@ -30,4 +30,4 @@ scalaVersion=2.13.12 > swaggerVersion=2.2.8 > task=build > org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC > -org.gradle.parallel=true > +org.gradle.parallel=false {code} > *Result of experiment* > This is what the heap memory utilization looks like: it starts at tens of MB and > ends with 1.5GB (with spikes of 2GB) of heap in use as the test > executes. Note that the total number of threads also increases, but it does > not correlate with the sharp increase in heap memory usage. The heap dump is > available at > [https://www.dropbox.com/scl/fi/nwtgc6ir6830xlfy9z9cu/GradleWorkerMain_10311_27_12_2023_13_37_08.hprof.zip?rlkey=ozbdgh5vih4rcynnxbatzk7ln&dl=0] > > !Screenshot 2023-12-27 at 14.22.21.png!
[jira] [Commented] (KAFKA-16052) OOM in Kafka test suite
[ https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805163#comment-17805163 ] Divij Vaidya commented on KAFKA-16052: -- On current trunk, the heap "at max" goes to 1200MB, compared to 1800MB provided in the description. I have also not seen any OOM in CI for quite a while now based on [https://ge.apache.org/scans/failures?failures.failureClassification=all_failures&failures.failureMessage=Execution%20failed%20for%20task%20%27:tools:test%27.%0A%3E%20Process%20%27Gradle%20Test%20Executor%2096%27%20finished%20with%20non-zero%20exit%20value%201%0A%20%20This%20problem%20might%20be%20caused%20by%20incorrect%20test%20process%20configuration.%0A%20%20For%20more%20on%20test%20execution%2C%20please%20refer%20to%20https:%2F%2Fdocs.gradle.org%2F8.5%2Fuserguide%2Fjava_testing.html%23sec:test_execution%20in%20the%20Gradle%20documentation.&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin] Also, notice the drastic decrease in the number of threads in the test (right graph) due to the fixes made here. At this stage, I am resolving this Jira based on the above. We have some forward-looking tasks at [https://github.com/apache/kafka/pull/15101] to fix this permanently. !Screenshot 2024-01-10 at 14.59.47.png!
[jira] [Created] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7
Chris Egerton created KAFKA-16108: - Summary: Backport fix for KAFKA-16093 to 3.7 Key: KAFKA-16108 URL: https://issues.apache.org/jira/browse/KAFKA-16108 Project: Kafka Issue Type: Improvement Components: connect Reporter: Chris Egerton Assignee: Chris Egerton Fix For: 3.7.1 A fix for KAFKA-16093 is present on the branches trunk (the version for which is currently 3.8.0-SNAPSHOT) and 3.6. We are in code freeze for the 3.7.0 release, and this issue is not a blocker, so it cannot be backported right now. We should backport the fix once 3.7.0 has been released and before 3.7.1 is released.
[jira] [Created] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssgined completes
Lianet Magrans created KAFKA-16107: -- Summary: Ensure consumer does not start fetching from added partitions until onPartitionsAssgined completes Key: KAFKA-16107 URL: https://issues.apache.org/jira/browse/KAFKA-16107 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans In the new consumer implementation, when new partitions are assigned, the subscription state is updated and then #onPartitionsAssigned is triggered. This sequence seems sensible, but we need to ensure that no data is fetched until onPartitionsAssigned completes (where the user could be setting the committed offsets they want to start fetching from). We should pause the newly added partitions until onPartitionsAssigned completes, similar to how it's done on revocation, to avoid positions getting ahead of the committed offsets.
[jira] [Updated] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces
[ https://issues.apache.org/jira/browse/KAFKA-16093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16093: -- Fix Version/s: 3.6.2 > Spurious warnings logged to stderr about empty path annotations and providers > not implementing provider interfaces > -- > > Key: KAFKA-16093 > URL: https://issues.apache.org/jira/browse/KAFKA-16093 > Project: Kafka > Issue Type: Improvement > Components: connect >Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor > Fix For: 3.6.2, 3.8.0 > > > Some warnings get logged to stderr on Connect startup. For example: > {quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will > be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > {quote} > These are benign, but they can confuse and even frighten new users.
[jira] [Updated] (KAFKA-16052) OOM in Kafka test suite
[ https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-16052: - Attachment: Screenshot 2024-01-10 at 14.59.47.png