[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge Esteban Quilcate Otoya updated KAFKA-10434:
-
    Labels: needs-kip  (was: kip-required)

> Remove deprecated methods on WindowStore
>
>                 Key: KAFKA-10434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10434
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Jorge Esteban Quilcate Otoya
>            Priority: Major
>              Labels: needs-kip
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606] :
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore and/or
>  * Consider removing long based methods

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
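As a minimal sketch of the first option (read methods declared only on the read-only interface, with the writable store inheriting them), consider the simplified Scala traits below. All names (`ReadOnlyWindowStoreSketch`, `WindowStoreSketch`, `InMemoryWindowStoreSketch`) are hypothetical, and the signatures are illustrative assumptions rather than the real Kafka Streams API.

```scala
import java.time.Instant
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified traits for illustration; NOT the real Kafka Streams
// ReadOnlyWindowStore/WindowStore interfaces or their method signatures.
trait ReadOnlyWindowStoreSketch[K, V] {
  // Read methods are declared once, on the read-only interface.
  def fetch(key: K, timeFrom: Instant, timeTo: Instant): List[(Long, V)]
}

trait WindowStoreSketch[K, V] extends ReadOnlyWindowStoreSketch[K, V] {
  // The writable store adds only write methods and inherits the reads.
  def put(key: K, value: V, windowStartMs: Long): Unit
}

final class InMemoryWindowStoreSketch[K, V] extends WindowStoreSketch[K, V] {
  private val entries = ArrayBuffer.empty[(K, Long, V)]

  override def put(key: K, value: V, windowStartMs: Long): Unit =
    entries += ((key, windowStartMs, value))

  // Returns (windowStartMs, value) pairs for `key` whose window start falls
  // in [timeFrom, timeTo], ordered by window start.
  override def fetch(key: K, timeFrom: Instant, timeTo: Instant): List[(Long, V)] = {
    val from = timeFrom.toEpochMilli
    val to = timeTo.toEpochMilli
    entries.toList
      .filter { case (k, start, _) => k == key && start >= from && start <= to }
      .map { case (_, start, v) => (start, v) }
      .sortBy(_._1)
  }
}

val store = new InMemoryWindowStoreSketch[String, Int]
store.put("a", 1, 1000L)
store.put("a", 2, 2000L)
store.put("b", 3, 1500L)

// Callers that only read can be handed the narrower type.
val readOnly: ReadOnlyWindowStoreSketch[String, Int] = store
val hits = readOnly.fetch("a", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1500L))
```

Because the reads are declared in one place, a caller that only queries can be typed against the narrower interface, and there is a single spot to decide which (Instant- or long-based) overloads to keep.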
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683284825

> If it's meant to be called by every request, then maybe we should have the delayed actions in a separate class instead of ReplicaManager. Other classes could, in theory, add their own delayed actions to this queue too.

I prefer this idea as it prevents us from missing any action.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
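The idea of keeping delayed actions in a dedicated class that any component can add to might look roughly like this. It is a minimal sketch under assumed, hypothetical names (`SharedActionQueue`, `FakeReplicaManager`, `FakeGroupCoordinator`), not the code in the PR.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer

// Hypothetical sketch: one queue that any component can add deferred work to.
final class SharedActionQueue {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = { queue.add(action); () }

  // Runs every queued action in FIFO order and returns how many ran.
  def runAll(): Int = {
    var count = 0
    var action = queue.poll()
    while (action != null) {
      action()
      count += 1
      action = queue.poll()
    }
    count
  }
}

// Two hypothetical components that defer completion work to the same queue.
final class FakeReplicaManager(actions: SharedActionQueue, done: ListBuffer[String]) {
  def appendRecords(partition: String): Unit =
    actions.add(() => done += s"checked delayed requests for $partition")
}

final class FakeGroupCoordinator(actions: SharedActionQueue, done: ListBuffer[String]) {
  def completeJoin(group: String): Unit =
    actions.add(() => done += s"checked delayed joins for $group")
}

val completed = ListBuffer.empty[String]
val actionQueue = new SharedActionQueue
val replicaManager = new FakeReplicaManager(actionQueue, completed)
val coordinator = new FakeGroupCoordinator(actionQueue, completed)

// While handling a request, components only enqueue work...
replicaManager.appendRecords("topic-0")
coordinator.completeJoin("group-a")
// ...and the handler runs it at the end, once no locks are held.
val ran = actionQueue.runAll()
```

Because the queue is not owned by any single component, a new source of delayed actions only needs a reference to it; nothing has to change in the code that drains it.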
[GitHub] [kafka] jeqo opened a new pull request #9228: KAFKA-10445: Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore
jeqo opened a new pull request #9228:
URL: https://github.com/apache/kafka/pull/9228

[KIP-666](https://cwiki.apache.org/confluence/display/KAFKA/KIP-666%3A+Add+Instant-based+methods+to+ReadOnlySessionStore) proposal

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
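One way to add Instant-based methods on top of existing long-based ones is to validate and convert each `Instant`, then delegate, as in this minimal sketch. The trait and method shapes are hypothetical simplifications, not the actual `ReadOnlySessionStore` signatures proposed in KIP-666.

```scala
import java.time.Instant

// Hypothetical, simplified store; names and signatures are illustrative and do
// not reproduce the real Kafka Streams ReadOnlySessionStore API.
trait SessionStoreSketch[K] {
  // Existing long-based lookup (epoch milliseconds). Returns (start, end) pairs.
  def findSessions(key: K, earliestSessionEndTime: Long, latestSessionStartTime: Long): List[(Long, Long)]

  // Instant-based overload: validate, convert to millis, and delegate.
  def findSessions(key: K, earliestSessionEndTime: Instant, latestSessionStartTime: Instant): List[(Long, Long)] =
    findSessions(
      key,
      toMillis(earliestSessionEndTime, "earliestSessionEndTime"),
      toMillis(latestSessionStartTime, "latestSessionStartTime"))

  private def toMillis(instant: Instant, name: String): Long = {
    require(instant != null, s"$name cannot be null")
    try instant.toEpochMilli
    catch {
      case e: ArithmeticException =>
        throw new IllegalArgumentException(s"$name cannot be represented as epoch milliseconds", e)
    }
  }
}

final class ListSessionStore(sessions: List[(String, Long, Long)]) extends SessionStoreSketch[String] {
  // A session matches if it ends no earlier than earliestSessionEndTime
  // and starts no later than latestSessionStartTime.
  override def findSessions(key: String, earliestSessionEndTime: Long, latestSessionStartTime: Long): List[(Long, Long)] =
    sessions.collect {
      case (k, start, end) if k == key && end >= earliestSessionEndTime && start <= latestSessionStartTime =>
        (start, end)
    }
}

val sessionStore = new ListSessionStore(List(("u1", 100L, 200L), ("u1", 500L, 600L), ("u2", 100L, 900L)))
val found = sessionStore.findSessions("u1", Instant.ofEpochMilli(150L), Instant.ofEpochMilli(400L))
```

Implementations only have to provide the long-based method; the Instant-based overload is shared, so the conversion and validation logic lives in one place.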
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479661386

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up a action to complete.
Review comment:
a action => an action

## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock - However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.
Review comment:
Perhaps we could add a note at the top of DelayedOperation so that people are aware of the need to complete actions for new DelayedOperations in the future.

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -585,6 +591,23 @@ class ReplicaManager(val config: KafkaConfig,
                   result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
       }
 
+      actionQueue.add {
+        () =>
+          localProduceResults.foreach {
+            case (topicPartition, result) =>
+              val requestKey = TopicPartitionOperationKey(topicPartition)
+              if (result.info.leaderHWIncremented) {

Review comment:
It seems that we have to distinguish 3 states here: (1) records not appended due to an error; (2) records appended successfully and HWM advanced; (3) records appended successfully and HWM not advanced. In case (1), no purgatory needs to be checked.

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel, c
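The locking story in the quoted `DelayedOperation` comment boils down to this: while holding one lock, do not acquire another lock to complete delayed requests; enqueue the completion and run it once no lock is held. The sketch below illustrates that ordering with two `ReentrantLock`s. `groupLock`, `operationLock`, `appendUnderGroupLock` and `completePending` are hypothetical names, and the real broker code paths are far more involved.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch of "enqueue while locked, complete after unlocking".
// groupLock stands in for a consumer-group lock and operationLock for the lock
// of some other delayed operation; neither mirrors Kafka's real classes.
val groupLock = new ReentrantLock()
val operationLock = new ReentrantLock()
val pending = new ConcurrentLinkedQueue[() => Unit]()

// Records whether the group lock was still held when the deferred check ran.
var groupLockHeldDuringCompletion = true

def appendUnderGroupLock(): Unit = {
  groupLock.lock()
  try {
    // ... append records while holding the group lock ...
    // Taking operationLock here would nest locks (group -> operation); if
    // another thread nests them the other way round, both threads can
    // deadlock. So the completion check is deferred instead.
    pending.add { () =>
      operationLock.lock()
      try groupLockHeldDuringCompletion = groupLock.isHeldByCurrentThread
      finally operationLock.unlock()
    }
  } finally groupLock.unlock()
}

// Runs deferred checks; called only when the caller holds no locks.
def completePending(): Unit = {
  var action = pending.poll()
  while (action != null) {
    action()
    action = pending.poll()
  }
}

appendUnderGroupLock()
completePending()
```

Because the deferred action only ever takes one lock at a time, the lock-ordering cycle that motivated `tryLock` cannot form, and a plain blocking `lock()` can be used again.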
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479677889

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
       }
+
+      // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+      // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+      // expiration thread for certain delayed operations (e.g. DelayedJoin)
+      actionQueue.tryCompleteAction()

Review comment:
Completing delayed actions may throw an exception. Should the exception be swallowed and logged if we move the completion to the `finally` block?
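A minimal sketch of the shape being discussed: run the queued actions in a `finally` block after the response is produced, and log (rather than propagate) non-fatal failures. `HandlerSketch` and its parameters are hypothetical; in the PR the equivalent call is `actionQueue.tryCompleteAction()` at the end of `KafkaApis.handle()`.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical handler shape; `completeActions` stands in for
// actionQueue.tryCompleteAction() and `warn` for a logger.
final class HandlerSketch(completeActions: () => Unit, warn: String => Unit) {
  val sentResponses = ListBuffer.empty[String]

  def handle(request: String): Unit = {
    try {
      // Normal request processing; the response is produced here.
      sentResponses += s"response to $request"
    } finally {
      // The response is already out, so a failure while completing delayed
      // actions is logged rather than turned into a request error.
      try completeActions()
      catch {
        case NonFatal(e) => warn(s"Failed to complete delayed actions: ${e.getMessage}")
      }
    }
  }
}

val warnings = ListBuffer.empty[String]
val handler = new HandlerSketch(() => throw new IllegalStateException("boom"), w => warnings += w)
handler.handle("produce-1")
```

Using `NonFatal` keeps fatal errors (for example `OutOfMemoryError`) propagating while ordinary exceptions only produce a warning.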
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678653

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -585,6 +591,23 @@ class ReplicaManager(val config: KafkaConfig,
                   result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
       }
 
+      actionQueue.add {
+        () =>
+          localProduceResults.foreach {
+            case (topicPartition, result) =>
+              val requestKey = TopicPartitionOperationKey(topicPartition)
+              if (result.info.leaderHWIncremented) {

Review comment:
You are right. I missed the exception in `appendToLocalLog`. Let me revert this change.
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678761

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
What's the reasoning for taking just 1 item @junrao? Could this cause the queue to grow over time?
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678812

## File path: core/src/main/scala/kafka/server/KafkaServer.scala ##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
Another approach would be for this queue to be per request thread instead of per server. That would simplify concurrency handling.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479679930

## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock - However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
It is dangerous to complete delayed actions in `DelayedOperation`, as most methods of `DelayedOperation` are executed while holding a lock. We now depend on `KafkaApis.handle()` to complete delayed actions produced by callers that can't complete them because of locking. For example, `DelayedJoin.onComplete` can produce a new action, and that action can't be completed by `DelayedJoin` itself because of the group lock.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479680637

## File path: core/src/main/scala/kafka/server/KafkaServer.scala ##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
If the queue is kept per thread, the thread has to pass the queue to the methods of ReplicaManager/GroupCoordinator. A lot of methods are involved, so that would be a big patch. I prefer to keep the patch small, though it is getting bigger now :(
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479683036

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
It should be fine to let the handler complete as many actions as possible, since the response is created before the delayed actions are handled.
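The growth concern can be illustrated with a toy simulation, assuming (hypothetically) a workload in which each handled request enqueues a fixed number of actions. If requests enqueue more than one action on average but each handler takes at most one, the backlog grows; draining the queue keeps it empty. `backlogAfter` is a hypothetical helper, not broker code.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Toy model, not broker code: `requests` handled requests, each enqueuing
// `actionsPerRequest` no-op actions; returns the backlog left at the end.
def backlogAfter(requests: Int, actionsPerRequest: Int, drainAll: Boolean): Int = {
  val queue = new ConcurrentLinkedQueue[() => Unit]()
  for (_ <- 1 to requests) {
    for (_ <- 1 to actionsPerRequest) queue.add(() => ())
    if (drainAll) {
      // Complete as many actions as possible.
      var action = queue.poll()
      while (action != null) {
        action()
        action = queue.poll()
      }
    } else {
      // Take at most one item per request.
      val action = queue.poll()
      if (action != null) action()
    }
  }
  queue.size
}

val takeOne = backlogAfter(requests = 100, actionsPerRequest = 2, drainAll = false)
val drain = backlogAfter(requests = 100, actionsPerRequest = 2, drainAll = true)
```

With two actions per request, the take-one policy leaves one extra action behind per request (a backlog of 100 after 100 requests), while draining leaves none.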
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479684163

## File path: core/src/main/scala/kafka/log/Log.scala ##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
Perhaps we can make this a bit clearer, something like the following. leaderHWIncremented has 3 possible values:
(1) if records are not appended due to an error, the value will be None;
(2) if records are appended successfully and HWM is advanced, the value is Some(true);
(3) if records are appended successfully and HWM is not advanced, the value is Some(false).

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
         case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
       }
+
+      // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+      // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+      // expiration thread for certain delayed operations (e.g. DelayedJoin)
+      actionQueue.tryCompleteAction()

Review comment:
Yes, if actionQueue.tryCompleteAction() throws an exception, we can just catch it and log a warning in `finally`, since the response has been sent by then.
## File path: core/src/main/scala/kafka/server/KafkaServer.scala ##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
Note that the action queue is called not only by request threads, but also by the expiration thread for certain delayed operations.

## File path: core/src/main/scala/kafka/server/DelayedOperation.scala ##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock - However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
I was thinking of adding a comment so that, if someone adds a future delayed operation that calls ReplicaManager.appendRecords() in onComplete() like DelayedJoin, he/she is aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction().

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to o
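The note junrao asks for concerns operations like `DelayedJoin` whose `onComplete()` enqueues actions on a path where no request handler runs afterwards. Below is a simplified sketch, with hypothetical class names and all locking omitted, of why such an operation's `onExpiration()` would drain the queue itself. It is not Kafka's `DelayedOperation` API.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified shapes; these are not Kafka's real ActionQueue,
// DelayedOperation or DelayedJoin classes, and all locking is omitted.
final class ActionQueueSketch {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()
  def add(action: () => Unit): Unit = { queue.add(action); () }
  def tryCompleteActions(): Unit = {
    var action = queue.poll()
    while (action != null) {
      action()
      action = queue.poll()
    }
  }
}

abstract class DelayedOperationSketch {
  def onComplete(): Unit
  def onExpiration(): Unit = ()
  // What the expiration thread does when the operation times out.
  final def expire(): Unit = {
    onComplete()
    onExpiration()
  }
}

// An operation whose onComplete() appends records (and therefore enqueues
// actions) must drain the queue in onExpiration(): on the expiration path no
// request handler thread will run KafkaApis.handle() to do it.
final class DelayedJoinSketch(actionQueue: ActionQueueSketch, completed: ListBuffer[String])
    extends DelayedOperationSketch {
  override def onComplete(): Unit =
    actionQueue.add(() => completed += "check delayed produce for the group metadata partition")
  override def onExpiration(): Unit = actionQueue.tryCompleteActions()
}

val sharedQueue = new ActionQueueSketch
val completed = ListBuffer.empty[String]
new DelayedJoinSketch(sharedQueue, completed).expire()
```

If `DelayedJoinSketch` did not override `onExpiration()`, the enqueued action would sit in the queue until some unrelated request thread happened to drain it.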
[GitHub] [kafka] ijuma opened a new pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma opened a new pull request #9229:
URL: https://github.com/apache/kafka/pull/9229

Use a caching BufferSupplier per request handler thread so that decompression buffers are cached if supported by the underlying CompressionType. This reduces allocations significantly for LZ4 when the number of partitions is high. The decompression buffer size is typically 64 KB, so a produce request with 1000 partitions results in 64 MB of allocations even if each produce batch is small (which is likely when there are so many partitions).

I ran a quick local producer perf test with 5000 partitions, 1 KB record size, 1 broker, lz4 and ~0.5 for the producer compression rate metric:

Before this change:

> 2000 records sent, 346314.349535 records/sec (330.27 MB/sec), 148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th, 777 ms 99th, 1514 ms 99.9th.

After this change:

> 2000 records sent, 431956.113259 records/sec (411.95 MB/sec), 117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th, 440 ms 99th, 662 ms 99.9th.

That's a 25% throughput improvement, and p99.9 latency was reduced to less than half (in this test).

TODO: Remove default arguments

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
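Per-thread buffer caching can be sketched as follows. `CachingBufferSupplierSketch` is a hypothetical stand-in, not Kafka's `BufferSupplier`, and the 64 KB size is taken from the description above. With one supplier per thread (here via `ThreadLocal`), the cache needs no synchronization, and a request touching 1000 partitions allocates one buffer instead of 1000.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical stand-in for a caching buffer supplier; NOT Kafka's BufferSupplier.
// It is deliberately not thread-safe: each request handler thread owns one.
final class CachingBufferSupplierSketch {
  private val cache = mutable.Map.empty[Int, java.util.ArrayDeque[ByteBuffer]]
  var allocations = 0

  // Reuse a cached buffer of the requested capacity, or allocate a new one.
  def get(capacity: Int): ByteBuffer =
    cache.get(capacity).flatMap(deque => Option(deque.pollFirst())).getOrElse {
      allocations += 1
      ByteBuffer.allocate(capacity)
    }

  // Return a buffer to the cache so the next get() can reuse it.
  def release(buffer: ByteBuffer): Unit = {
    buffer.clear()
    cache.getOrElseUpdate(buffer.capacity(), new java.util.ArrayDeque[ByteBuffer]()).addFirst(buffer)
  }
}

// One supplier per thread, so no synchronization is needed.
val perThreadSupplier = new ThreadLocal[CachingBufferSupplierSketch] {
  override protected def initialValue(): CachingBufferSupplierSketch = new CachingBufferSupplierSketch
}

val DecompressionBufferSize = 64 * 1024 // "typically 64 KB" per the PR description

// Simulates decompressing one small batch per partition of a produce request.
def handleProduce(supplier: CachingBufferSupplierSketch, partitions: Int): Unit =
  for (_ <- 1 to partitions) {
    val buffer = supplier.get(DecompressionBufferSize)
    try buffer.put(0.toByte) // placeholder for real decompression work
    finally supplier.release(buffer)
  }

val supplier = perThreadSupplier.get()
handleProduce(supplier, partitions = 1000)
```

For reference, 1000 × 64 KB = 65,536,000 bytes (about 62.5 MiB), which the description rounds to 64 MB. The reported numbers give 411.95 / 330.27 ≈ 1.25 (the 25% throughput gain) and 662 / 1514 ≈ 0.44 for p99.9 latency.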
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686292

## File path: core/src/main/scala/kafka/server/KafkaServer.scala ##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
Good point @junrao.
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686398

## File path: core/src/main/scala/kafka/log/Log.scala ##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
@chia7712 I suggest using a sealed trait with 3 case objects to make this clearer. Using `Option[Boolean]` as a tristate value is generally best avoided.
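ijuma's suggestion can be sketched with a sealed trait and three case objects. The names are hypothetical, and the mapping to purgatories is an assumption built on the three cases junrao lists (nothing to check when the append failed; only fetch waiters when data was appended but the high watermark did not move). Because the trait is sealed, the compiler can warn if a `match` misses a case, which an `Option[Boolean]` cannot express as clearly.

```scala
// Hypothetical names; the type that eventually replaces Option[Boolean] in the
// PR may be named differently.
sealed trait LeaderHwChangeSketch
object LeaderHwChangeSketch {
  case object Increased extends LeaderHwChangeSketch   // appended, high watermark advanced
  case object Unchanged extends LeaderHwChangeSketch   // appended, high watermark not advanced
  case object NotAppended extends LeaderHwChangeSketch // not appended because of an error
}

import LeaderHwChangeSketch._

// Which purgatories might be worth checking in each case (an assumption,
// not taken from the PR).
def purgatoriesToCheck(change: LeaderHwChangeSketch): List[String] = change match {
  case Increased   => List("produce", "fetch", "deleteRecords")
  case Unchanged   => List("fetch")
  case NotAppended => Nil
}
```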
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686517

## File path: core/src/main/scala/kafka/log/Log.scala ##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
Will copy that.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689381

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up an action to complete.
+   */
+  def tryCompleteAction(): Unit = {

Review comment:
@junrao @ijuma please take a look at this method.
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689457

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -37,7 +37,10 @@ class ActionQueue {
    * picks up an action to complete.
    */
   def tryCompleteAction(): Unit = {
-    val action = queue.poll()
-    if (action != null) action()
+    var action = queue.poll()
+    while (action != null) {
+      action()

Review comment:
The main thing to decide is what to do in case of an exception: do we stop processing or do we continue?
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689653

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
Why are we using a blocking queue? It doesn't seem like we ever need the blocking functionality. Am I missing something?
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689664

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up an action to complete.
+   */
+  def tryCompleteAction(): Unit = {

Review comment: I prefer to just catch it and log a warning as the response has been processed.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689694

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -37,7 +37,10 @@ class ActionQueue {
    * picks up an action to complete.
    */
   def tryCompleteAction(): Unit = {
-    val action = queue.poll()
-    if (action != null) action()
+    var action = queue.poll()
+    while (action != null) {
+      action()

Review comment: I prefer to just catch it and log a warning as the response has been processed.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689838

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment: You are right. How about using ```ConcurrentLinkedQueue``` instead?
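Because the queue is unbounded and consumers only ever poll, the waiting behaviour of LinkedBlockingDeque (`put`/`take`) is never exercised, and that class internally guards every operation with a single lock, whereas ConcurrentLinkedQueue is lock-free. A minimal Java sketch of the suggested swap, assuming actions can be modeled as `Runnable` (the Scala class stores `() => Unit`); `NonBlockingActionQueue` is a hypothetical name, not Kafka code:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical Java analogue of the Scala ActionQueue under review, with
// LinkedBlockingDeque replaced by ConcurrentLinkedQueue.
class NonBlockingActionQueue {
    // Unbounded and lock-free: offer() always succeeds and poll() never waits.
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

    /** Enqueues an action to run later; never blocks the caller. */
    void add(Runnable action) {
        queue.offer(action);
    }

    /** Runs one pending action if there is one; returns false when the queue was empty. */
    boolean tryCompleteAction() {
        Runnable action = queue.poll(); // returns null on an empty queue instead of blocking
        if (action == null) {
            return false;
        }
        action.run();
        return true;
    }
}
```

Callers interact with it exactly as with the blocking version; only the waiting semantics (which were never used) go away.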
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689694

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -37,7 +37,10 @@ class ActionQueue {
    * picks up an action to complete.
    */
   def tryCompleteAction(): Unit = {
-    val action = queue.poll()
-    if (action != null) action()
+    var action = queue.poll()
+    while (action != null) {
+      action()

Review comment: I prefer to just catch it and log a warning as the response has been processed. WDYT?
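Catching and logging per action, as proposed here, keeps one failing action from aborting the rest of the drain loop. A hedged Java sketch of that idea, using `java.util.logging` as a stand-in for Kafka's own logging and catching only `RuntimeException` for simplicity; `SafeDrainingActionQueue` is a hypothetical class, not the code in the PR:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Logger;

// Hypothetical sketch: drain all pending actions, logging (not propagating) failures.
class SafeDrainingActionQueue {
    private static final Logger LOG = Logger.getLogger(SafeDrainingActionQueue.class.getName());
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

    void add(Runnable action) {
        queue.offer(action);
    }

    /**
     * Runs every pending action. A failing action is logged as a warning and skipped,
     * because the response it belongs to has already been processed; the remaining
     * actions still run. Returns the number of actions that failed.
     */
    int tryCompleteActions() {
        int failures = 0;
        Runnable action;
        while ((action = queue.poll()) != null) {
            try {
                action.run();
            } catch (RuntimeException e) {
                failures++;
                LOG.warning("Failed to complete delayed action: " + e);
            }
        }
        return failures;
    }
}
```

Putting the try/catch inside the queue (rather than at each call site) means every caller gets the same guarantee that all pending actions are attempted.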
[GitHub] [kafka] ijuma commented on pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records
ijuma commented on pull request #9220:
URL: https://github.com/apache/kafka/pull/9220#issuecomment-683342367

@chia7712 here's what I had in mind: https://github.com/apache/kafka/pull/9229
What do you think?
[GitHub] [kafka] ijuma commented on a change in pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…
ijuma commented on a change in pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#discussion_r479690701

## File path: clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java ##
@@ -270,7 +270,7 @@ public void testInvalidNumHeadersPartial() throws IOException {
         DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
     }
 
-    @Test(expected = StringIndexOutOfBoundsException.class)
+    @Test(expected = InvalidRecordException.class)

Review comment: Since we parse the header lazily now, what is causing this exception?
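For context on the question above, lazy initialization here means keeping the raw header bytes and decoding them only on first access. The sketch below is a generic illustration of that pattern, not Kafka's `DefaultRecord`; the class name, the simplified wire format (an int count, then an int length plus UTF-8 bytes per key) and the use of `IllegalArgumentException` are all assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of lazy header parsing; NOT Kafka's DefaultRecord.
// Assumed wire format: int key count, then per key an int length and that many UTF-8 bytes.
// Not thread-safe: the cached list is assigned without synchronization.
class LazyHeaderRecord {
    private final ByteBuffer rawHeaders; // undecoded header bytes, kept as a read-only view
    private List<String> headerKeys;     // stays null until headerKeys() is first called

    LazyHeaderRecord(ByteBuffer rawHeaders) {
        // Construction does no parsing, so it cannot fail on malformed headers.
        this.rawHeaders = rawHeaders.asReadOnlyBuffer();
    }

    boolean headersParsed() {
        return headerKeys != null;
    }

    /** Decodes the header keys on first access and caches the result. */
    List<String> headerKeys() {
        if (headerKeys == null) {
            ByteBuffer buf = rawHeaders.duplicate(); // leave the stored view's position untouched
            int count = buf.getInt();
            if (count < 0) {
                throw new IllegalArgumentException("Invalid header count " + count);
            }
            List<String> keys = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                int length = buf.getInt();
                if (length < 0 || length > buf.remaining()) {
                    throw new IllegalArgumentException("Invalid header key length " + length);
                }
                byte[] bytes = new byte[length];
                buf.get(bytes);
                keys.add(new String(bytes, StandardCharsets.UTF_8));
            }
            headerKeys = keys;
        }
        return headerKeys;
    }
}
```

In this sketch, a malformed header surfaces only when `headerKeys()` is first called; constructing the record succeeds.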
[GitHub] [kafka] ijuma commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…
ijuma commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-683342810

Good change. Any thoughts on a test for this?
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479691180

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment: Yes, I think that's better. One thing I was wondering about is whether contention is going to be an issue for this `ActionQueue`. Multiple threads are adding items to it and then trying to consume from it. I haven't thought about all the details, but would a thread local work better? In that case, each thread would add and then drain.
This works fine for the request threads, but I wasn't sure about the other case that @junrao pointed out.
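The thread-local alternative can be sketched in Java as follows (hypothetical names; a plain `ArrayDeque` is enough because each queue is only ever touched by its owning thread, so add and drain never contend). It also makes the limitation concrete: an action enqueued on one thread is never seen by another thread, so any path where a different thread must run the action would need separate handling.

```java
import java.util.ArrayDeque;

// Hypothetical per-thread action queue: each thread adds to and drains only its own deque.
final class ThreadLocalActionQueue {
    private static final ThreadLocal<ArrayDeque<Runnable>> QUEUE =
            ThreadLocal.withInitial(ArrayDeque::new);

    private ThreadLocalActionQueue() {}

    /** Enqueues an action on the calling thread's private queue. */
    static void add(Runnable action) {
        QUEUE.get().addLast(action);
    }

    /** Runs every action the calling thread has enqueued; returns how many ran. */
    static int drain() {
        ArrayDeque<Runnable> queue = QUEUE.get();
        int completed = 0;
        Runnable action;
        while ((action = queue.pollFirst()) != null) {
            action.run();
            completed++;
        }
        return completed;
    }
}
```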
[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683343524

This patch gives each request (handler) thread its own ```BufferSupplier``` to simplify concurrency handling (by contrast, #9220 offers a thread-safe BufferSupplier). This idea looks good to me :)
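The per-thread approach works because a resource owned by exactly one handler thread needs no synchronization at all. A minimal sketch of such a single-owner buffer cache, assuming buffers are always released by the thread that obtained them; the class and its `get`/`release` methods are hypothetical and are not Kafka's `BufferSupplier` API:

```java
import java.nio.ByteBuffer;

// Hypothetical single-owner buffer cache: deliberately NOT thread-safe, because
// it is meant to be confined to one request handler thread.
final class SingleThreadBufferCache {
    private ByteBuffer cached; // at most one idle buffer kept for reuse

    /** Returns a cleared buffer of at least the given capacity, reusing the cached one if it is big enough. */
    ByteBuffer get(int capacity) {
        ByteBuffer buf = cached;
        if (buf != null && buf.capacity() >= capacity) {
            cached = null; // handed out; it comes back through release()
            buf.clear();
            return buf;
        }
        return ByteBuffer.allocate(capacity);
    }

    /** Gives a buffer back so the next get() can reuse it instead of allocating. */
    void release(ByteBuffer buffer) {
        buffer.clear();
        cached = buffer;
    }
}
```

A thread-safe variant would need a lock or an atomic slot around `cached`, which is the extra complexity the per-thread design avoids.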
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479692892

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment: Each handler (thread) can have a local ActionQueue that is passed to each method to collect delayed actions. ```DelayedJoin``` is a special case, since its delayed actions may be accessed by two threads (the timeout thread and the handler thread).
A simple way to handle this with a thread-local queue is for ```DelayedJoin``` to own an ActionQueue: the queue is passed to ```coordinator.onCompleteJoin``` and then consumed by ```DelayedJoin.onExpiration```. Both ```onComplete``` and ```onExpiration``` are thread-safe, so using a thread-local queue is safe.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479692892

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment: Each handler (thread) can have a local ActionQueue that is passed to each method to collect delayed actions. ```DelayedJoin``` is a special case, since its delayed actions may be accessed by two threads (the timeout thread and the handler thread).
A simple way to handle this with a thread-local queue is for ```DelayedJoin``` to own an ActionQueue: the queue is passed to ```coordinator.onCompleteJoin``` and then consumed by ```DelayedJoin.onExpiration```. Both ```onComplete``` and ```onExpiration``` are thread-safe, so using a thread-local queue is safe.
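A minimal Java sketch of the DelayedJoin proposal, with hypothetical stand-ins: `DelayedJoinSketch` plays the role of `DelayedJoin`, and a `Consumer` plays `coordinator.onCompleteJoin`. Following the comment's reasoning, it assumes `onComplete` and `onExpiration` for one operation never run concurrently and that there is a happens-before edge between them, so a plain `ArrayDeque` suffices; if that assumption did not hold, a concurrent queue would be needed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: the delayed operation owns its action queue, hands it to the
// completion callback, and drains it in onExpiration.
class DelayedJoinSketch {
    // Not thread-safe; relies on onComplete/onExpiration never overlapping (see lead-in).
    private final Queue<Runnable> actions = new ArrayDeque<>();
    private final Consumer<Queue<Runnable>> onCompleteJoin; // stand-in for coordinator.onCompleteJoin

    DelayedJoinSketch(Consumer<Queue<Runnable>> onCompleteJoin) {
        this.onCompleteJoin = onCompleteJoin;
    }

    void onComplete() {
        // The callback only collects follow-up work; nothing is run here.
        onCompleteJoin.accept(actions);
    }

    void onExpiration() {
        // Run the collected actions after completion has finished.
        Runnable action;
        while ((action = actions.poll()) != null) {
            action.run();
        }
    }
}
```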
[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683347067

Could we use ThreadLocal to keep those thread resources, like BufferSupplier and ActionQueue, to simplify the method arguments? The cost of ThreadLocal is low, and it is easy to add a new thread-local resource in the future (and we don’t need to change a lot of method arguments).
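A sketch of the ThreadLocal approach proposed here, with a hypothetical `ThreadResources` holder bundling a scratch buffer and an action queue. Any method can call `ThreadResources.current()` instead of receiving a new parameter, which is the convenience being argued for; the cost is that the dependency no longer appears in method signatures.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical per-thread resource holder illustrating the ThreadLocal proposal.
final class ThreadResources {
    // One instance per thread, created lazily on that thread's first call to current().
    private static final ThreadLocal<ThreadResources> CURRENT =
            ThreadLocal.withInitial(ThreadResources::new);

    // Only the owning thread touches these, so non-thread-safe types are fine.
    final ArrayDeque<Runnable> actions = new ArrayDeque<>();
    private ByteBuffer scratchBuffer = ByteBuffer.allocate(0);

    private ThreadResources() {}

    /** Returns the calling thread's resources; no method parameter needed. */
    static ThreadResources current() {
        return CURRENT.get();
    }

    /** Returns a cleared scratch buffer of at least the given capacity, reallocating only when it is too small. */
    ByteBuffer scratch(int capacity) {
        if (scratchBuffer.capacity() < capacity) {
            scratchBuffer = ByteBuffer.allocate(capacity);
        }
        scratchBuffer.clear();
        return scratchBuffer;
    }
}
```

Adding another resource means adding a field here rather than threading a new argument through every call path.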
[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683352094

In my opinion, thread locals are most useful when one doesn't control the code. For cases like this, being explicit makes it easier to reason about and also to test, even if it's a bit more work initially.
[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683352558

@chia7712 One option would be for me to introduce a `RequestContext` case class and add the `BufferSupplier` as one of the fields. It would be easy to extend this class with request bound elements like `ActionQueue`. Thoughts?
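The explicit alternative might look like this in Java, as a hedged analogue of the proposed Scala case class: a hypothetical `RequestContextSketch` carries a buffer supplier (modeled as `IntFunction<ByteBuffer>`, not Kafka's `BufferSupplier`) and an action queue, and the handler receives it as an argument. A test can then inject its own supplier and inspect the queue directly, which is the testability argument for being explicit.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntFunction;

// Hypothetical explicit per-request context; all names are illustrative.
final class RequestContextSketch {
    final IntFunction<ByteBuffer> bufferSupplier; // stand-in for a BufferSupplier
    final Queue<Runnable> actionQueue;             // the kind of field that could be added later

    RequestContextSketch(IntFunction<ByteBuffer> bufferSupplier, Queue<Runnable> actionQueue) {
        this.bufferSupplier = bufferSupplier;
        this.actionQueue = actionQueue;
    }
}

class HandlerSketch {
    /** Hypothetical handler: every resource it needs arrives through the context argument. */
    static int handle(byte[] payload, RequestContextSketch ctx) {
        ByteBuffer buf = ctx.bufferSupplier.apply(payload.length);
        buf.put(payload);
        buf.flip();
        ctx.actionQueue.add(() -> { /* e.g. complete delayed operations after the response */ });
        return buf.remaining();
    }
}
```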
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479706303

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -37,7 +37,10 @@ class ActionQueue {
    * picks up an action to complete.
    */
   def tryCompleteAction(): Unit = {
-    val action = queue.poll()
-    if (action != null) action()
+    var action = queue.poll()
+    while (action != null) {
+      action()

Review comment: Perhaps we could do the try/catch of each action here instead of KafkaApis. This way, we are guaranteed that all pending actions are processed in time.

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * try to complete all delayed actions
+   */
+  def tryCompleteActions(): Unit = {
+    var action = queue.poll()
+    while (action != null) {

Review comment: If we are unlucky, a single thread could be held up in this loop for a long time. Perhaps we could let each thread only complete the number of actions that it sees when entering tryCompleteActions().

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
    * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment: Since we are draining more than 1 item now, this comment is no longer accurate.
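The suggestion to bound the loop can be sketched by taking a snapshot of the queue size on entry and completing at most that many actions. A hypothetical Java sketch; note that `ConcurrentLinkedQueue.size()` traverses the queue and may be inaccurate under concurrent modification, which is acceptable here because the value is only used as an upper bound:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a drain loop bounded by the number of actions seen on entry.
class BoundedDrainActionQueue {
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();

    void add(Runnable action) {
        queue.offer(action);
    }

    /**
     * Completes at most as many actions as were queued when the call started, so a
     * caller cannot be held in the loop by actions that keep arriving while it drains.
     * Returns the number of actions run.
     */
    int tryCompleteActions() {
        // O(n) traversal and only a snapshot under concurrency; used purely as a budget.
        int budget = queue.size();
        int completed = 0;
        while (completed < budget) {
            Runnable action = queue.poll();
            if (action == null) {
                break; // another thread drained the remaining actions first
            }
            action.run();
            completed++;
        }
        return completed;
    }
}
```

Actions added after the snapshot are left for the next caller, so the work is spread across threads instead of piling onto whichever thread happens to be draining.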
[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683369221

> One option would be for me to introduce a RequestContext case class and add the BufferSupplier as one of the fields. It would be easy to extend this class with request bound elements like ActionQueue. Thoughts?

It is great! +1
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479716882

## File path: core/src/main/scala/kafka/server/ActionQueue.scala ##
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment: Maybe we can go with a single `ActionQueue` in this PR. We can submit a separate PR for reducing the contention by having one per thread.
[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305

Hey @jsancio, I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs, similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and extensible interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?

I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. This connectionId includes the ip:port of both the client and the broker. The issue here is that even if registration were tied to just the client's ip:port, without the broker's ip:port, the client uses different ports when talking to different brokers. This leads me to believe that tying supported config registration to the ip:port of a client will not work. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305

Hey @jsancio, I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and extensible interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?

I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work, since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?
[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305

Hey @jsancio, I added some work in progress to this branch, including new APIs for this feature and functionality that uses them. Fitting user and client-id into the `DescribeConfigs` API was awkward, so I thought that the next best step would be to create a specialized set of APIs similar to [KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client). These changes allow for a more expressive and well-defined interface for describing and altering client configs. I'm wondering if I should create a new KIP and branch so that the old implementation can be referenced without digging into commit or page history. Do you have a preference?

I am also working on having the clients register the configs that they support with the brokers. I tried tying the registration to connectionId in the hopes that this would give a unique identifier to each running application. However, this will not work, since the connectionId can change while a client is active. Similarly, tying registration to ip:port will not work because a client talks to different brokers on different ports. Would it be safe to assume that clients with the same ip address are all the same version? Do you have any suggestions for what identifier config registration should be tied to if this assumption cannot be made?