Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1448416633


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran reopened KAFKA-15538:


> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a Java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a Java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with a `SubscriptionPattern` is used, the client should 
> just send the regex to the broker, which resolves it on the server side.
>  * When subscribe with a `Pattern` is used, the regex should be resolved on 
> the client side.
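
As a rough illustration of the client-side case described above, the sketch below filters a set of known topic names with a `java.util.regex.Pattern`. The class and method names (`PatternSubscriptionSketch`, `resolve`) and the topic names are made up for this example and are not part of the Kafka consumer API. It also assumes that the whole topic name has to match, which the issue does not state.

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not a Kafka class.
final class PatternSubscriptionSketch {

    // Returns the known topic names that fully match the pattern,
    // sorted so the result is deterministic.
    static List<String> resolve(Pattern pattern, Set<String> knownTopics) {
        return knownTopics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed topic names; a real client would take them from cluster metadata.
        Set<String> topics = Set.of("orders-eu", "orders-us", "payments");
        // The resolved list is what the client would send to the broker.
        System.out.println(resolve(Pattern.compile("orders-.*"), topics));
    }
}
```

With a `SubscriptionPattern`, none of this would run on the client, since the regex itself is sent to the broker.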



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805397#comment-17805397
 ] 

Phuc Hong Tran commented on KAFKA-15538:


Hm, never mind, let me double-check.



[jira] [Comment Edited] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805393#comment-17805393
 ] 

Phuc Hong Tran edited comment on KAFKA-15538 at 1/11/24 6:51 AM:
-

Looks like this has already been done in 
[https://github.com/apache/kafka/pull/14638] [~lianetm] 


was (Author: JIRAUSER301295):
Look like this has already been done in 
[https://github.com/apache/kafka/pull/14638.] [~lianetm] 



[jira] [Resolved] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phuc Hong Tran resolved KAFKA-15538.

Resolution: Fixed



[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805393#comment-17805393
 ] 

Phuc Hong Tran commented on KAFKA-15538:


Looks like this has already been done in 
[https://github.com/apache/kafka/pull/14638.] [~lianetm] 



[jira] [Updated] (KAFKA-16114) Fix partiton not retention after cancel alter intra broker log dir task

2024-01-10 Thread wangliucheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangliucheng updated KAFKA-16114:
-
Description: 
The deletion thread will not work on a partition after its alter intra-broker 
log dir task is cancelled.

The steps to reproduce are as follows:
1. Create the reassignment.json file.
Partition test01-1 is in the /data01/kafka/log01 directory of broker 1003 and 
is to be moved to /data01/kafka/log02.

{
    "version": 1,
    "partitions": [
        {
            "topic": "test01",
            "partition": 1,
            "replicas": [1001,1003],
            "log_dirs": ["any","/data01/kafka/log02"]
        }
    ]
}

2. Kick off the reassignment.
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute

3. Cancel the reassignment.
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --cancel

4. Result: the partition test01-1 on broker 1003 will not be deleted.

The reason for this problem is that the partition has been filtered out:
{code:java}
val deletableLogs = logs.filter {
  case (_, log) => !log.config.compact // pick non-compacted logs
}.filterNot {
  case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
} {code}
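
To see why a partition can drop out of retention, here is a simplified Java model of the filter quoted above. It assumes, as the report suggests but does not show, that the cancelled move leaves the partition in the in-progress set. `DeletableLogsSketch`, its method, and the map-based log model are hypothetical stand-ins, not Kafka's actual code.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical model of the quoted filter; not Kafka code.
final class DeletableLogsSketch {

    // compactedByPartition maps a partition name to whether its log is compacted.
    // inProgress holds partitions that are currently marked as being worked on.
    static List<String> deletableLogs(Map<String, Boolean> compactedByPartition,
                                      Set<String> inProgress) {
        return compactedByPartition.entrySet().stream()
            .filter(e -> !e.getValue())                     // pick non-compacted logs
            .filter(e -> !inProgress.contains(e.getKey()))  // skip logs already in progress
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Boolean> logs = Map.of("test01-0", false, "test01-1", false);
        // Assumption: the cancelled move left test01-1 in the in-progress set.
        System.out.println(deletableLogs(logs, Set.of("test01-1")));
        // Once the entry is cleared, the partition is eligible for deletion again.
        System.out.println(deletableLogs(logs, Set.of()));
    }
}
```

In this model the partition is skipped for as long as its entry stays in the in-progress set, which matches the behaviour the report describes.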



[jira] [Updated] (KAFKA-16114) Fix partiton not retention after cancel alter intra broker log dir task

2024-01-10 Thread wangliucheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangliucheng updated KAFKA-16114:
-
Description: 
The deletion thread will not work on a partition after its alter intra-broker 
log dir task is cancelled.

The steps to reproduce are as follows:
1. Create the reassignment.json file.
Partition test01-1 is in the /data01/kafka/log01 directory of broker 1003 and 
is to be moved to /data01/kafka/log02.
{code:java}
{
    "version": 1,
    "partitions": [
        {
            "topic": "test01",
            "partition": 1,
            "replicas": [1001,1003],
            "log_dirs": ["any","/data01/kafka/log02"]
        }
    ]
}{code}
2. Kick off the reassignment.
{code:java}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --execute {code}
3. Cancel the reassignment.
{code:java}
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file reassignment.json --cancel {code}
4. Result: the partition test01-1 on broker 1003 will not be deleted.

The reason for this problem is that the partition has been filtered out:
{code:java}
val deletableLogs = logs.filter {
  case (_, log) => !log.config.compact // pick non-compacted logs
}.filterNot {
  case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
} {code}




[jira] [Created] (KAFKA-16114) Fix partiton not retention after cancel alter intra broker log dir task

2024-01-10 Thread wangliucheng (Jira)
wangliucheng created KAFKA-16114:


 Summary: Fix partiton not retention after cancel alter intra 
broker log dir task 
 Key: KAFKA-16114
 URL: https://issues.apache.org/jira/browse/KAFKA-16114
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 3.6.1, 3.3.2
Reporter: wangliucheng








Re: [PR] KAFKA-15720: KRaft support in DeleteTopicTest [kafka]

2024-01-10 Thread via GitHub


dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793


##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
   */
 
 val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-val topic = "test"
-servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, enableControlledShutdown = false)

Review Comment:
   Why not use `createTestTopicAndCluster` directly?



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicTest extends QuorumTestHarness {
 
+  var brokers: Seq[KafkaBroker] = Seq()
+
   var servers: Seq[KafkaServer] = Seq()

Review Comment:
   I think it's unnecessary to keep both `servers` and `brokers`. We can use 
`KafkaBroker` in most cases, or cast it to `KafkaServer` if necessary.



##
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
 // delete topic
-adminZkClient.deleteTopic("test")
-TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+admin.deleteTopics(Collections.singletonList(topic)).all().get()
+TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
 val topicPartition = new TopicPartition("test", 0)
 val topic = topicPartition.topic
-servers = createTestTopicAndCluster(topic)
+brokers = createTestTopicAndCluster(topic)
 // start topic deletion
-adminZkClient.deleteTopic(topic)
+admin.deleteTopics(Collections.singletonList(topic)).all().get()
 // try to delete topic marked as deleted
-assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+// start topic deletion
+TestUtils.waitUntilTrue(() => {
+  try {
+admin.deleteTopics(Collections.singletonList(topic)).all().get()
+false
+  } catch {
+case e: ExecutionException =>
+  classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+  }
+}, s"Topic ${topic} should be marked for deletion or already deleted.")
 
-TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
 brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
 createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def 

Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on PR #14612:
URL: https://github.com/apache/kafka/pull/14612#issuecomment-1886258250

   @mumrah @artemlivshits 
   Refactored the code with the following major changes
   1. Extract the request handling from the KafkaApis.scala.
   2. Performance optimization: it now loops over the topic list twice and 
avoids unnecessary copies.





Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300820


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+  case _ =>
+}
+val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+var topics = scala.collection.mutable.Set[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name()))
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   Refactored with option (a).






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300548


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param maximumNumberOfPartitions The max number of partitions to return.
+  def getTopicMetadataForDescribeTopicResponse(
+topics: Seq[String],
+listenerName: ListenerName,
+firstTopicPartitionStartIndex: Int,
+maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+val image = _currentImage
+var remaining = maximumNumberOfPartitions
+var startIndex = firstTopicPartitionStartIndex
+val result = new DescribeTopicPartitionsResponseData()
+topics.foreach { topicName =>
+  if (remaining > 0) {
+val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+partitionResponse.map( partitions => {
+  val upperIndex = startIndex + remaining
+  val response = new DescribeTopicPartitionsResponseTopic()
+.setErrorCode(Errors.NONE.code)
+.setName(topicName)
+.setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+.setIsInternal(Topic.isInternal(topicName))
+.setPartitions(partitions.filter(partition => {
+  partition.partitionIndex() >= startIndex && partition.partitionIndex() < upperIndex
+}).asJava)
+  remaining -= response.partitions().size()
+  result.topics().add(response)
+
+  if (upperIndex < partitions.size) {
+result.setNextCursor(new Cursor()
+  .setTopicName(topicName)
+  .setPartitionIndex(upperIndex)
+)
+remaining = -1
+  }
+})
+
+// start index only applies to the first topic. Reset it here.
+startIndex = 0
+
+if (!partitionResponse.isDefined) {
+  val error = try {
+Topic.validate(topicName)
+Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
   Yes, the cursor topic can be deleted.
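
A minimal sketch of cursor-based paging under a partition limit, including the case confirmed above where the cursor topic no longer exists. It is a simplified model, not the PR's implementation: `CursorPaginationSketch`, `Cursor`, `Page`, and the sorted map from topic name to partition count are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of paging over (topic, partition) pairs; not Kafka code.
final class CursorPaginationSketch {

    static final class Cursor {
        final String topic;
        final int partitionIndex;
        Cursor(String topic, int partitionIndex) {
            this.topic = topic;
            this.partitionIndex = partitionIndex;
        }
    }

    static final class Page {
        final List<String> partitions;
        final Cursor next; // null when nothing is left to return
        Page(List<String> partitions, Cursor next) {
            this.partitions = partitions;
            this.next = next;
        }
    }

    // partitionCounts maps each topic name to its number of partitions, in sorted order.
    static Page describe(SortedMap<String, Integer> partitionCounts, Cursor start, int limit) {
        List<String> out = new ArrayList<>();
        // Resume at the cursor topic, or at the next topic in order if it was deleted.
        SortedMap<String, Integer> view =
            start == null ? partitionCounts : partitionCounts.tailMap(start.topic);
        for (Map.Entry<String, Integer> e : view.entrySet()) {
            // The start index only applies to the cursor topic itself.
            boolean isCursorTopic = start != null && e.getKey().equals(start.topic);
            int first = isCursorTopic ? start.partitionIndex : 0;
            for (int p = first; p < e.getValue(); p++) {
                if (out.size() == limit) {
                    // Limit reached: point the cursor at the first partition not returned.
                    return new Page(out, new Cursor(e.getKey(), p));
                }
                out.add(e.getKey() + "-" + p);
            }
        }
        return new Page(out, null);
    }

    public static void main(String[] args) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        counts.put("a", 2);
        counts.put("b", 3);
        Page first = describe(counts, null, 3);
        System.out.println(first.partitions);
        Page second = describe(counts, first.next, 3);
        System.out.println(second.partitions);
    }
}
```

Because the lookup resumes from the first topic at or after the cursor topic, a deleted cursor topic simply causes paging to continue at the next topic from partition 0.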






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448300157


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition can't be returned due the limit.
+   * If a topic can't return any partition due to quota limit reached, this topic will not be included in the response.
+   *
+   * Note, the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param maximumNumberOfPartitions The max number of partitions to return.
+  def getTopicMetadataForDescribeTopicResponse(
+topics: Seq[String],
+listenerName: ListenerName,
+firstTopicPartitionStartIndex: Int,
+maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+val image = _currentImage
+var remaining = maximumNumberOfPartitions
+var startIndex = firstTopicPartitionStartIndex
+val result = new DescribeTopicPartitionsResponseData()
+topics.foreach { topicName =>

Review Comment:
   Refactored; it now breaks early once the quota is reached.









Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448299281


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName

Review Comment:
   Added the suggested logic.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448299150


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -140,6 +141,71 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
 }
   }
 
+  private def getPartitionMetadataForDescribeTopicResponse(
+image: MetadataImage,
+topicName: String,
+listenerName: ListenerName
+  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+Option(image.topics().getTopic(topicName)) match {
+  case None => None
+  case Some(topic) => {
+val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
+  val partitionId = entry.getKey
+  val partition = entry.getValue
+  val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+listenerName, false)
+  val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+false)
+  val offlineReplicas = getOfflineReplicas(image, partition, listenerName)
+  val maybeLeader = getAliveEndpoint(image, partition.leader, listenerName)
+  maybeLeader match {
+case None =>
+  val error = if (!image.cluster().brokers.containsKey(partition.leader)) {
+debug(s"Error while fetching metadata for $topicName-$partitionId: leader not available")
+Errors.LEADER_NOT_AVAILABLE
+  } else {
+debug(s"Error while fetching metadata for $topicName-$partitionId: listener $listenerName " +
+  s"not found on leader ${partition.leader}")
+Errors.LISTENER_NOT_FOUND
+  }
+  new DescribeTopicPartitionsResponsePartition()
+.setErrorCode(error.code)
+.setPartitionIndex(partitionId)
+.setLeaderId(MetadataResponse.NO_LEADER_ID)
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+case Some(leader) =>
+  val error = if (filteredReplicas.size < partition.replicas.length) {
+debug(s"Error while fetching metadata for $topicName-$partitionId: replica information not available for " +
+  s"following brokers ${partition.replicas.filterNot(filteredReplicas.contains).mkString(",")}")
+Errors.REPLICA_NOT_AVAILABLE
+  } else if (filteredIsr.size < partition.isr.length) {
+debug(s"Error while fetching metadata for $topicName-$partitionId: in sync replica information not available for " +
+  s"following brokers ${partition.isr.filterNot(filteredIsr.contains).mkString(",")}")
+Errors.REPLICA_NOT_AVAILABLE
+  } else {
+Errors.NONE
+  }
+
+  new DescribeTopicPartitionsResponsePartition()
+.setErrorCode(error.code)
+.setPartitionIndex(partitionId)
+.setLeaderId(leader.id())
+.setLeaderEpoch(partition.leaderEpoch)
+.setReplicaNodes(filteredReplicas)
+.setIsrNodes(filteredIsr)
+.setOfflineReplicas(offlineReplicas)
+.setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+.setLastKnownELR(Replicas.toList(partition.lastKnownElr))
+  }
+}.toList)
+partitions

Review Comment:
   Done.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448298954


##
clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json:
##
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 75,
+  "type": "response",
+  "name": "DescribeTopicPartitionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
"ignorable": true,
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", 
"versions": "0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "ErrorCode", "type": "int16", "versions": "0+",
+"about": "The topic error, or 0 if there was no error." },
+  { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName", "nullableVersions": "0+",
+"about": "The topic name." },
+  { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
true, "about": "The topic id." },
+  { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
"false", "ignorable": true,
+"about": "True if the topic is internal." },
+  { "name": "Partitions", "type": 
"[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
+"about": "Each partition in the topic.", "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error, or 0 if there was no error." },
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the leader broker." },
+{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1", "ignorable": true,
+  "about": "The leader epoch of this partition." },
+{ "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of all nodes that host this partition." },
+{ "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
"entityType": "brokerId",
+  "about": "The set of nodes that are in sync with the leader for this 
partition." },
+{ "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
"null", "entityType": "brokerId",
+  "versions": "0+", "nullableVersions": "0+",
+  "about": "The new eligible leader replicas otherwise." },
+{ "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",

Review Comment:
   Corrected the mistake.






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448298858


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {
+metadataCache match {
+  case _: ZkMetadataCache =>
+throw new InvalidRequestException("ZK cluster does not handle 
DescribeTopicPartitions request")
+  case _ =>
+}
+val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+val describeTopicPartitionsRequest = 
request.body[DescribeTopicPartitionsRequest].data()
+var topics = scala.collection.mutable.Set[String]()
+describeTopicPartitionsRequest.topics().forEach(topic => 
topics.add(topic.name()))
+
+val cursor = describeTopicPartitionsRequest.cursor()
+val fetchAllTopics = topics.isEmpty
+if (fetchAllTopics) {
+  metadataCache.getAllTopics().foreach(topic => topics.add(topic))

Review Comment:
   Now we loop twice.
   In the first pass, we gather the topics lexicographically larger than the
cursor.
   In the second pass, we iterate over the topic list and query for their partitions.
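
The two passes described above can be sketched roughly as follows. This is only an illustration, not the code in the PR: `CursorPagingSketch`, `topicsFromCursor`, `describePartitions`, the `inclusive` flag, and the `limit` parameter are all hypothetical names, and whether the cursor topic itself is included is an assumption left to the caller.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical sketch of cursor-based paging over topic names.
final class CursorPagingSketch {

    // Pass 1: sort the topic names and keep those after the cursor.
    // A null cursor means "start from the first topic".
    static List<String> topicsFromCursor(Collection<String> allTopics,
                                         String cursorTopic,
                                         boolean inclusive) {
        TreeSet<String> sorted = new TreeSet<>(allTopics);
        if (cursorTopic == null) {
            return new ArrayList<>(sorted);
        }
        return new ArrayList<>(sorted.tailSet(cursorTopic, inclusive));
    }

    // Pass 2: walk the filtered topics and collect partitions until an
    // assumed response limit is reached.
    static List<String> describePartitions(List<String> topics,
                                           Map<String, Integer> partitionCounts,
                                           int limit) {
        List<String> out = new ArrayList<>();
        for (String topic : topics) {
            int count = partitionCounts.getOrDefault(topic, 0);
            for (int p = 0; p < count; p++) {
                if (out.size() == limit) {
                    return out;
                }
                out.add(topic + "-" + p);
            }
        }
        return out;
    }
}
```

Sorting first keeps the second pass a plain in-order walk, so the position where it stops can serve as the next cursor.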






Re: [PR] KAFKA-15585: Add DescribeTopics API server side support [kafka]

2024-01-10 Thread via GitHub


CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1448297437


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
   ))
   }
 
+  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
Unit = {

Review Comment:
   Done.






[jira] [Commented] (KAFKA-16060) Some questions about tiered storage capabilities

2024-01-10 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805362#comment-17805362
 ] 

Jianbin Chen commented on KAFKA-16060:
--

Thank you for your replies.

> Some questions about tiered storage capabilities
> 
>
> Key: KAFKA-16060
> URL: https://issues.apache.org/jira/browse/KAFKA-16060
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Jianbin Chen
>Priority: Major
>
> # If a topic has 3 replicas, when the local expiration time is reached, will 
> all 3 replicas trigger the log transfer to the remote storage, or will only 
> the leader in the ISR transfer the log to the remote storage (HDFS, S3)?
>  # By topics that do not support compression, do you mean topics with 
> log.cleanup.policy=compact?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Upgrade Gradle wrapper to version 8.4 [kafka]

2024-01-10 Thread via GitHub


github-actions[bot] commented on PR #14502:
URL: https://github.com/apache/kafka/pull/14502#issuecomment-1886169673

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] MINOR: Enable kraft test in kafka.api and kafka.network [kafka]

2024-01-10 Thread via GitHub


dengziming commented on code in PR #14595:
URL: https://github.com/apache/kafka/pull/14595#discussion_r1448252557


##
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##
@@ -35,31 +37,37 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
   overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
   overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, 
replicationFactor.toString)
 
+
   def generateConfigs =
 (0 until numServers) map { node =>
-  TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+  TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
 } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk")) // TODO Partition leader is not evenly 
distributed in kraft mode, see KAFKA-15354

Review Comment:
   @mimaison I reverted this change for now since we are moving this to 
KAFKA-15354.






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448205757


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,

Review Comment:
   The filters are passed as lists from the ListGroupsDataRequest, though. Did 
you want to just convert them to sets here or throughout the whole pipeline?






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448203801


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,

Review Comment:
   Sure, I can do that. I just left it the way it was before.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140837


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());
+
+// Fetching offsets with "require stable" (Long.MAX_VALUE) should 
return the UNSTABLE_OFFSET_COMMIT
+// errors for foo-1 and bar-0.

Review Comment:
   ditto with the comments for expectations for foo-0






[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-10 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16113:
---
Parent: KAFKA-14246
Issue Type: Sub-task  (was: Improvement)

> AsyncKafkaConsumer: Add missing offset commit metrics
> -
>
> Key: KAFKA-16113
> URL: https://issues.apache.org/jira/browse/KAFKA-16113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>
> The following metrics are missing from the AsyncKafkaConsumer:
> commit-latency-avg
> commit-latency-max
> commit-rate
> commit-total
> committed-time-ns-total





[jira] [Created] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics

2024-01-10 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16113:
--

 Summary: AsyncKafkaConsumer: Add missing offset commit metrics
 Key: KAFKA-16113
 URL: https://issues.apache.org/jira/browse/KAFKA-16113
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


The following metrics are missing from the AsyncKafkaConsumer:

commit-latency-avg
commit-latency-max
commit-rate
commit-total
committed-time-ns-total





Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140646


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void 
testFetchAllOffsetsAtDifferentCommittedOffset() {
 ), context.fetchAllOffsets("group", Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());

Review Comment:
   should we have bar-1 again to test that case too?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448139696


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1856,6 +1888,91 @@ public void testFetchOffsetsAtDifferentCommittedOffset() 
{
 ), context.fetchOffsets("group", request, Long.MAX_VALUE));
 }
 
+@Test
+public void testFetchOffsetsWithPendingTransactionalOffsets() {
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+
+context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", 
true);
+
+context.commitOffset("group", "foo", 0, 100L, 1);
+context.commitOffset("group", "foo", 1, 110L, 1);
+context.commitOffset("group", "bar", 0, 200L, 1);
+
+context.commit();
+
+assertEquals(3, context.lastWrittenOffset);
+assertEquals(3, context.lastCommittedOffset);
+
+context.commitOffset(10L, "group", "foo", 1, 111L, 1, 
context.time.milliseconds());
+context.commitOffset(10L, "group", "bar", 0, 201L, 1, 
context.time.milliseconds());
+// Note that bar-1 does not exist in the initial commits. 
UNSTABLE_OFFSET_COMMIT errors
+// must be returned in this case too.
+context.commitOffset(10L, "group", "bar", 1, 211L, 1, 
context.time.milliseconds());
+
+// Always use the same request.
+List request = 
Arrays.asList(
+new OffsetFetchRequestData.OffsetFetchRequestTopics()
+.setName("foo")
+.setPartitionIndexes(Arrays.asList(0, 1)),
+new OffsetFetchRequestData.OffsetFetchRequestTopics()
+.setName("bar")
+.setPartitionIndexes(Arrays.asList(0, 1))
+);
+
+// Fetching offsets with "require stable" (Long.MAX_VALUE) should 
return the UNSTABLE_OFFSET_COMMIT
+// errors for foo-1, bar-0 and bar-1.

Review Comment:
   should we mention foo-0 will be returned in this comment and also the 
comment below?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448132968


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Ah, it's the snapshot registry and the revertLastWrittenOffset method that do 
this. I had forgotten, but it makes sense now.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


artemlivshits commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r144830


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   My understanding is that by the time we come to this function it already 
runs on a GC (group coordinator) thread (it has to, because that's how we 
guarantee the atomicity), so there is no request local here anyway and we must 
use NoCaching (it is always safe to use NoCaching, just not as optimal as 
thread local).
   
   The future's callback will be called in the thread that completed it, but 
like I said, it doesn't matter for this function as it gets rescheduled on the 
GC thread pool.
   
   I gave a suggestion to hoist wrapping to the caller, so the new GC doesn't 
have to double-schedule: 
https://github.com/apache/kafka/pull/14774#discussion_r1420931474.
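
The point above about the future's callback running in the thread that completed it can be observed with plain `java.util.concurrent`. The following is a minimal sketch, not code from the PR: `CallbackThreadSketch` and `callbackThreadName` are hypothetical names. It assumes the callback is registered with a non-async method before the future is completed and that only one thread completes it, which is the case where the completing thread runs the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: record which thread runs a CompletableFuture callback.
final class CallbackThreadSketch {

    static String callbackThreadName(String completerName) {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();

        // Registered before completion, with the non-async variant.
        future.whenComplete((value, error) ->
            seen.set(Thread.currentThread().getName()));

        // Complete the future from a separate, named thread.
        Thread completer = new Thread(() -> future.complete("guard"), completerName);
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }
}
```

Code that needs the callback on a specific executor would use the `*Async` variants with an explicit executor, or reschedule inside the callback as the comment describes.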






[jira] [Commented] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805327#comment-17805327
 ] 

Phuc Hong Tran commented on KAFKA-15561:


I see. I was wondering about this because this ticket requires me to change the 
ConsumerGroupHeartbeatRequestData schema, so I wanted to see whether I need to 
make changes anywhere else to ensure that records with the new format can be 
read correctly. Thanks [~lianetm].

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> New consumer should support subscribe with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the HB request on the 
> SubscribedTopicRegex field, delegating the resolution to the server.





[jira] [Commented] (KAFKA-15538) Client support for java regex based subscription

2024-01-10 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805326#comment-17805326
 ] 

Phuc Hong Tran commented on KAFKA-15538:


I understand. I will be more careful when reading tickets from now on. Thanks 
[~lianetm]

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848, kip-848-client-support
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a Java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
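
Client-side resolution of a java `Pattern`, as described in the ticket above, amounts to matching the pattern against the topic names the client knows about and sending the matches to the broker. A minimal sketch under that assumption follows. `PatternResolutionSketch` and `resolve` are hypothetical names, and how the client obtains the known topic names is out of scope here.

```java
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch: resolve a java.util.regex.Pattern to topic names
// on the client side.
final class PatternResolutionSketch {

    static List<String> resolve(Pattern pattern, Collection<String> knownTopics) {
        return knownTopics.stream()
            // The whole topic name must match the pattern.
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }
}
```

For example, resolving `Pattern.compile("orders-.*")` against `orders-eu`, `payments`, and `orders-us` keeps the two `orders-` topics and drops `payments`.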





Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448101818


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9583,24 +9584,24 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
 @Test
 public void testListGroups() {

Review Comment:
   I think it's there, did you want a different test by any chance?






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448098432


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -1105,16 +1105,18 @@ private[group] class GroupCoordinator(
 }
   }
 
-  def handleListGroups(states: Set[String]): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: Set[String], groupTypes: Set[String]): (Errors, 
List[GroupOverview]) = {
 if (!isActive.get) {
   (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
 } else {
   val errorCode = if (groupManager.isLoading) 
Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
-  // if states is empty, return all groups
-  val groups = if (states.isEmpty)
-groupManager.currentGroups
-  else
-groupManager.currentGroups.filter(g => 
states.contains(g.summary.state))
+  // Filter groups based on states and groupTypes. If either is empty, it 
won't filter on that criterion.
+  // If groupType is mentioned then no group is returned since the notion 
of groupTypes doesn't exist in the
+  // old group coordinator.
+  val groups = groupManager.currentGroups.filter { g =>
+(states.isEmpty || states.contains(g.summary.state)) &&
+  (groupTypes.isEmpty || groupTypes.contains("classic"))

Review Comment:
   Yes okie I'll do that thanks!
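
The filter in the hunk above treats an empty filter set as "do not filter on this criterion" and reports every group in the old coordinator as type "classic". A rough Java rendering of that rule is shown below for illustration only. `GroupFilterSketch` and its `listGroups` method are hypothetical, and groups are reduced to a map from group id to state.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the "empty set means no filter" rule.
final class GroupFilterSketch {

    static final String CLASSIC = "classic";

    static List<String> listGroups(Map<String, String> stateByGroup,
                                   Set<String> states,
                                   Set<String> groupTypes) {
        // Every group here is classic, so the type filter either lets all
        // groups through or none.
        boolean typeMatches = groupTypes.isEmpty() || groupTypes.contains(CLASSIC);
        return stateByGroup.entrySet().stream()
            .filter(e -> typeMatches
                && (states.isEmpty() || states.contains(e.getValue())))
            .map(Map.Entry::getKey)
            .sorted()
            .collect(Collectors.toList());
    }
}
```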






Re: [PR] KAFKA-15853: Move ProcessRole to server module [kafka]

2024-01-10 Thread via GitHub


ijuma commented on PR #15166:
URL: https://github.com/apache/kafka/pull/15166#issuecomment-1885894231

   I re-ran the tests and failures are unrelated.





Re: [PR] KAFKA-15853: Move ProcessRole to server module [kafka]

2024-01-10 Thread via GitHub


ijuma merged PR #15166:
URL: https://github.com/apache/kafka/pull/15166





Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448087895


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,
+long committedOffset
+) {
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = typesFilter.isEmpty() || 
typesFilter.contains(group.type().toString());

Review Comment:
   I decided to convert the arg to lower case in the handle list groups 
request itself, so we wouldn't need to check for case in any of the API portions.
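
A minimal sketch of that idea, assuming group type names are compared as lower-case strings: the filter is normalized once at the request boundary, so the downstream check can use a plain `contains`. The class and method names here (`TypeFilterNormalizerSketch`, `normalize`, `typeCheck`) are hypothetical and only illustrate the approach; they are not taken from the PR.

```java
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch; not part of the Kafka code base.
final class TypeFilterNormalizerSketch {
    // Lower-case every requested type once, where the request is handled.
    static List<String> normalize(List<String> requestedTypes) {
        return requestedTypes.stream()
            .map(type -> type.toLowerCase(Locale.ROOT))
            .collect(Collectors.toList());
    }

    // Downstream check: assumes the filter is already lower case.
    // An empty filter matches every group type.
    static Predicate<String> typeCheck(List<String> normalizedTypes) {
        return groupType -> normalizedTypes.isEmpty() || normalizedTypes.contains(groupType);
    }
}
```

`Locale.ROOT` is used so that the lower-casing does not depend on the JVM's default locale.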






Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-10 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1448087379


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -453,19 +454,31 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+List statesFilter,
+List typesFilter,
+long committedOffset
+) {
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = typesFilter.isEmpty() || 
typesFilter.contains(group.type().toString());

Review Comment:
   I hadn't implemented it by then; I was still waiting for confirmation from 
today's meeting. It is there now!







Re: [PR] KAFKA-14822: Allow restricting File and Directory ConfigProviders to specific paths [kafka]

2024-01-10 Thread via GitHub


gharris1727 commented on code in PR #14995:
URL: https://github.com/apache/kafka/pull/14995#discussion_r1448064344


##
clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AllowedPaths {
+private List allowedPaths;
+
+private AllowedPaths(List allowedPaths) {
+this.allowedPaths = allowedPaths;
+}
+
+/**
+ * Constructs AllowedPaths with a list of Paths retrieved from {@code 
configValue}.
+ * @param configValue {@code allowed.paths} config value which is a string 
containing comma separated list of paths
+ * @return AllowedPaths with a list of Paths or null list if the {@code 
configValue} is null or empty string.
+ */
+public static AllowedPaths configureAllowedPaths(String configValue) {
+if (configValue != null && !configValue.isEmpty()) {
+List allowedPaths = new ArrayList<>();
+
+Arrays.stream(configValue.split(",")).forEach(b -> {
+Path normalisedPath = Paths.get(b).normalize();
+
+if (normalisedPath.isAbsolute() && 
Files.exists(normalisedPath)) {
+allowedPaths.add(normalisedPath);
+} else {
+throw new ConfigException("Path " + normalisedPath + " is 
not valid. The path should be absolute and exist");
+}
+});
+
+return new AllowedPaths(allowedPaths);
+}
+
+return new AllowedPaths(null);
+}
+
+/**
+ * Checks if the given {@code path} resides in the configured {@code 
allowed.paths}.
+ * If {@code allowed.paths} is not configured, the given Path is returned 
as allowed.
+ * @param path the Path to check if allowed
+ * @return Path that can be accessed or null if the given Path does not 
reside in the configured {@code allowed.paths}.
+ */
+public Path getIfPathIsAllowed(Path path) {

Review Comment:
   This name sounds like it should return a boolean. Also, I noticed that the 
call sites duplicate the Paths.get() call.
   
   ```suggestion
   public Path parseUntrustedPath(String untrustedPath) {
   ```
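
To illustrate the suggested signature, here is a hedged sketch of what a `parseUntrustedPath(String)` method could look like if the `Paths.get()` call moves from the call sites into the method. It keeps the contract described in the Javadoc above (return the path when `allowed.paths` is not configured, `null` when the path is outside the allowed directories). The class name `AllowedPathsSketch` and the containment check via `Path.startsWith` are assumptions for illustration, not the PR's actual implementation.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical sketch; not the AllowedPaths class from the PR.
final class AllowedPathsSketch {
    // null means allowed.paths was not configured, i.e. no restriction.
    private final List<Path> allowedPaths;

    AllowedPathsSketch(List<Path> allowedPaths) {
        this.allowedPaths = allowedPaths;
    }

    // Parses the raw string once, so callers no longer need their own Paths.get().
    public Path parseUntrustedPath(String untrustedPath) {
        Path parsed = Paths.get(untrustedPath);
        if (allowedPaths == null) {
            return parsed;
        }
        // Normalize first so that ".." segments cannot escape an allowed directory.
        Path normalised = parsed.normalize();
        for (Path allowed : allowedPaths) {
            if (normalised.startsWith(allowed)) {
                return normalised;
            }
        }
        return null;
    }
}
```

`Path.startsWith(Path)` compares whole name elements, so `/var/secrets-old` does not count as being under `/var/secrets`.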



##
clients/src/main/java/org/apache/kafka/common/config/internals/AllowedPaths.java:
##
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class AllowedPaths {
+private List allowedPaths;
+
+private AllowedPaths(List allowedPaths) {
+this.allowedPaths = allowedPaths;
+}
+
+/**
+ * Constructs AllowedPaths with a list of Paths retrieved from {@code 
configValue}.
+ * @param configValue {@code allowed.paths} config value which is a string 
containing comma separated list of paths
+ * @return AllowedPaths with a list of Paths or null list if the {@code 
configValue} is null or empty string.

Review Comment:
   

Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1448027564


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   I guess we just complete the future in the callback and do the write in the 
thenCompose. That made me realize I don't know everything about how futures are 
scheduled, but looking at the docs I think it is safe to say this is correct.
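
The pattern described above (complete a future from a callback, then do the write in `thenCompose`) can be sketched with plain `java.util.concurrent.CompletableFuture`. Everything except the `CompletableFuture` API is a hypothetical stand-in here: `verify`, `write`, the `String` guard, and the returned offset are illustrative only and do not reflect the ReplicaManager API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical sketch; names and types are illustrative only.
final class VerifyThenWriteSketch {
    // Stand-in for a callback-style verifier: calls back with (error, guard).
    // A null error means the verification succeeded.
    static void verify(boolean ok, BiConsumer<Exception, String> callback) {
        if (ok) {
            callback.accept(null, "guard-1");
        } else {
            callback.accept(new IllegalStateException("verification failed"), null);
        }
    }

    // Bridges the callback into a future, mirroring the shape of the diff above.
    static CompletableFuture<String> startVerification(boolean ok) {
        CompletableFuture<String> future = new CompletableFuture<>();
        verify(ok, (error, guard) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(guard);
            }
        });
        return future;
    }

    // Stand-in for the write; returns a made-up offset.
    static CompletableFuture<Long> write(String guard) {
        return CompletableFuture.completedFuture(42L);
    }

    // The write only runs if verification completed normally.
    static CompletableFuture<Long> verifyThenWrite(boolean ok) {
        return startVerification(ok).thenCompose(VerifyThenWriteSketch::write);
    }
}
```

On scheduling: for the non-async variants such as `thenCompose`, the CompletableFuture documentation states that the dependent action may be performed by the thread that completes the future, or by any other caller of a completion method. In this sketch the callback fires synchronously, so the write runs on the calling thread.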






Re: [PR] KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled [kafka]

2024-01-10 Thread via GitHub


junrao commented on code in PR #14301:
URL: https://github.com/apache/kafka/pull/14301#discussion_r1448007858


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -78,7 +78,8 @@ class LogLoader(
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
   producerStateManager: ProducerStateManager,
-  numRemainingSegments: ConcurrentMap[String, Int] = new 
ConcurrentHashMap[String, Int]
+  numRemainingSegments: ConcurrentMap[String, Int] = new 
ConcurrentHashMap[String, Int],
+  isRemoteLogEnabled: Boolean = false,

Review Comment:
   Could we add the new param to the javadoc?






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447988893


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Ok -- sorry I just meant I didn't know if it was passed through a callback 
(or future in this case) or if it was only invoked in the callback. For 
example, there were issues in the past since we used the buffer defined for the 
callback on a new handler thread.






[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-10 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805289#comment-17805289
 ] 

Lianet Magrans commented on KAFKA-15475:


Regarding your previous question about the retriable behaviour, the short answer 
is yes, we do want to retry internally, but it depends on the 
request. For example, TopicMetadata requests are retried internally by the manager 
whenever they fail with a Retriable error, as are sync OffsetCommit and 
OffsetFetch requests. But the CommitRequestManager also supports async commits, which we 
do not want to retry. The CommitRequestManager is probably the most complex one 
in that sense, as there are many different ways of committing and they have 
different retry expectations; other managers, like the 
TopicMetadata one, might be simpler. 

> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16033) Review client retry logic of OffsetFetch and OffsetCommit responses

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16033:
---
Description: 
The retry logic for OffsetFetch and OffsetCommit requests lives in the 
CommitRequestManager, and applies to requests issued from multiple components 
(AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for 
the regular auto-commits, MembershipManager for auto-commits before rebalance, 
auto-commit before closing consumer). While this approach helps to avoid having 
the retry logic in each caller, currently the CommitManager has it in different 
places and it ends up being rather hard to follow.

This task aims at reviewing the retry logic from a high-level perspective 
(multiple callers, with retry needs that have similarities and differences at 
the same time). So the review should assess the similarities vs. differences, and 
then consider two options:
1. Keep the retry logic centralized in the CommitManager, but fixed in a more 
consistent way, applied the same way for all requests, depending on the 
intention expressed by the caller. The advantage of this approach (current 
approach + improvement) is that callers that require the same retry logic could 
reuse it, keeping it in a single place (e.g. commitSync from the consumer 
retries in the same way as the auto-commit before rebalance). 
2. Move the retry logic to the caller. This aligns with the way it was done in the 
legacy coordinator, but the main challenge seems to be not duplicating the 
retry logic in callers that require the same.
This task will also review which exceptions are actually retried on 
OffsetCommit and OffsetFetch, considering that the legacy implementation only 
retries on some expected Retriable errors (not all).

  was:
The retry logic for OffsetFetch and OffsetCommit requests lives in the 
CommitRequestManager, and applies to requests issued from multiple components 
(AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for 
the regular auto-commits, MembershipManager for auto-commits before rebalance, 
auto-commit before closing consumer). While this approach helps to avoid having 
the retry logic in each caller, currently the CommitManager has it in different 
places and it ends up being rather hard to follow.

This task aims at reviewing the retry logic from a high-level perspective 
(multiple callers, with retry needs that have similarities and differences at 
the same time). So the review should assess the similarities vs. differences, and 
then consider two options:
1. Keep the retry logic centralized in the CommitManager, but fixed in a more 
consistent way, applied the same way for all requests, depending on the 
intention expressed by the caller. The advantage of this approach (current 
approach + improvement) is that callers that require the same retry logic could 
reuse it, keeping it in a single place (e.g. commitSync from the consumer 
retries in the same way as the auto-commit before rebalance). 
2. Move the retry logic to the caller. This aligns with the way it was done in the 
legacy coordinator, but the main challenge seems to be not duplicating the 
retry logic in callers that require the same.


> Review client retry logic of OffsetFetch and OffsetCommit responses
> ---
>
> Key: KAFKA-16033
> URL: https://issues.apache.org/jira/browse/KAFKA-16033
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The retry logic for OffsetFetch and OffsetCommit requests lives in the 
> CommitRequestManager, and applies to requests issued from multiple components 
> (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for 
> the regular auto-commits, MembershipManager for auto-commits before 
> rebalance, auto-commit before closing consumer). While this approach helps to 
> avoid having the retry logic in each caller, currently the CommitManager has 
> it in different places and it ends up being rather hard to follow.
> This task aims at reviewing the retry logic from a high-level perspective 
> (multiple callers, with retry needs that have similarities and differences at 
> the same time). So the review should assess the similarities vs. differences, 
> and then consider two options:
> 1. Keep the retry logic centralized in the CommitManager, but fixed in a more 
> consistent way, applied the same way for all requests, depending on the 
> intention expressed by the caller. The advantage of this approach (current 
> approach + improvement) is that callers that require the same retry logic 
> could reuse it, keeping it in a single place (e.g. commitSync from 

[jira] [Commented] (KAFKA-15475) Timeout request might retry forever even if the user API times out in PrototypeAsyncConsumer

2024-01-10 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805287#comment-17805287
 ] 

Lianet Magrans commented on KAFKA-15475:


Sure, TopicMetadataManager 
[here|https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java#L212]
 and CommitRequestManager 
[here|https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L561].
 Both include similar logic to make sure that requests are retried but within 
the timeout boundaries.
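
As a rough illustration of "retried but within the timeout boundaries", the sketch below retries an operation on a retriable failure only while a deadline has not passed. It is not the logic from the linked managers: `BoundedRetrySketch`, `RetriableError`, and `callWithDeadline` are hypothetical names, the clock is injected so the behaviour is deterministic, and backoff between attempts is left out for brevity.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch; not the TopicMetadataRequestManager or CommitRequestManager logic.
final class BoundedRetrySketch {
    // Illustrative stand-in for a retriable failure.
    static final class RetriableError extends RuntimeException {
    }

    // Retries attempt on RetriableError until it succeeds or the deadline passes.
    // nowMs is injected so that tests can control time.
    static <T> T callWithDeadline(Supplier<T> attempt, long deadlineMs, LongSupplier nowMs) {
        while (true) {
            try {
                return attempt.get();
            } catch (RetriableError e) {
                // Stop retrying once the caller's timeout has expired.
                if (nowMs.getAsLong() >= deadlineMs) {
                    throw new IllegalStateException("timed out while retrying", e);
                }
            }
        }
    }
}
```

The important property is that the deadline comes from the caller, so an operation that keeps failing with a retriable error cannot be retried forever.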


> Timeout request might retry forever even if the user API times out in 
> PrototypeAsyncConsumer
> 
>
> Key: KAFKA-15475
> URL: https://issues.apache.org/jira/browse/KAFKA-15475
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.8.0
>
>
> If the request times out in the background thread, it will be completed with 
> TimeoutException, which is Retriable.  In the TopicMetadataRequestManager and 
> possibly other managers, the request might continue to be retried forever.
>  
> There are two ways to fix this
>  # Pass a timer to the manager to remove the inflight requests when it is 
> expired.
>  # Pass the future to the application layer and continue to retry.





[jira] [Commented] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-10 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805286#comment-17805286
 ] 

Stanislav Kozlovski commented on KAFKA-16082:
-

Deeming this not a blocker as per discussions with [~pprovenzano] 

 

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Critical
> Fix For: 3.7.1
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-10 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-16082:

Priority: Critical  (was: Blocker)

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Critical
> Fix For: 3.7.1
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





[jira] [Updated] (KAFKA-16082) JBOD: Possible dataloss when moving leader partition

2024-01-10 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-16082:

Fix Version/s: 3.7.1
   (was: 3.7.0)

> JBOD: Possible dataloss when moving leader partition
> 
>
> Key: KAFKA-16082
> URL: https://issues.apache.org/jira/browse/KAFKA-16082
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Proven Provenzano
>Assignee: Gaurav Narula
>Priority: Blocker
> Fix For: 3.7.1
>
>
> There is a possible dataloss scenario
> when using JBOD,
> when moving the partition leader log from one directory to another on the 
> same broker,
> when after the destination log has caught up to the source log and after the 
> broker has sent an update to the partition assignment
> if the broker accepts and commits a new record for the partition and then the 
> broker restarts and the original partition leader log is lost
> then the destination log would not contain the new record.





Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15017:
URL: https://github.com/apache/kafka/pull/15017#discussion_r1447922892


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -53,12 +53,16 @@ public short unknownType() {
 class LoadSummary {
 private final long startTimeMs;
 private final long endTimeMs;
+private final long totalTimeMs;

Review Comment:
   yeah, i would rather use a log message similar to the one you removed 
instead of just toString’ing the summary. it will look better.






Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15017:
URL: https://github.com/apache/kafka/pull/15017#discussion_r1447921933


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -53,12 +53,16 @@ public short unknownType() {
 class LoadSummary {
 private final long startTimeMs;
 private final long endTimeMs;
+private final long totalTimeMs;
+private final long schedulerTimeMs;

Review Comment:
   that works.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447918075


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   sorry, missed it. i think that i will just replace those docs with links to 
the interface doc in order to avoid this in the future.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447916841


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Yes, we reschedule onto the request thread only to push an event to the 
coordinator thread. It is not optimal. The request local is not used at all, as 
the writer uses its own buffers.
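
The `maybeStartTransactionVerification` method in the diff above adapts a callback-style API into a `CompletableFuture`. A minimal Java sketch of that pattern follows. The `verify` method, its `BiConsumer` callback, and the `String` guard are hypothetical stand-ins, not Kafka's actual types or signatures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

final class CallbackToFutureSketch {
    // Hypothetical callback-style API: reports (error, guard), where error is null on success.
    static void verify(boolean fail, BiConsumer<Exception, String> callback) {
        if (fail) {
            callback.accept(new IllegalStateException("verification failed"), null);
        } else {
            callback.accept(null, "guard-1");
        }
    }

    // Bridges the callback into a future that is completed exactly once.
    static CompletableFuture<String> verifyAsync(boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        verify(fail, (error, guard) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(guard);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        System.out.println(verifyAsync(false).join());
        System.out.println(verifyAsync(true).isCompletedExceptionally());
    }
}
```

In this sketch the callback runs on the caller's thread. Which thread completes the future in the real code is exactly the scheduling question discussed in this thread.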






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447913870


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   yeah, it is implicit. the write operation (the transactional offset commit) 
could for instance fail. the state is rolled back in this case.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447907424


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Sorry -- I left a lot of words here. Is there a time where we take advantage 
of the data structure, i.e. using the epoch/rolling back? I didn't see any in 
this PR.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447883700


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,57 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception if the

Review Comment:
   nit: did we want to update this comment to be consistent with the 
PartitionWriter one?






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447880085


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   If it is only an optimization question rather than a correctness one, 
I am good to tackle it in a follow-up.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447879037


##
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##
@@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T](
 ))
   }
 
+  /**
+   * Verify the transaction.
+   *
+   * @param tp  The partition to write records to.
+   * @param transactionalId The transactional id.
+   * @param producerId  The producer id.
+   * @param producerEpoch   The producer epoch.
+   * @return A future containing the {@link VerificationGuard} or an exception.
+   * @throws KafkaException Any KafkaException caught during the operation.
+   */
+  override def maybeStartTransactionVerification(
+tp: TopicPartition,
+transactionalId: String,
+producerId: Long,
+producerEpoch: Short
+  ): CompletableFuture[VerificationGuard] = {
+val future = new CompletableFuture[VerificationGuard]()
+replicaManager.maybeStartTransactionVerificationForPartition(
+  topicPartition = tp,
+  transactionalId = transactionalId,
+  producerId = producerId,
+  producerEpoch = producerEpoch,
+  baseSequence = RecordBatch.NO_SEQUENCE,
+  requestLocal = RequestLocal.NoCaching,
+  callback = (error, _, verificationGuard) => {
+if (error != Errors.NONE) {
+  future.completeExceptionally(error.exception)
+} else {
+  future.complete(verificationGuard)
+}
+  }
+)
+future
+  }
+
   private def internalAppend(
 tp: TopicPartition,
-memoryRecords: MemoryRecords
+memoryRecords: MemoryRecords,
+verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
   ): Long = {
 var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-replicaManager.appendRecords(
+replicaManager.appendForGroup(
   timeout = 0L,
   requiredAcks = 1,
-  internalTopicsAllowed = true,
-  origin = AppendOrigin.COORDINATOR,
   entriesPerPartition = Map(tp -> memoryRecords),
   responseCallback = results => appendResults = results,
+  requestLocal = RequestLocal.NoCaching,

Review Comment:
   Sorry -- you are correct. I was confused because we have a check for if we 
execute on the same request thread -- there we don't reschedule/wrap the 
request. (This was added when we error before we send the request to the txn 
coordinator)
   
   Just for my understanding, right now we reschedule to the request thread, 
and that works fine -- the only concern is not using the allocated coordinator 
threads and taking up space on the request handler threads?
   
   Just triple checking we won't run into an issue with request locals if the 
callback expects the buffer supplier from the coordinator thread.






Re: [PR] KAFKA-14505; [4/N] Wire transaction verification [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15142:
URL: https://github.com/apache/kafka/pull/15142#discussion_r1447873603


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) {
  * @return The Errors instance associated with the given exception.
  */
 private static Errors normalizeException(Throwable exception) {
+exception = Errors.maybeUnwrapException(exception);

Review Comment:
   I see. It was too elegant before. Thanks for looking into it and fixing it.






[jira] [Commented] (KAFKA-16110) Implement consumer performance tests

2024-01-10 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805272#comment-17805272
 ] 

Philip Nee commented on KAFKA-16110:


My proposal here is

- Let's run trogdor to see what we can get out of it. If the current settings 
are not satisfactory, we can add more "specs" to the repo and see if we can get 
to the point we want.

- We might also want to monitor the performance of the head of trunk as we are 
putting code in. I wonder if that is easy to achieve using trogdor.

> Implement consumer performance tests
> 
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16110) Implement consumer performance tests

2024-01-10 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805271#comment-17805271
 ] 

Philip Nee commented on KAFKA-16110:


Hi [~kirktrue] - Thanks for filing this JIRA. There are two paths forward for 
the performance testing. One is using trogdor, which does more than 
performance testing and also allows us to test different fault scenarios. 
The second is implementing our own benchmarking test. I started working on one some 
time ago but left it in limbo given that trogdor does a lot of it already. 
WDYT?

 

FWIW: this is the repo: https://github.com/philipnee/kafka-benchmarker

> Implement consumer performance tests
> 
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>






[jira] [Created] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones

2024-01-10 Thread Kirk True (Jira)
Kirk True created KAFKA-16112:
-

 Summary: Review JMX metrics in Async Consumer and determine the 
missing ones
 Key: KAFKA-16112
 URL: https://issues.apache.org/jira/browse/KAFKA-16112
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Kirk True
Assignee: Philip Nee
 Fix For: 3.8.0








[jira] [Created] (KAFKA-16111) Implement tests for tricky rebalance callbacks scenarios

2024-01-10 Thread Kirk True (Jira)
Kirk True created KAFKA-16111:
-

 Summary: Implement tests for tricky rebalance callbacks scenarios
 Key: KAFKA-16111
 URL: https://issues.apache.org/jira/browse/KAFKA-16111
 Project: Kafka
  Issue Type: Test
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0








[jira] [Created] (KAFKA-16110) Implement consumer performance tests

2024-01-10 Thread Kirk True (Jira)
Kirk True created KAFKA-16110:
-

 Summary: Implement consumer performance tests
 Key: KAFKA-16110
 URL: https://issues.apache.org/jira/browse/KAFKA-16110
 Project: Kafka
  Issue Type: New Feature
  Components: clients, consumer
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16109) Ensure system tests cover the "simple consumer + commit" use case

2024-01-10 Thread Kirk True (Jira)
Kirk True created KAFKA-16109:
-

 Summary: Ensure system tests cover the "simple consumer + commit" 
use case
 Key: KAFKA-16109
 URL: https://issues.apache.org/jira/browse/KAFKA-16109
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer, system tests
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0








[jira] [Updated] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes

2024-01-10 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16107:
--
Fix Version/s: 3.8.0

> Ensure consumer does not start fetching from added partitions until 
> onPartitionsAssigned completes
> --
>
> Key: KAFKA-16107
> URL: https://issues.apache.org/jira/browse/KAFKA-16107
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> In the new consumer implementation, when new partitions are assigned, the 
> subscription state is updated and then #onPartitionsAssigned is triggered. 
> This sequence seems sensible, but we need to ensure that no data is fetched 
> until onPartitionsAssigned completes (where the user could be setting the 
> committed offsets they want to start fetching from).
> We should pause the newly added partitions until 
> onPartitionsAssigned completes, similar to how it's done on revocation to 
> avoid positions getting ahead of the committed offsets.
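
The ordering described above can be sketched in Java as follows. This is an illustration only: `AssignmentSketch`, its string partition names, and the `Consumer` callback are hypothetical and do not reflect the consumer's internal classes.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

final class AssignmentSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> pausedForCallback = new HashSet<>();

    // Adds partitions but keeps them unfetchable until the callback has returned.
    void assign(Set<String> added, Consumer<Set<String>> onPartitionsAssigned) {
        assigned.addAll(added);
        pausedForCallback.addAll(added);
        // The user may set committed offsets or seek inside this callback.
        onPartitionsAssigned.accept(added);
        // Only after the callback completes do the new partitions become fetchable.
        pausedForCallback.removeAll(added);
    }

    boolean isFetchable(String partition) {
        return assigned.contains(partition) && !pausedForCallback.contains(partition);
    }

    public static void main(String[] args) {
        AssignmentSketch state = new AssignmentSketch();
        state.assign(Collections.singleton("t-0"),
            added -> System.out.println("fetchable during callback: " + state.isFetchable("t-0")));
        System.out.println("fetchable after callback: " + state.isFetchable("t-0"));
    }
}
```

If the callback throws, this sketch leaves the partitions paused. Whether that is the desired behaviour is a design choice the ticket does not settle.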





Re: [PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-10 Thread via GitHub


wernerdv commented on PR #15101:
URL: https://github.com/apache/kafka/pull/15101#issuecomment-1885412204

   @divijvaidya hello, I reran the tests after merging #15093 and #15077.
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/
   
   There are failed tests from the `org.apache.kafka.streams.integration` 
package that have unexpected threads, e.g. `kafka-scheduler`, 
`Controller-0-to-broker-0-send-thread`, `ReplicaFetcherThread`.
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/org.apache.kafka.streams.integration/
   
   It's also worth looking at the test results:
   
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15101/10/testReport/kafka.utils/
   
   The names of the expected threads are similar to 
https://github.com/apache/kafka/pull/15052/files#diff-b8f9f9d1b191457cbdb332a3429f0ad65b50fa4cef5af8562abcfd1f177a2cfeR2441
   
   Is there anything I can do to improve my PR?
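
For readers unfamiliar with the idea, a thread-leak check can be sketched as a before/after comparison of live threads. The Java below is a simplified illustration using only `Thread.getAllStackTraces()`; it is not the extension from this PR, and the class and thread names are made up.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

final class ThreadLeakSketch {
    // Snapshot of the threads that are currently alive in this JVM.
    static Set<Thread> liveThreads() {
        return new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Names of threads that are alive now but were absent from the earlier snapshot.
    static Set<String> leakedThreadNames(Set<Thread> before) {
        Set<String> names = new HashSet<>();
        for (Thread thread : liveThreads()) {
            if (!before.contains(thread) && thread.isAlive()) {
                names.add(thread.getName());
            }
        }
        return names;
    }

    // Starts a thread that outlives the "test body" and checks that it is reported.
    static boolean demo() {
        Set<Thread> before = liveThreads();
        CountDownLatch release = new CountDownLatch(1);
        Thread leaky = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "leaky-sketch-thread");
        leaky.setDaemon(true);
        leaky.start();
        boolean detected = leakedThreadNames(before).contains("leaky-sketch-thread");
        release.countDown(); // let the thread finish so the demo itself does not leak
        return detected;
    }

    public static void main(String[] args) {
        System.out.println("leak detected: " + demo());
    }
}
```

A real check would also need an allow-list for threads that are expected to stay alive, which is where the expected thread names mentioned above come in.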





Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-10 Thread via GitHub


ijuma commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1447757987


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Yeah, I was using the maven name. I agree that we shouldn't add a dependency 
unless it exists.






[jira] [Updated] (KAFKA-16106) group size counters do not reflect the actual sizes when operations fail

2024-01-10 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16106:
-
Description: 
An expire-group-metadata operation generates tombstone records, updates the 
`groups` state and decrements group size counters, then performs a write to the 
log. If there is a __consumer_offsets partition reassignment, this operation 
fails. The `groups` state is reverted to an earlier snapshot but classic group 
size counters are not. This begins an inconsistency between the metrics and the 
actual groups size. This applies to all unsuccessful write operations that 
alter the `groups` state.

 

The issue is exacerbated because the expire group metadata operation can be 
retried multiple times until the partition is fully unloaded.

 

The solution to this is to make the counters also a timeline data structure 
(TimelineLong) so that in the event of a failed write operation we revert the 
counters as well.

  was:
An expire-group-metadata operation generates tombstone records, updates the 
`groups` state and decrements group size counters, then performs a write to the 
log. If there is a __consumer_offsets partition reassignment, this operation 
fails. The `groups` state is reverted to an earlier snapshot but classic group 
size counters are not. This begins an inconsistency between the metrics and the 
actual groups size. This applies to all unsuccessful write operations that 
alter the `groups` state.

 

The issue is exacerbated because the expire group metadata operation is retried 
possibly indefinitely.

 

The solution to this is to make the counters also a timeline data structure 
(TimelineLong) so that in the event of a failed write operation we revert the 
counters as well.


> group size counters do not reflect the actual sizes when operations fail
> 
>
> Key: KAFKA-16106
> URL: https://issues.apache.org/jira/browse/KAFKA-16106
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>
> An expire-group-metadata operation generates tombstone records, updates the 
> `groups` state and decrements group size counters, then performs a write to 
> the log. If there is a __consumer_offsets partition reassignment, this 
> operation fails. The `groups` state is reverted to an earlier snapshot but 
> classic group size counters are not. This begins an inconsistency between the 
> metrics and the actual groups size. This applies to all unsuccessful write 
> operations that alter the `groups` state.
>  
> The issue is exacerbated because the expire group metadata operation can be 
> retried multiple times until the partition is fully unloaded.
>  
> The solution to this is to make the counters also a timeline data structure 
> (TimelineLong) so that in the event of a failed write operation we revert the 
> counters as well.
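
The proposed fix can be illustrated with a counter that is snapshotted before an operation and reverted when the write fails. The Java sketch below is a deliberately simplified stand-in: `RevertibleCounter` is hypothetical and does not reproduce the API or implementation of Kafka's `TimelineLong`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class RevertibleCounter {
    private long value;
    private final Deque<Long> snapshots = new ArrayDeque<>();

    void increment() { value++; }
    void decrement() { value--; }
    long get() { return value; }

    // Remember the current value before an operation mutates it.
    void snapshot() { snapshots.push(value); }

    // The write failed: restore the value captured by the latest snapshot.
    void revertToLastSnapshot() { value = snapshots.pop(); }

    // The write succeeded: the latest snapshot is no longer needed.
    void discardLastSnapshot() { snapshots.pop(); }

    // Simulates an expiry whose log write fails; the count must stay at 3.
    static long simulateFailedExpiry() {
        RevertibleCounter groupCount = new RevertibleCounter();
        groupCount.increment();
        groupCount.increment();
        groupCount.increment();
        groupCount.snapshot();
        groupCount.decrement();            // state updated before the write
        boolean writeSucceeded = false;    // e.g. the partition is being reassigned
        if (writeSucceeded) {
            groupCount.discardLastSnapshot();
        } else {
            groupCount.revertToLastSnapshot();
        }
        return groupCount.get();
    }

    public static void main(String[] args) {
        System.out.println(simulateFailedExpiry());
    }
}
```

Without the revert, each retried expiry would decrement the counter again while the `groups` state stayed unchanged, which is the drift the ticket describes.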





Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]

2024-01-10 Thread via GitHub


jeffkbkim commented on code in PR #15017:
URL: https://github.com/apache/kafka/pull/15017#discussion_r1447748531


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -53,12 +53,16 @@ public short unknownType() {
 class LoadSummary {
 private final long startTimeMs;
 private final long endTimeMs;
+private final long totalTimeMs;
+private final long schedulerTimeMs;

Review Comment:
   how's schedulerQueueTimeMs?






Re: [PR] MINOR: log new coordinator partition load schedule time [kafka]

2024-01-10 Thread via GitHub


jeffkbkim commented on code in PR #15017:
URL: https://github.com/apache/kafka/pull/15017#discussion_r1447744934


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -53,12 +53,16 @@ public short unknownType() {
 class LoadSummary {
 private final long startTimeMs;
 private final long endTimeMs;
+private final long totalTimeMs;

Review Comment:
   This simplifies the log format in CoordinatorRuntime:
   ```
   log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
   tp, partitionEpoch, summary
   );
   ```






[PR] MINOR: fix custom retry backoff in new group coordinator [kafka]

2024-01-10 Thread via GitHub


jeffkbkim opened a new pull request, #15170:
URL: https://github.com/apache/kafka/pull/15170

   When a retryable write operation fails, we retry with the default 500ms 
backoff. If a custom retry backoff was used to originally schedule the 
operation, we should retry with the same custom backoff instead of the default.
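
A minimal Java sketch of the intended behaviour, assuming the operation remembers the backoff it was originally scheduled with. `RetryBackoffSketch` and `ScheduledOperation` are hypothetical names, not classes from the group coordinator.

```java
import java.util.OptionalLong;

final class RetryBackoffSketch {
    // Default retry backoff mentioned in the description above.
    static final long DEFAULT_RETRY_BACKOFF_MS = 500L;

    // Hypothetical record of how an operation was originally scheduled.
    static final class ScheduledOperation {
        final String name;
        final OptionalLong customBackoffMs;

        ScheduledOperation(String name, OptionalLong customBackoffMs) {
            this.name = name;
            this.customBackoffMs = customBackoffMs;
        }
    }

    // On a retryable failure, reuse the operation's own backoff when it has one.
    static long retryDelayMs(ScheduledOperation operation) {
        return operation.customBackoffMs.orElse(DEFAULT_RETRY_BACKOFF_MS);
    }

    public static void main(String[] args) {
        ScheduledOperation custom = new ScheduledOperation("custom", OptionalLong.of(100L));
        ScheduledOperation plain = new ScheduledOperation("plain", OptionalLong.empty());
        System.out.println(retryDelayMs(custom)); // custom backoff is preserved
        System.out.println(retryDelayMs(plain));  // falls back to the default
    }
}
```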
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore

2024-01-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16098.
---
Resolution: Fixed

> State updater may attempt to resume a task that is not assigned anymore
> ---
>
> Key: KAFKA-16098
> URL: https://issues.apache.org/jira/browse/KAFKA-16098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: streams.log.gz
>
>
> A long-running soak test brought to light this `IllegalStateException`:
> {code:java}
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] 
> stream-client [i-0637ca8609f50425f] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more {code}
> Log (with some common messages filtered) attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16098: Verify pending recycle action when standby is re-assigned [kafka]

2024-01-10 Thread via GitHub


cadonna merged PR #15168:
URL: https://github.com/apache/kafka/pull/15168


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #14531:
URL: https://github.com/apache/kafka/pull/14531#discussion_r1447664834


##
clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java:
##
@@ -26,6 +26,15 @@
  */
 public interface Partitioner extends Configurable, Closeable {
 
+/**
+ * Indicate if the given topic is handled.  Returning {@code false} will cause the Producer to fallback to default partitioning.
+ *
+ * @param topic The topic name
+ */
+default boolean partitionsTopic(String topic) {
+return true;

Review Comment:
   I think the idea is that this would be implemented differently by different 
custom partitioners. By default, though, there is no change from the current behavior.
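
A minimal sketch of that idea, assuming the `partitionsTopic` method proposed in this PR. `TopicAwarePartitioner` is a stand-in for the relevant part of the `Partitioner` interface, not the real Kafka interface, and `AuditOnlyPartitioner`, the `audit.` prefix, and `choosePartition` are hypothetical names used only for illustration.

```java
import java.util.Arrays;

// Stand-in for the part of Partitioner touched by this PR (not the real Kafka interface).
interface TopicAwarePartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);

    // Proposed default: every topic is handled, so existing partitioners behave as before.
    default boolean partitionsTopic(String topic) {
        return true;
    }
}

// Hypothetical custom partitioner that only handles topics with an "audit." prefix.
class AuditOnlyPartitioner implements TopicAwarePartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        return 0; // pin all audit records to partition 0
    }

    @Override
    public boolean partitionsTopic(String topic) {
        return topic.startsWith("audit.");
    }
}

class PartitionerFallbackSketch {
    // Assumed producer-side choice; default partitioning is simplified to a hash of the key bytes.
    static int choosePartition(TopicAwarePartitioner custom, String topic, byte[] keyBytes, int numPartitions) {
        if (custom.partitionsTopic(topic)) {
            return custom.partition(topic, keyBytes, numPartitions);
        }
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

A partitioner that does not override `partitionsTopic` keeps handling every topic, which is the "no change from current behavior" case.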






Re: [PR] KAFKA-14546 - Support Partitioner fallback to default [kafka]

2024-01-10 Thread via GitHub


jolshan commented on PR #14531:
URL: https://github.com/apache/kafka/pull/14531#issuecomment-1885235923

   I think this change would require a KIP since it modifies the public API.





Re: [PR] KAFKA-15747: KRaft support in DynamicConnectionQuotaTest [kafka]

2024-01-10 Thread via GitHub


mimaison merged PR #15028:
URL: https://github.com/apache/kafka/pull/15028





[jira] [Resolved] (KAFKA-15747) KRaft support in DynamicConnectionQuotaTest

2024-01-10 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15747.

Fix Version/s: 3.8.0
   Resolution: Fixed

> KRaft support in DynamicConnectionQuotaTest
> ---
>
> Key: KAFKA-15747
> URL: https://issues.apache.org/jira/browse/KAFKA-15747
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
> Fix For: 3.8.0
>
>
> The following tests in DynamicConnectionQuotaTest in 
> core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
>  need to be updated to support KRaft
> 77 : def testDynamicConnectionQuota(): Unit = {
> 104 : def testDynamicListenerConnectionQuota(): Unit = {
> 175 : def testDynamicListenerConnectionCreationRateQuota(): Unit = {
> 237 : def testDynamicIpConnectionRateQuota(): Unit = {
> Scanned 416 lines. Found 0 KRaft tests out of 4 tests





Re: [PR] KAFKA-15747: KRaft support in DynamicConnectionQuotaTest [kafka]

2024-01-10 Thread via GitHub


mimaison commented on PR #15028:
URL: https://github.com/apache/kafka/pull/15028#issuecomment-1885220999

   Do you have a Jira ID, so that I can assign the 
[ticket](https://issues.apache.org/jira/browse/KAFKA-15747) to you?





Re: [PR] KAFKA-15942: Implement ConsumerInterceptors in the async consumer [kafka]

2024-01-10 Thread via GitHub


lucasbru commented on PR #15000:
URL: https://github.com/apache/kafka/pull/15000#issuecomment-1885169869

   In terms of implementing the auto-commit interceptor in the application 
thread, I see three options:
- Move auto-commit triggering to the application thread. That seems cleanest to 
me (all commits are triggered from the application thread), but it's the 
largest refactoring. Then the interceptor is trivial.
- Share the Invoker queue with the background thread, and access it when we 
register the auto-commit callback. It's already shared with the background 
thread implicitly as part of the commit request completed future.
- Add a new background event in the background event queue. This seems unclean, 
since we already have a common data structure shared between the application and 
background threads for things to be invoked after a commit, and that is the 
"Invoker" (option 2).

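Option 2 above could look roughly like the following. This is only a sketch under assumptions: `CallbackInvoker`, `submit`, and `executeAll` are hypothetical names, and the real async consumer classes are not reproduced here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the "Invoker" queue shared by both threads.
class CallbackInvoker {
    private final ConcurrentLinkedQueue<Runnable> pending = new ConcurrentLinkedQueue<>();

    // Called from the background thread once an auto-commit completes.
    void submit(Runnable callback) {
        pending.add(callback);
    }

    // Called from the application thread; returns how many callbacks ran.
    int executeAll() {
        int executed = 0;
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
            executed++;
        }
        return executed;
    }
}

class InvokerSketch {
    // The background thread enqueues the interceptor call; the calling thread runs it.
    static int demo() {
        CallbackInvoker invoker = new CallbackInvoker();
        AtomicInteger interceptorCalls = new AtomicInteger();
        Thread background = new Thread(() -> invoker.submit(interceptorCalls::incrementAndGet));
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        invoker.executeAll();
        return interceptorCalls.get();
    }
}
```

The point of the design is that the interceptor code never runs on the background thread; the background thread only enqueues work.
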




Re: [PR] KAFKA-4759 Add support for IPv4 and IPv6 ranges in AclAuthorizer [kafka]

2024-01-10 Thread via GitHub


rgo commented on PR #9937:
URL: https://github.com/apache/kafka/pull/9937#issuecomment-1885023082

   > hi, I wonder is this feature implemented somewhere in another PR?
   
   Last month I received a PR on my branch to fix the current conflicts. I will 
merge it as soon as possible.
   
   But as far as I can tell, this PR got blocked by the request to remove the 
external library.
   
   To be frank, the response time was rather discouraging and made any 
discussion impossible (i.e., whether the library should be used or not). Maybe 
things have changed, let's see.





[jira] [Updated] (KAFKA-15839) Topic ID integration in consumer subscription state

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15839:
---
Summary: Topic ID integration in consumer subscription state  (was: Review 
topic ID integration in consumer subscription)

> Topic ID integration in consumer subscription state
> ---
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> With the new consumer group protocol, assignments received by the consumer 
> contain topic IDs instead of topic names. Topic IDs are used in the 
> reconciliation path, integrated using TopicIdPartition. When reconciling, 
> topic names are resolved via a metadata update, but they are also kept in a 
> local #MembershipManager cache. This local cache serves the purpose of 
> keeping assigned topicId-names (that might have been deleted from metadata, 
> e.g. when a topic is deleted). 
> That's just an initial step towards spreading topic IDs internally in the 
> consumer code. Next step to address with this task would be to include topic 
> IDs in the subscription state, so that assigned topicId-names can be accessed 
> from other components without the need of resolving names multiple times.
> Note that this task aims only at spreading topic IDs internally in the 
> consumer, no changes to expose them at the API level. 
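
The local cache described above can be sketched as follows. This is a simplified illustration, not the client's code: `TopicNameCacheSketch` and `resolve` are hypothetical names, and `java.util.UUID` stands in for Kafka's own `org.apache.kafka.common.Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical simplified cache of assigned topic ID to topic name.
class TopicNameCacheSketch {
    private final Map<UUID, String> assignedTopicNames = new HashMap<>();

    // Resolve from current metadata first; remember the name so it survives topic deletion.
    Optional<String> resolve(UUID topicId, Map<UUID, String> metadata) {
        String name = metadata.get(topicId);
        if (name != null) {
            assignedTopicNames.put(topicId, name);
            return Optional.of(name);
        }
        // Not in metadata any more (or never seen): fall back to the cached name, if any.
        return Optional.ofNullable(assignedTopicNames.get(topicId));
    }
}
```

Keeping the resolved pairs in the subscription state, as the task proposes, would let other components read them without repeating this lookup.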





[jira] [Updated] (KAFKA-15839) Review topic ID integration in consumer subscription

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15839:
---
Description: 
With the new consumer group protocol, assignments received by the consumer 
contain topic IDs instead of topic names. Topic IDs are used in the 
reconciliation path, integrated using TopicIdPartition. When reconciling, topic 
names are resolved via a metadata update, but they are also kept in a local 
#MembershipManager cache. This local cache serves the purpose of keeping 
assigned topicId-names (that might have been deleted from metadata, e.g. when a 
topic is deleted). 
That's just an initial step towards spreading topic IDs internally in the 
consumer code. Next step to address with this task would be to include topic 
IDs in the subscription state, so that assigned topicId-names can be accessed 
from other components without the need of resolving names multiple times.

Note that this task aims only at spreading topic IDs internally in the 
consumer, no changes to expose them at the API level. 

  was:
TopicIdPartition is currently used in the reconciliation path. Could be used 
more, just leaving topicPartitions when necessary for the callbacks and 
interaction with the subscription state that does not fully support topic ids 
yet

Ensure that we properly handle topic re-creation (same name, diff topic IDs) in 
the reconciliation process (assignment cache, same assignment comparison, etc.)


> Review topic ID integration in consumer subscription
> 
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> With the new consumer group protocol, assignments received by the consumer 
> contain topic IDs instead of topic names. Topic IDs are used in the 
> reconciliation path, integrated using TopicIdPartition. When reconciling, 
> topic names are resolved via a metadata update, but they are also kept in a 
> local #MembershipManager cache. This local cache serves the purpose of 
> keeping assigned topicId-names (that might have been deleted from metadata, 
> e.g. when a topic is deleted). 
> That's just an initial step towards spreading topic IDs internally in the 
> consumer code. Next step to address with this task would be to include topic 
> IDs in the subscription state, so that assigned topicId-names can be accessed 
> from other components without the need of resolving names multiple times.
> Note that this task aims only at spreading topic IDs internally in the 
> consumer, no changes to expose them at the API level. 





[jira] [Updated] (KAFKA-15839) Review topic ID integration in consumer subscription

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15839:
---
Summary: Review topic ID integration in consumer subscription  (was: Review 
topic ID integration in consumer reconciliation process)

> Review topic ID integration in consumer subscription
> 
>
> Key: KAFKA-15839
> URL: https://issues.apache.org/jira/browse/KAFKA-15839
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.8.0
>
>
> TopicIdPartition is currently used in the reconciliation path. Could be used 
> more, just leaving topicPartitions when necessary for the callbacks and 
> interaction with the subscription state that does not fully support topic ids 
> yet
> Ensure that we properly handle topic re-creation (same name, diff topic IDs) 
> in the reconciliation process (assignment cache, same assignment comparison, 
> etc.)





Re: [PR] KAFKA-15853: Move OffsetConfig to group-coordinator module [kafka]

2024-01-10 Thread via GitHub


dajac commented on PR #15161:
URL: https://github.com/apache/kafka/pull/15161#issuecomment-1884999821

   That seems fine. We also have `GroupCoordinatorConfig`. I hope that we can 
eventually consolidate them but we can look into this afterwards. We already 
have a Jira for reworking the group coordinator configs: 
https://issues.apache.org/jira/browse/KAFKA-15089.





Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1447491926


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Yeah, creating a new module makes sense to me. I would call it 
`transaction-coordinator` and the jar would be called 
`kafka-transaction-coordinator` in order to follow the naming scheme of the 
`group-coordinator`. I am not sure that it really needs to depend on the 
group-coordinator module, though; I haven't checked the details.






[jira] [Resolved] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans resolved KAFKA-15866.

Fix Version/s: 3.7.0
   (was: 3.8.0)
 Assignee: (was: Lan Ding)
   Resolution: Fixed

> Refactor OffsetFetchRequestState Error handling to be more consistent with 
> OffsetCommitRequestState
> ---
>
> Key: KAFKA-15866
> URL: https://issues.apache.org/jira/browse/KAFKA-15866
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> The current OffsetFetchRequestState error handling uses nested if-else, which 
> is quite different, stylistically, from the OffsetCommitRequestState, which 
> uses a switch statement.  The latter is a bit more readable, so we should 
> refactor the error handling to use the same style.
>  
> A minor point: some of the error handling seems inconsistent with the commit 
> path. The logic was carried over from the current implementation, so we should 
> also review all the error handling.  For example, the current logic doesn't 
> mark the coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE.
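
As a rough illustration of the switch style being asked for, here is a self-contained sketch. The enums and the `handle` method are hypothetical; the real client works with `org.apache.kafka.common.protocol.Errors`, and the mapping of each error to an outcome below is an assumption, not the behavior of the actual request state classes.

```java
// Hypothetical error codes for the sketch (named after real Kafka error names).
enum OffsetFetchError {
    NONE, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR,
    COORDINATOR_LOAD_IN_PROGRESS, GROUP_AUTHORIZATION_FAILED, UNKNOWN
}

// Hypothetical outcomes of handling a response.
enum Outcome { COMPLETE, RETRY_AFTER_REDISCOVERY, RETRY, FAIL }

class OffsetFetchErrorHandlingSketch {
    // One flat switch instead of nested if-else; related errors share a branch.
    static Outcome handle(OffsetFetchError error) {
        switch (error) {
            case NONE:
                return Outcome.COMPLETE;
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                // Assumed: mark the coordinator unknown, then retry once it is rediscovered.
                return Outcome.RETRY_AFTER_REDISCOVERY;
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Outcome.RETRY;
            case GROUP_AUTHORIZATION_FAILED:
            default:
                return Outcome.FAIL;
        }
    }
}
```

Grouping `COORDINATOR_NOT_AVAILABLE` with `NOT_COORDINATOR` makes the inconsistency mentioned in the description easy to spot and fix in one place.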





[jira] [Updated] (KAFKA-16032) Review client inconsistent error handling of OffsetFetch and OffsetCommit responses

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16032:
---
Parent: KAFKA-14048
Issue Type: Sub-task  (was: Bug)

> Review client inconsistent error handling of OffsetFetch and OffsetCommit 
> responses
> ---
>
> Key: KAFKA-16032
> URL: https://issues.apache.org/jira/browse/KAFKA-16032
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to 
> review around this:
>  - The logic is duplicated for some errors that are treated similarly (ex. 
> NOT_COORDINATOR)
>  - Some errors are not handled similarly in both requests (ex. 
> COORDINATOR_NOT_AVAILABLE handled and retried for OffsetCommit but not 
> OffsetFetch). Note that the specific errors handled by each request were kept 
> the same as in the legacy ConsumerCoordinator but this should be reviewed, in 
> an attempt to handle the same errors, in the same way, whenever possible.
> Note that the legacy approach handles expected errors for each path (FETCH 
> and COMMIT), retrying on those when needed, but does not retry on unexpected 
> retriable errors.





Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-10 Thread via GitHub


C0urante commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1447487472


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##
@@ -41,18 +41,11 @@ public class HerderRequestHandler {
 
 private final RestClient restClient;
 
-private volatile long requestTimeoutMs;
+private final RestRequestTimeout requestTimeout;

Review Comment:
   Ah, gotcha. The intent was to make it thread-safe since it's likely that 
writes and reads to/of that value will take place on different threads.
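
One way to read the change in this diff, sketched under assumptions: the handler keeps a `final` reference, and the object behind it owns the mutable value and is responsible for cross-thread visibility. `TimeoutSource`, `MutableTimeout`, and `HandlerSketch` are hypothetical stand-ins; the methods of the real `RestRequestTimeout` interface are not shown in the quoted diff.

```java
// Hypothetical stand-in for the RestRequestTimeout interface.
interface TimeoutSource {
    long timeoutMs();
}

// The mutable value lives in one place and stays volatile so updates are visible across threads.
class MutableTimeout implements TimeoutSource {
    private volatile long timeoutMs;

    MutableTimeout(long initialMs) {
        this.timeoutMs = initialMs;
    }

    void update(long newMs) {
        this.timeoutMs = newMs;
    }

    @Override
    public long timeoutMs() {
        return timeoutMs;
    }
}

// The handler's field can be final because it only holds the reference.
class HandlerSketch {
    private final TimeoutSource requestTimeout;

    HandlerSketch(TimeoutSource requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    long currentTimeoutMs() {
        return requestTimeout.timeoutMs();
    }
}
```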






Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-10 Thread via GitHub


ijuma commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1447483283


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.transaction;
+
+public class TransactionLogConfig {

Review Comment:
   Let's see what @dajac says, but a `kafka-transactions` or 
`kafka-transactions-coordinator` module that depends on 
`kafka-group-coordinator` makes sense to me. The former option is shorter and 
just as clear, while the latter follows the same pattern as the group coordinator.






Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-10 Thread via GitHub


vamossagar12 commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1447480579


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest;
+
+public interface RestRequestTimeout {

Review Comment:
   Ok. Makes sense.






Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-10 Thread via GitHub


vamossagar12 commented on PR #15149:
URL: https://github.com/apache/kafka/pull/15149#issuecomment-1884969533

   @C0urante, hmm okay. I understand. Those weren't blocking comments, and I 
had marked them as nits. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16093: Fix spurious REST-related warnings on Connect startup [kafka]

2024-01-10 Thread via GitHub


vamossagar12 commented on code in PR #15149:
URL: https://github.com/apache/kafka/pull/15149#discussion_r1447468334


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java:
##
@@ -41,18 +41,11 @@ public class HerderRequestHandler {
 
 private final RestClient restClient;
 
-private volatile long requestTimeoutMs;
+private final RestRequestTimeout requestTimeout;

Review Comment:
   I didn't intend to mark the final field as volatile. I wanted to know why 
the field was marked as volatile in the old PR since you had made the change. 






[jira] [Updated] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes

2024-01-10 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16107:
---
Summary: Ensure consumer does not start fetching from added partitions 
until onPartitionsAssigned completes  (was: Ensure consumer does not start 
fetching from added partitions until onPartitionsAssgined completes)

> Ensure consumer does not start fetching from added partitions until 
> onPartitionsAssigned completes
> --
>
> Key: KAFKA-16107
> URL: https://issues.apache.org/jira/browse/KAFKA-16107
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support
>
> In the new consumer implementation, when new partitions are assigned, the 
> subscription state is updated and then #onPartitionsAssigned is triggered. 
> This sequence seems sensible, but we need to ensure that no data is fetched 
> until onPartitionsAssigned completes (where the user could be setting the 
> committed offsets they want to start fetching from).
> We should pause the newly added partitions until 
> onPartitionsAssigned completes, similar to how it's done on revocation to 
> avoid positions getting ahead of the committed offsets.
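
The proposed behavior can be sketched with a simplified model. `AssignmentSketch`, `assign`, and `isFetchable` are hypothetical names and do not mirror the real subscription state classes; releasing the partitions even when the callback throws is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical simplified model of assignment plus a "not fetchable yet" marker.
class AssignmentSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> awaitingCallback = new HashSet<>();

    // Adds partitions, keeps them non-fetchable while the user callback runs, then releases them.
    void assign(Set<String> added, Consumer<Set<String>> onPartitionsAssigned) {
        assigned.addAll(added);
        awaitingCallback.addAll(added);
        try {
            onPartitionsAssigned.accept(added);
        } finally {
            // Assumption: release the partitions even if the callback throws.
            awaitingCallback.removeAll(added);
        }
    }

    // A partition is fetchable only once it is assigned and its callback has completed.
    boolean isFetchable(String partition) {
        return assigned.contains(partition) && !awaitingCallback.contains(partition);
    }
}
```

With this ordering, a callback that seeks to a committed offset always runs before any fetch for the new partitions can be issued.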





[jira] [Updated] (KAFKA-16052) OOM in Kafka test suite

2024-01-10 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16052:
-
Fix Version/s: 3.8.0

> OOM in Kafka test suite
> ---
>
> Key: KAFKA-16052
> URL: https://issues.apache.org/jira/browse/KAFKA-16052
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Screenshot 2023-12-27 at 14.04.52.png, Screenshot 
> 2023-12-27 at 14.22.21.png, Screenshot 2023-12-27 at 14.45.20.png, Screenshot 
> 2023-12-27 at 15.31.09.png, Screenshot 2023-12-27 at 17.44.09.png, Screenshot 
> 2023-12-28 at 00.13.06.png, Screenshot 2023-12-28 at 00.18.56.png, Screenshot 
> 2023-12-28 at 11.26.03.png, Screenshot 2023-12-28 at 11.26.09.png, Screenshot 
> 2023-12-28 at 18.44.19.png, Screenshot 2024-01-10 at 14.59.47.png, newRM.patch
>
>
> *Problem*
> Our test suite is failing with frequent OOM. Discussion in the mailing list 
> is here: [https://lists.apache.org/thread/d5js0xpsrsvhgjb10mbzo9cwsy8087x4] 
> *Setup*
> To find the source of leaks, I ran the :core:test build target with a single 
> thread (see below on how to do it) and attached a profiler to it. This Jira 
> tracks the list of action items identified from the analysis.
> How to run tests using a single thread:
> {code:java}
> diff --git a/build.gradle b/build.gradle
> index f7abbf4f0b..81df03f1ee 100644
> --- a/build.gradle
> +++ b/build.gradle
> @@ -74,9 +74,8 @@ ext {
>        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
>      )
>  
> -  maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
> -  maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
> -      Math.min(Runtime.runtime.availableProcessors(), 8)
> +  maxTestForks = 1
> +  maxScalacThreads = 1
>    userIgnoreFailures = project.hasProperty('ignoreFailures') ? ignoreFailures : false
>  
>    userMaxTestRetries = project.hasProperty('maxTestRetries') ? maxTestRetries.toInteger() : 0
> diff --git a/gradle.properties b/gradle.properties
> index 4880248cac..ee4b6e3bc1 100644
> --- a/gradle.properties
> +++ b/gradle.properties
> @@ -30,4 +30,4 @@ scalaVersion=2.13.12
>  swaggerVersion=2.2.8
>  task=build
>  org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
> -org.gradle.parallel=true
> +org.gradle.parallel=false
> {code}
> *Result of experiment*
> This is what the heap memory utilization looks like: it starts at tens of MB 
> and ends at 1.5GB (with spikes of 2GB) of heap in use as the tests 
> execute. Note that the total number of threads also increases, but it does 
> not correlate with the sharp increase in heap memory usage. The heap dump is 
> available at 
> [https://www.dropbox.com/scl/fi/nwtgc6ir6830xlfy9z9cu/GradleWorkerMain_10311_27_12_2023_13_37_08.hprof.zip?rlkey=ozbdgh5vih4rcynnxbatzk7ln=0]
>  
> !Screenshot 2023-12-27 at 14.22.21.png!





[jira] [Commented] (KAFKA-16052) OOM in Kafka test suite

2024-01-10 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805163#comment-17805163
 ] 

Divij Vaidya commented on KAFKA-16052:
--

On current trunk, the heap "at max" goes to 1200MB, compared to the 1800MB provided 
in the description. I have also not seen any OOM in CI for quite a while now, 
based on 
[https://ge.apache.org/scans/failures?failures.failureClassification=all_failures=Execution%20failed%20for%20task%20%27:tools:test%27.%0A%3E%20Process%20%27Gradle%20Test%20Executor%2096%27%20finished%20with%20non-zero%20exit%20value%201%0A%20%20This%20problem%20might%20be%20caused%20by%20incorrect%20test%20process%20configuration.%0A%20%20For%20more%20on%20test%20execution%2C%20please%20refer%20to%20https:%2F%2Fdocs.gradle.org%2F8.5%2Fuserguide%2Fjava_testing.html%23sec:test_execution%20in%20the%20Gradle%20documentation.=kafka=trunk=Europe%2FBerlin]
 

Also, notice the drastic decrease in the number of threads in the test (right 
graph) due to the fixes made here.

At this stage, I am resolving this Jira based on the above. We have some 
forward-looking tasks at [https://github.com/apache/kafka/pull/15101] to fix this 
permanently. 


!Screenshot 2024-01-10 at 14.59.47.png!

> OOM in Kafka test suite
> ---
>
> Key: KAFKA-16052
> URL: https://issues.apache.org/jira/browse/KAFKA-16052
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Divij Vaidya
>Priority: Major
> Attachments: Screenshot 2023-12-27 at 14.04.52.png, Screenshot 
> 2023-12-27 at 14.22.21.png, Screenshot 2023-12-27 at 14.45.20.png, Screenshot 
> 2023-12-27 at 15.31.09.png, Screenshot 2023-12-27 at 17.44.09.png, Screenshot 
> 2023-12-28 at 00.13.06.png, Screenshot 2023-12-28 at 00.18.56.png, Screenshot 
> 2023-12-28 at 11.26.03.png, Screenshot 2023-12-28 at 11.26.09.png, Screenshot 
> 2023-12-28 at 18.44.19.png, Screenshot 2024-01-10 at 14.59.47.png, newRM.patch
>
>
> *Problem*
> Our test suite is failing with frequent OOM. Discussion in the mailing list 
> is here: [https://lists.apache.org/thread/d5js0xpsrsvhgjb10mbzo9cwsy8087x4] 
> *Setup*
> To find the source of leaks, I ran the :core:test build target with a single 
> thread (see below on how to do it) and attached a profiler to it. This Jira 
> tracks the list of action items identified from the analysis.
> How to run tests using a single thread:
> {code:java}
> diff --git a/build.gradle b/build.gradle
> index f7abbf4f0b..81df03f1ee 100644
> --- a/build.gradle
> +++ b/build.gradle
> @@ -74,9 +74,8 @@ ext {
>        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
>      )
>  
> -  maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
> -  maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
> -      Math.min(Runtime.runtime.availableProcessors(), 8)
> +  maxTestForks = 1
> +  maxScalacThreads = 1
>    userIgnoreFailures = project.hasProperty('ignoreFailures') ? ignoreFailures : false
>  
>    userMaxTestRetries = project.hasProperty('maxTestRetries') ? maxTestRetries.toInteger() : 0
> diff --git a/gradle.properties b/gradle.properties
> index 4880248cac..ee4b6e3bc1 100644
> --- a/gradle.properties
> +++ b/gradle.properties
> @@ -30,4 +30,4 @@ scalaVersion=2.13.12
>  swaggerVersion=2.2.8
>  task=build
>  org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
> -org.gradle.parallel=true
> +org.gradle.parallel=false
> {code}
> *Result of experiment*
> This is what heap memory utilization looks like: it starts at tens of MB and 
> ends at 1.5GB (with spikes of 2GB) of heap being used as the test 
> executes. Note that the total number of threads also increases, but it does 
> not correlate with the sharp increase in heap memory usage. The heap dump is 
> available at 
> [https://www.dropbox.com/scl/fi/nwtgc6ir6830xlfy9z9cu/GradleWorkerMain_10311_27_12_2023_13_37_08.hprof.zip?rlkey=ozbdgh5vih4rcynnxbatzk7ln=0]
>  
> !Screenshot 2023-12-27 at 14.22.21.png!
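
For readers who want a rough in-process view of the two metrics discussed above (used heap and live thread count) without attaching a profiler, the following is a minimal sketch using the standard `java.lang.management` beans. It is not how the graphs in this issue were produced; the class name `HeapAndThreadSampler`, the `Sample` holder, and the one-second sampling interval are assumptions made for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of the Kafka build or test code.
final class HeapAndThreadSampler {

    /** One observation of used heap (in bytes) and live thread count. */
    static final class Sample {
        final long usedHeapBytes;
        final int liveThreads;

        Sample(long usedHeapBytes, int liveThreads) {
            this.usedHeapBytes = usedHeapBytes;
            this.liveThreads = liveThreads;
        }
    }

    /** Reads the current values from the JVM's platform MXBeans. */
    static Sample sample() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return new Sample(memory.getHeapMemoryUsage().getUsed(), threads.getThreadCount());
    }

    public static void main(String[] args) throws InterruptedException {
        // Take a few samples one second apart (interval chosen arbitrarily).
        for (int i = 0; i < 3; i++) {
            Sample s = sample();
            System.out.printf("heap used: %d MB, live threads: %d%n",
                    s.usedHeapBytes / (1024 * 1024), s.liveThreads);
            Thread.sleep(1000);
        }
    }
}
```

Logging both values side by side over a test run is one way to check whether heap growth tracks thread growth, which is the correlation the description says was not observed.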



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16108) Backport fix for KAFKA-16093 to 3.7

2024-01-10 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-16108:
-

 Summary: Backport fix for KAFKA-16093 to 3.7
 Key: KAFKA-16108
 URL: https://issues.apache.org/jira/browse/KAFKA-16108
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Chris Egerton
Assignee: Chris Egerton
 Fix For: 3.7.1


A fix for KAFKA-16093 is present on the trunk branch (currently at version 
3.8.0-SNAPSHOT) and on the 3.6 branch. We are in code freeze for the 3.7.0 
release, and this issue is not a blocker, so it cannot be backported right now.

We should backport the fix once 3.7.0 has been released and before 3.7.1 is 
released.





[jira] [Created] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes

2024-01-10 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16107:
--

 Summary: Ensure consumer does not start fetching from added 
partitions until onPartitionsAssigned completes
 Key: KAFKA-16107
 URL: https://issues.apache.org/jira/browse/KAFKA-16107
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans


In the new consumer implementation, when new partitions are assigned, the 
subscription state is updated and then #onPartitionsAssigned is triggered. 
This sequence seems sensible, but we need to ensure that no data is fetched 
until onPartitionsAssigned completes (where the user could be setting the 
committed offsets they want to start fetching from).
We should pause the newly added partitions until 
onPartitionsAssigned completes, similar to how it's done on revocation to avoid 
positions getting ahead of the committed offsets.
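
The ordering described above can be sketched with a small toy model. This is not Kafka's subscription state or the actual fix; `ToySubscriptionState`, `fetchablePartitions`, and `assign` are hypothetical names, and partitions are plain strings to keep the example self-contained. The sketch only shows the idea of keeping newly added partitions unfetchable while the callback runs and releasing them afterwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical toy model; not Kafka's internal SubscriptionState.
final class ToySubscriptionState {
    private final Set<String> assigned = new LinkedHashSet<>();
    // Newly added partitions whose assignment callback has not finished yet.
    private final Set<String> awaitingCallback = new LinkedHashSet<>();

    /** Partitions a fetcher would be allowed to read from right now. */
    List<String> fetchablePartitions() {
        List<String> fetchable = new ArrayList<>();
        for (String tp : assigned) {
            if (!awaitingCallback.contains(tp)) {
                fetchable.add(tp);
            }
        }
        return fetchable;
    }

    /**
     * Adds partitions to the assignment, keeps the new ones unfetchable while
     * the callback runs, and releases them afterwards even if it throws.
     */
    void assign(Set<String> partitions, Consumer<Set<String>> onPartitionsAssigned) {
        Set<String> added = new LinkedHashSet<>(partitions);
        added.removeAll(assigned);
        assigned.addAll(added);
        awaitingCallback.addAll(added);
        try {
            onPartitionsAssigned.accept(added);
        } finally {
            awaitingCallback.removeAll(added);
        }
    }
}

final class ToyAssignmentDemo {
    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        state.assign(Set.of("topic-0"), added -> { });
        // While the callback for topic-1 runs, only topic-0 is fetchable.
        state.assign(Set.of("topic-0", "topic-1"), added ->
                System.out.println("during callback: " + state.fetchablePartitions()));
        System.out.println("after callback: " + state.fetchablePartitions());
    }
}
```

In this model, partitions that were already assigned stay fetchable throughout; only the newly added ones are held back, which mirrors the intent of the description.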





[jira] [Updated] (KAFKA-16093) Spurious warnings logged to stderr about empty path annotations and providers not implementing provider interfaces

2024-01-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16093:
--
Fix Version/s: 3.6.2

> Spurious warnings logged to stderr about empty path annotations and providers 
> not implementing provider interfaces
> --
>
> Key: KAFKA-16093
> URL: https://issues.apache.org/jira/browse/KAFKA-16093
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 3.6.2, 3.8.0
>
>
> Some warnings get logged to stderr on Connect startup. For example:
> {quote}Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Jan 08, 2024 1:48:18 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Jan 08, 2024 1:48:19 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> {quote}
> These are benign, but can confuse and even frighten new users.





[jira] [Updated] (KAFKA-16052) OOM in Kafka test suite

2024-01-10 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-16052:
-
Attachment: Screenshot 2024-01-10 at 14.59.47.png

> OOM in Kafka test suite
> ---
>
> Key: KAFKA-16052
> URL: https://issues.apache.org/jira/browse/KAFKA-16052
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Divij Vaidya
>Priority: Major
> Attachments: Screenshot 2023-12-27 at 14.04.52.png, Screenshot 
> 2023-12-27 at 14.22.21.png, Screenshot 2023-12-27 at 14.45.20.png, Screenshot 
> 2023-12-27 at 15.31.09.png, Screenshot 2023-12-27 at 17.44.09.png, Screenshot 
> 2023-12-28 at 00.13.06.png, Screenshot 2023-12-28 at 00.18.56.png, Screenshot 
> 2023-12-28 at 11.26.03.png, Screenshot 2023-12-28 at 11.26.09.png, Screenshot 
> 2023-12-28 at 18.44.19.png, Screenshot 2024-01-10 at 14.59.47.png, newRM.patch
>
>
> *Problem*
> Our test suite is frequently failing with OOM errors. The mailing list 
> discussion is here: [https://lists.apache.org/thread/d5js0xpsrsvhgjb10mbzo9cwsy8087x4] 
> *Setup*
> To find the source of the leaks, I ran the :core:test build target with a single 
> thread (see below for how to do it) and attached a profiler to it. This Jira 
> tracks the list of action items identified from the analysis.
> How to run tests using a single thread:
> {code:java}
> diff --git a/build.gradle b/build.gradle
> index f7abbf4f0b..81df03f1ee 100644
> --- a/build.gradle
> +++ b/build.gradle
> @@ -74,9 +74,8 @@ ext {
>        "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
>      )
>  
> -  maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
> -  maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
> -      Math.min(Runtime.runtime.availableProcessors(), 8)
> +  maxTestForks = 1
> +  maxScalacThreads = 1
>    userIgnoreFailures = project.hasProperty('ignoreFailures') ? ignoreFailures : false
>  
>    userMaxTestRetries = project.hasProperty('maxTestRetries') ? maxTestRetries.toInteger() : 0
> diff --git a/gradle.properties b/gradle.properties
> index 4880248cac..ee4b6e3bc1 100644
> --- a/gradle.properties
> +++ b/gradle.properties
> @@ -30,4 +30,4 @@ scalaVersion=2.13.12
>  swaggerVersion=2.2.8
>  task=build
>  org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
> -org.gradle.parallel=true
> +org.gradle.parallel=false
> {code}
> *Result of experiment*
> This is what heap memory utilization looks like: it starts at tens of MB and 
> ends at 1.5GB (with spikes of 2GB) of heap being used as the test 
> executes. Note that the total number of threads also increases, but it does 
> not correlate with the sharp increase in heap memory usage. The heap dump is 
> available at 
> [https://www.dropbox.com/scl/fi/nwtgc6ir6830xlfy9z9cu/GradleWorkerMain_10311_27_12_2023_13_37_08.hprof.zip?rlkey=ozbdgh5vih4rcynnxbatzk7ln=0]
>  
> !Screenshot 2023-12-27 at 14.22.21.png!




