[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore

2020-08-29 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge Esteban Quilcate Otoya updated KAFKA-10434:
-
Labels: needs-kip  (was: kip-required)

> Remove deprecated methods on WindowStore
> 
>
> Key: KAFKA-10434
> URL: https://issues.apache.org/jira/browse/KAFKA-10434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: needs-kip
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and
> [https://github.com/apache/kafka/pull/9138#discussion_r474995606]:
> WindowStore contains ReadOnlyWindowStore methods.
> We could consider:
>  * Moving read methods from WindowStore to ReadOnlyWindowStore, and/or
>  * Removing the long-based methods
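A minimal Scala sketch of what the two options could look like together, using hypothetical trait and class names (this is not the Kafka Streams interface): the read method lives only on the read-only trait and takes `java.time.Instant` bounds, the writable store inherits it instead of redeclaring it, and no long-based overload is declared at all.

```scala
import java.time.Instant
import scala.collection.immutable.TreeMap

// Hypothetical read-only view: range reads take Instant bounds only (no long-based overloads).
trait ReadOnlyWindowStoreSketch[K, V] {
  def fetch(key: K, timeFrom: Instant, timeTo: Instant): Seq[(Instant, V)]
}

// Hypothetical writable store: inherits the read method and adds only the write path.
trait WindowStoreSketch[K, V] extends ReadOnlyWindowStoreSketch[K, V] {
  def put(key: K, value: V, windowStart: Instant): Unit
}

// In-memory implementation; window start times are kept as epoch millis, sorted per key.
final class InMemoryWindowStoreSketch[K, V] extends WindowStoreSketch[K, V] {
  private var windowsByKey = Map.empty[K, TreeMap[Long, V]]

  override def put(key: K, value: V, windowStart: Instant): Unit = {
    val windows = windowsByKey.getOrElse(key, TreeMap.empty[Long, V])
    windowsByKey = windowsByKey.updated(key, windows.updated(windowStart.toEpochMilli, value))
  }

  // Returns windows whose start lies in [timeFrom, timeTo], in ascending time order.
  override def fetch(key: K, timeFrom: Instant, timeTo: Instant): Seq[(Instant, V)] =
    windowsByKey.get(key) match {
      case Some(windows) =>
        windows
          .range(timeFrom.toEpochMilli, timeTo.toEpochMilli + 1) // upper bound of range() is exclusive
          .toSeq
          .map { case (startMs, v) => (Instant.ofEpochMilli(startMs), v) }
      case None => Seq.empty
    }
}
```

Code that only queries the store can then be typed against the read-only trait, so it cannot reach the write path.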



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683284825


   > If it's meant to be called by every request, then maybe we should have the delayed actions in a separate class instead of ReplicaManager. Other classes could, in theory, add their own delayed actions to this queue too.
   
   I prefer this idea, as it prevents us from missing any action.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo opened a new pull request #9228: KAFKA-10445: Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore

2020-08-29 Thread GitBox


jeqo opened a new pull request #9228:
URL: https://github.com/apache/kafka/pull/9228


   
   [KIP-666](https://cwiki.apache.org/confluence/display/KAFKA/KIP-666%3A+Add+Instant-based+methods+to+ReadOnlySessionStore) proposal
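A minimal sketch of one way to add Instant-based methods alongside existing long-based ones, assuming hypothetical names (`ReadOnlySessionStoreSketch`, `findSessions`, `SessionSketch`); it is not the interface defined by KIP-666, whose page lists the actual methods. The Instant overload has a default implementation that converts to epoch milliseconds and delegates, so an implementation only has to provide the long-based variant.

```scala
import java.time.Instant

// Hypothetical session record: a session for `key` spanning [startMs, endMs].
final case class SessionSketch[K, V](key: K, startMs: Long, endMs: Long, value: V)

// Hypothetical read-only session store with a long-based primitive and an Instant-based overload.
trait ReadOnlySessionStoreSketch[K, V] {
  // Sessions for `key` that end at or after earliestSessionEndMs and start at or before latestSessionStartMs.
  def findSessions(key: K, earliestSessionEndMs: Long, latestSessionStartMs: Long): Seq[V]

  // Instant-based overload; toEpochMilli throws ArithmeticException if the instant does not fit in a Long.
  def findSessions(key: K, earliestSessionEnd: Instant, latestSessionStart: Instant): Seq[V] =
    findSessions(key, earliestSessionEnd.toEpochMilli, latestSessionStart.toEpochMilli)
}

// Simple list-backed implementation that provides only the long-based variant.
final class ListSessionStoreSketch[K, V](sessions: Seq[SessionSketch[K, V]])
    extends ReadOnlySessionStoreSketch[K, V] {
  override def findSessions(key: K, earliestSessionEndMs: Long, latestSessionStartMs: Long): Seq[V] =
    sessions.collect {
      case s if s.key == key && s.endMs >= earliestSessionEndMs && s.startMs <= latestSessionStartMs =>
        s.value
    }
}
```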
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479661386



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,48 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up a action to complete.

Review comment:
   a action => an action

##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
   Perhaps we could add a note at the top of DelayedOperation so that people are aware of the need to complete actions for new DelayedOperations in the future.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -585,6 +591,23 @@ class ReplicaManager(val config: KafkaConfig,
 result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
   }
 
+  actionQueue.add {
+() =>
+  localProduceResults.foreach {
+case (topicPartition, result) =>
+  val requestKey = TopicPartitionOperationKey(topicPartition)
+  if (result.info.leaderHWIncremented) {

Review comment:
   It seems that we have to distinguish 3 states here: (1) records not appended due to an error; (2) records appended successfully and HWM advanced; (3) records appended successfully and HWM not advanced. In case (1), no purgatory needs to be checked.

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 c

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479677889



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
 case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
   }
+
+  // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+  // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+  // expiration thread for certain delayed operations (e.g. DelayedJoin)
+  actionQueue.tryCompleteAction()

Review comment:
   Completing delayed actions may throw an exception. Should the exception be swallowed and logged if we move the completion to the finally block?

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678653



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -585,6 +591,23 @@ class ReplicaManager(val config: KafkaConfig,
 result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
   }
 
+  actionQueue.add {
+() =>
+  localProduceResults.foreach {
+case (topicPartition, result) =>
+  val requestKey = TopicPartitionOperationKey(topicPartition)
+  if (result.info.leaderHWIncremented) {

Review comment:
   You are right. I missed the exception in ```appendToLocalLog```. Let me revert this change.

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678761



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
   What's the reasoning for taking just 1 item @junrao? Could this cause the queue to grow over time?

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479678812



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
   Another approach would be for this queue to be per request thread instead of per server. That would simplify concurrency handling.

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479679930



##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
   It is dangerous to complete delayed actions in ```DelayedOperation```, as most methods of ```DelayedOperation``` are executed while holding a lock. We now depend on ```KafkaApis.handle``` to complete delayed actions produced by code that can't complete them itself because of locking. For example, ```DelayedJoin.onComplete``` can produce a new action, and that action can't be completed by ```DelayedJoin``` itself because of the group lock.

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479680637



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
   If the queue is kept per thread, the thread has to pass the queue to the methods of ReplicaManager/GroupCoordinator. That touches a lot of methods and makes for a big patch. I prefer to keep the patch small, though it has grown bigger now :(

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479683036



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
   It should be fine to let the handler complete as many actions as possible, since the response is created before the delayed actions are handled.
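A toy simulation of the trade-off discussed above, under an assumed workload in which each handled request enqueues more actions than it drains (for example, when some path adds actions without ever polling). With a poll-one policy the backlog grows by the difference on every request; draining until the queue is empty leaves nothing behind. The object, method, and workload are all hypothetical.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical simulation: each request enqueues `actionsPerRequest` no-op actions, then drains
// either a single action (pollOne = true) or every pending action (pollOne = false).
object BacklogSimulation {
  // Returns the number of actions still queued after all requests are handled.
  def backlogAfter(requests: Int, actionsPerRequest: Int, pollOne: Boolean): Int = {
    val queue = new ConcurrentLinkedQueue[() => Unit]()
    for (_ <- 1 to requests) {
      for (_ <- 1 to actionsPerRequest) queue.add(() => ())
      if (pollOne) {
        val action = queue.poll()
        if (action != null) action()
      } else {
        var action = queue.poll()
        while (action != null) {
          action()
          action = queue.poll()
        }
      }
    }
    queue.size()
  }
}
```

With one action per request both policies stay at zero; with two, only the drain-all policy keeps the queue empty.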

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479684163



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
   Perhaps we can make this a bit clearer. Something like the following.
   
   leaderHWIncremented has 3 possible values: (1) if records are not appended due to an error, the value will be None; (2) if records are appended successfully and HWM is advanced, the value is Some(true); (3) if records are appended successfully and HWM is not advanced, the value is Some(false).

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -180,6 +181,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
 case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
   }
+
+  // try to complete delayed action. In order to avoid conflicting locking, the actions to complete delayed requests
+  // are kept in a queue. We add the logic to check the ReplicaManager queue at the end of KafkaApis.handle() and the
+  // expiration thread for certain delayed operations (e.g. DelayedJoin)
+  actionQueue.tryCompleteAction()

Review comment:
   Yes, if actionQueue.tryCompleteAction() throws an exception, we can just catch it and log a warning in finally since the response has been sent by then.
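A minimal sketch of the suggested structure, assuming a hypothetical `HandlerSketch` with a `warn` callback standing in for Kafka's logger and a `completeDelayedActions` function standing in for `actionQueue.tryCompleteAction()`. The request is dispatched in the `try`; completing delayed actions runs in the `finally`, and a non-fatal exception from it is logged rather than propagated.

```scala
import scala.util.control.NonFatal

// Hypothetical request handler: `dispatch` processes the request and sends its response.
final class HandlerSketch(completeDelayedActions: () => Unit, warn: String => Unit) {
  def handle(dispatch: () => Unit): Unit = {
    try {
      dispatch()
    } finally {
      // Runs whether or not dispatch threw; failures here are logged, not rethrown.
      try completeDelayedActions()
      catch {
        case NonFatal(e) => warn(s"Failed to complete delayed actions: ${e.getMessage}")
      }
    }
  }
}
```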

##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
   Note that the action queue is called not only by request threads, but also by the expiration thread for certain delayed operations.

##
File path: core/src/main/scala/kafka/server/DelayedOperation.scala
##
@@ -100,41 +99,22 @@ abstract class DelayedOperation(override val delayMs: Long,
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
-   * without blocking.
    *
-   * If threadA acquires the lock and performs the check for completion before completion criteria is met
-   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
-   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
-   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
-   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
-   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
-   * the operation is actually completed.
+   * There is a long story about using "lock" or "tryLock".
+   *
+   * 1) using lock - There was a lot of cases that a thread holds a group lock and then it tries to hold more group
+   * locks to complete delayed requests. Unfortunately, the scenario causes deadlock and so we had introduced the
+   * "tryLock" to avoid deadlock.
+   *
+   * 2) using tryLock -  However, the "tryLock" causes another issue that the delayed requests may be into
+   * oblivion if the thread, which should complete the delayed requests, fails to get the lock.
+   *
+   * Now, we go back to use "lock" and make sure the thread which tries to complete delayed requests does NOT hold lock.
+   * The approach is that ReplicaManager collects all actions, which are used to complete delayed requests, in a queue.
+   * KafkaApis.handle() and the expiration thread for certain delayed operations (e.g. DelayedJoin) pick up and then
+   * execute an action when no lock is held.

Review comment:
   I was thinking of adding a comment so that if someone adds a future delayed operation that, like DelayedJoin, calls ReplicaManager.appendRecords() in onComplete(), they are aware that this operation's onExpiration() needs to call actionQueue.tryCompleteAction().
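A simplified sketch of the rule described above, using hypothetical classes rather than Kafka's `DelayedOperation` and `ActionQueue`: `onComplete` runs while the group lock is held, so it only enqueues follow-up work (standing in for the actions produced by `ReplicaManager.appendRecords()`), and `onExpiration` drains the queue only after the lock has been released.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.locks.ReentrantLock

// Hypothetical shared queue of deferred actions.
final class ActionQueueSketch {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  // Runs pending actions until the queue is empty.
  def tryCompleteActions(): Unit = {
    var action = queue.poll()
    while (action != null) {
      action()
      action = queue.poll()
    }
  }
}

// Hypothetical delayed operation guarded by a group lock.
final class DelayedJoinSketch(groupLock: ReentrantLock, actionQueue: ActionQueueSketch, log: String => Unit) {
  // Called with groupLock held: it must not complete other delayed requests directly, so it defers.
  private def onComplete(): Unit =
    actionQueue.add(() => log(s"deferred action ran, lock held = ${groupLock.isHeldByCurrentThread}"))

  // Called by the expiration thread: complete under the lock, then drain once the lock is released.
  def onExpiration(): Unit = {
    groupLock.lock()
    try onComplete()
    finally groupLock.unlock()
    actionQueue.tryCompleteActions()
  }
}
```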

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
* Append messages to leader replicas of the partition, and wait for them to be replicated to o

[GitHub] [kafka] ijuma opened a new pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


ijuma opened a new pull request #9229:
URL: https://github.com/apache/kafka/pull/9229


   Use a caching BufferSupplier per request handler thread so that
   decompression buffers are cached if supported by the underlying
   CompressionType. This reduces allocations significantly for LZ4 when the
   number of partitions is high. The decompression buffer size is typically
   64 KB, so a produce request with 1000 partitions results in 64 MB of
   allocations even if each produce batch is small (likely, when there are
   so many partitions).
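A minimal sketch of per-thread buffer caching, assuming a hypothetical `CachingBufferSupplierSketch` rather than the `BufferSupplier` the PR refers to. A buffer released back to the supplier is reused for the next request of the same capacity, so decompressing 1000 batches one after another on one thread allocates a single 64 KB buffer instead of the roughly 64 MB described above. The class is not thread-safe, which is only acceptable because each request handler thread would own its own instance.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical caching supplier, meant to be owned by a single request handler thread (not thread-safe).
final class CachingBufferSupplierSketch {
  // Released buffers, grouped by capacity.
  private val cache = mutable.Map.empty[Int, java.util.ArrayDeque[ByteBuffer]]
  private var allocationCount = 0

  // Number of buffers actually allocated so far.
  def allocations: Int = allocationCount

  // Returns a cached buffer of this capacity if one is available; otherwise allocates a new one.
  def get(capacity: Int): ByteBuffer = {
    val cached = cache.get(capacity).map(_.pollFirst()).orNull
    if (cached != null) cached
    else {
      allocationCount += 1
      ByteBuffer.allocate(capacity)
    }
  }

  // Resets the buffer's position and limit and makes it available for reuse.
  def release(buffer: ByteBuffer): Unit = {
    buffer.clear()
    cache.getOrElseUpdate(buffer.capacity(), new java.util.ArrayDeque[ByteBuffer]()).push(buffer)
  }
}
```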
   
   I did a quick producer perf local test with 5000 partitions, 1 KB record size, 1 broker, lz4 and ~0.5 for the producer compression rate metric:
   
   Before this change:
   > 2000 records sent, 346314.349535 records/sec (330.27 MB/sec),
   148.33 ms avg latency, 2267.00 ms max latency, 115 ms 50th, 383 ms 95th,
   777 ms 99th, 1514 ms 99.9th.
   
   After this change:
   > 2000 records sent, 431956.113259 records/sec (411.95 MB/sec),
   117.79 ms avg latency, 1219.00 ms max latency, 99 ms 50th, 295 ms 95th,
   440 ms 99th, 662 ms 99.9th.
   
   That's a 25% throughput improvement and p999 latency was reduced to
   under half (in this test).
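The percentages quoted above follow from the reported numbers: throughput rises by about 25% in both records/sec and MB/sec, and the 99.9th-percentile latency ends up at roughly 44% of its previous value.

```latex
\frac{431956.11\ \text{records/s}}{346314.35\ \text{records/s}} \approx 1.247,
\qquad
\frac{411.95\ \text{MB/s}}{330.27\ \text{MB/s}} \approx 1.247,
\qquad
\frac{662\ \text{ms}}{1514\ \text{ms}} \approx 0.437
```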
   
   TODO: Remove default arguments
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686292



##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -134,6 +134,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private val KAFKA_CLUSTER_ID: String = "kafka.cluster.id"
   private val KAFKA_BROKER_ID: String = "kafka.broker.id"
 
+  private val actionQueue = new ActionQueue

Review comment:
   Good point @junrao.

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686398



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
   @chia7712 I suggest using a sealed trait with 3 case objects to make this clearer. Using `Option[Boolean]` as a tristate value is generally best avoided.
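A minimal sketch of that suggestion, with hypothetical names (`HighWatermarkChange`, `NotAppended`, `Increased`, `Unchanged`) and purgatories represented as plain strings. The three cases junrao lists become explicit objects, and a pattern match decides which purgatories to check. The mapping for the not-advanced case (checking only the fetch purgatory, on the assumption that a moved log end offset may unblock follower fetches) is an illustrative assumption, not something stated in this thread.

```scala
// Hypothetical tri-state result of an append, replacing Option[Boolean].
sealed trait HighWatermarkChange
object HighWatermarkChange {
  case object NotAppended extends HighWatermarkChange // records not appended due to an error
  case object Increased extends HighWatermarkChange   // appended and the high watermark advanced
  case object Unchanged extends HighWatermarkChange   // appended but the high watermark did not move
}

object PurgatoryCheckSketch {
  import HighWatermarkChange._

  // Assumed mapping from append outcome to the purgatories worth checking.
  // The trait is sealed, so the compiler warns if a new case is added and not handled here.
  def purgatoriesToCheck(change: HighWatermarkChange): Seq[String] = change match {
    case NotAppended => Seq.empty
    case Increased   => Seq("produce", "fetch", "deleteRecords")
    case Unchanged   => Seq("fetch")
  }
}
```

Compared with `None`, `Some(true)` and `Some(false)`, whose meaning exists only by convention, each case is named where it is produced and where it is matched.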

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479686517



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +85,8 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWIncremented true if the high watermark is increased when appending record. Otherwise, false.
+ *this field is updated after appending record so the default value is None.

Review comment:
   Will copy that

[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689381



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up an action to complete.
+   */
+  def tryCompleteAction(): Unit = {

Review comment:
   @junrao @ijuma please take a look at this method

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689457



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -37,7 +37,10 @@ class ActionQueue {
* picks up an action to complete.
*/
   def tryCompleteAction(): Unit = {
-val action = queue.poll()
-if (action != null) action()
+var action = queue.poll()
+while (action != null) {
+  action()

Review comment:
   The main thing to decide is what to do in case of an exception: do we stop processing or do we continue?
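One possible answer, sketched under assumptions: keep draining after a failure and report each failure through a hypothetical `onError` callback, so one bad action cannot strand the actions queued behind it. The sketch uses a non-blocking `ConcurrentLinkedQueue`, since nothing in it waits on the queue. The alternative policy, letting the first exception propagate, would leave the remaining actions queued for the next caller.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.util.control.NonFatal

// Hypothetical action queue that drains every pending action and keeps going after failures.
final class DrainingActionQueueSketch(onError: Throwable => Unit) {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  // Runs actions until the queue is empty; a failing action is reported and skipped.
  def tryCompleteActions(): Unit = {
    var action = queue.poll()
    while (action != null) {
      try action()
      catch {
        case NonFatal(e) => onError(e)
      }
      action = queue.poll()
    }
  }

  def size: Int = queue.size()
}
```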

[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689653



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Why are we using a blocking queue? It doesn't seem like we ever need the 
blocking functionality. Am I missing something?









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689664



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * picks up an action to complete.
+   */
+  def tryCompleteAction(): Unit = {

Review comment:
   I prefer to just catch it and log a warning, as the response has already been 
processed.









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689694



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -37,7 +37,10 @@ class ActionQueue {
* picks up an action to complete.
*/
   def tryCompleteAction(): Unit = {
-val action = queue.poll()
-if (action != null) action()
+var action = queue.poll()
+while (action != null) {
+  action()

Review comment:
   I prefer to just catch it and log a warning, as the response has already been 
processed.









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689838



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   You are right. How about using ```ConcurrentLinkedQueue``` instead?









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479689694



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -37,7 +37,10 @@ class ActionQueue {
* picks up an action to complete.
*/
   def tryCompleteAction(): Unit = {
-val action = queue.poll()
-if (action != null) action()
+var action = queue.poll()
+while (action != null) {
+  action()

Review comment:
   I prefer to just catch it and log a warning, as the response has already been 
processed. WDYT?









[GitHub] [kafka] ijuma commented on pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-29 Thread GitBox


ijuma commented on pull request #9220:
URL: https://github.com/apache/kafka/pull/9220#issuecomment-683342367


   @chia7712 here's what I had in mind:
   https://github.com/apache/kafka/pull/9229
   
   What do you think?







[GitHub] [kafka] ijuma commented on a change in pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#discussion_r479690701



##
File path: clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
##
@@ -270,7 +270,7 @@ public void testInvalidNumHeadersPartial() throws IOException {
 DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, RecordBatch.NO_SEQUENCE, null);
 }
 
-@Test(expected = StringIndexOutOfBoundsException.class)
+@Test(expected = InvalidRecordException.class)

Review comment:
   Since we parse the header lazily now, what is causing this exception?









[GitHub] [kafka] ijuma commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-08-29 Thread GitBox


ijuma commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-683342810


   Good change. Any thoughts on a test for this?







[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479691180



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Yes, I think that's better. One thing I was wondering about is whether 
contention is going to be an issue for this `ActionQueue`. Multiple threads are 
adding items to it and then trying to consume from it. I haven't thought about 
all the details, but would a thread local work better? In that case, each 
thread would add and then drain. This works fine for the request threads, but I 
wasn't sure about the other case that @junrao pointed out.









[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683343524


   This patch makes each request (handler) thread have a ```BufferSupplier``` 
to simplify concurrency handling (by contrast, #9220 offers a thread-safe 
BufferSupplier). 
   
   This idea looks good to me :)







[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479692892



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Each handler (thread) can have a local ActionQueue that is passed to each 
method to collect delayed actions. ```DelayedJoin``` is a special case, since its 
delayed actions may be accessed by two threads (the timeout thread and the 
handler thread). A simple way to keep the queue thread-local is for ```DelayedJoin``` 
to own an ActionQueue, pass it to ```coordinator.onCompleteJoin```, 
and then consume it in ```DelayedJoin.onExpiration```. Both 
```onComplete``` and ```onExpiration``` are thread-safe, so using a thread-local 
queue is safe.
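A minimal Scala sketch of the ownership scheme described above. It assumes, as the comment states, that `onComplete` and `onExpiration` never run concurrently for the same instance. `LocalActionQueue`, `CoordinatorSketch` and `DelayedJoinSketch` are hypothetical stand-ins and not Kafka's actual classes. Because only one owner touches the queue at a time, it can be a plain unsynchronized `mutable.Queue`.

```scala
import scala.collection.mutable

// Hypothetical single-owner queue. No locking: it assumes only one thread,
// or one already-serialized callback, touches it at a time.
final class LocalActionQueue {
  private val actions = mutable.Queue.empty[() => Unit]

  def add(action: () => Unit): Unit = actions.enqueue(action)

  /** Runs and removes every queued action, in FIFO order. */
  def drain(): Unit =
    while (actions.nonEmpty) {
      val action = actions.dequeue()
      action()
    }
}

// Hypothetical stand-in for the group coordinator: completing a join
// records an event and defers follow-up work to the supplied queue.
final class CoordinatorSketch {
  val events = mutable.ArrayBuffer.empty[String]

  def onCompleteJoin(actions: LocalActionQueue): Unit = {
    events += "join completed"
    actions.add(() => events += "delayed action ran")
  }
}

// Hypothetical stand-in for DelayedJoin: it owns its queue, fills it
// in onComplete and drains it in onExpiration.
final class DelayedJoinSketch(coordinator: CoordinatorSketch) {
  private val actions = new LocalActionQueue

  def onComplete(): Unit = coordinator.onCompleteJoin(actions)
  def onExpiration(): Unit = actions.drain()
}
```

The deferred action runs only when `onExpiration` drains the queue, so nothing queued during the join completion executes while the coordinator is still inside `onCompleteJoin`.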
   
   









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479692892



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Each handler (thread) can have a local ActionQueue that is passed to each 
method to collect delayed actions. ```DelayedJoin``` is a special case, since its 
delayed actions may be accessed by two threads (the timeout thread and the 
handler thread). A simple way to keep the queue thread-local is for ```DelayedJoin``` 
to own an ActionQueue, pass it to ```coordinator.onCompleteJoin```, 
and then consume it in ```DelayedJoin.onExpiration```. Both 
```onComplete``` and ```onExpiration``` are thread-safe, so using a thread-local 
queue is safe.
   
   









[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683347067


   Could we use ThreadLocal to keep those thread resources, like BufferSupplier 
and ActionQueue, to simplify the method arguments? The cost of ThreadLocal is 
low, and it makes it easy to add new thread-local resources in the future (we wouldn't 
need to change a lot of method arguments).
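For comparison, here is a sketch of the ThreadLocal variant. The names `ThreadResources`, `addAction` and `drainActions` are hypothetical. Each thread lazily gets its own queue through `ThreadLocal.initialValue`, so nothing has to be threaded through method signatures. The flip side is that the queue becomes implicit state, tied to whichever thread happens to run the code.

```scala
import scala.collection.mutable

// Hypothetical holder for per-thread resources. Each thread that touches
// `actionQueue` gets its own lazily created queue, so no locking is needed
// and callers do not pass the queue around as an argument.
object ThreadResources {
  private val actionQueue = new ThreadLocal[mutable.Queue[() => Unit]] {
    override def initialValue(): mutable.Queue[() => Unit] =
      mutable.Queue.empty[() => Unit]
  }

  def addAction(action: () => Unit): Unit = actionQueue.get().enqueue(action)

  /** Runs and removes the current thread's queued actions; returns how many ran. */
  def drainActions(): Int = {
    val queue = actionQueue.get()
    var ran = 0
    while (queue.nonEmpty) {
      val action = queue.dequeue()
      action()
      ran += 1
    }
    ran
  }
}
```

Actions queued on one thread are invisible to every other thread. That isolation removes contention, but it also means a drain that runs on the wrong thread silently does nothing.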







[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683352094


   In my opinion, thread locals are most useful when one doesn't control the 
code. For cases like this, being explicit makes it easier to reason about and 
also test. Even if it's a bit more work initially.







[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683352558


   @chia7712 One option would be for me to introduce a `RequestContext` case 
class and add the `BufferSupplier` as one of the fields. It would be easy to 
extend this class with request bound elements like `ActionQueue`. Thoughts?
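Here is a rough sketch of the explicit alternative. It uses a hypothetical `RequestContextSketch` case class, deliberately not called `RequestContext` because that name already exists in Kafka, which bundles a buffer cache and an action queue. `SimpleBufferSupplier` is a toy single-slot cache standing in for Kafka's `BufferSupplier`; it is not that class's implementation.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical single-slot buffer cache standing in for a BufferSupplier:
// a released buffer is handed out again if it is large enough.
final class SimpleBufferSupplier {
  private var cached: Option[ByteBuffer] = None

  def get(capacity: Int): ByteBuffer = cached match {
    case Some(buffer) if buffer.capacity >= capacity =>
      cached = None
      buffer.clear()
      buffer
    case _ =>
      ByteBuffer.allocate(capacity)
  }

  def release(buffer: ByteBuffer): Unit = cached = Some(buffer)
}

// Hypothetical request-bound context: a new per-thread resource becomes a
// new field here rather than a new parameter on every method.
final case class RequestContextSketch(
    bufferSupplier: SimpleBufferSupplier,
    actions: mutable.Queue[() => Unit])

// Hypothetical handler thread: builds its context once, passes it explicitly
// to the request logic, then completes any actions that logic queued.
final class HandlerSketch {
  private val context =
    RequestContextSketch(new SimpleBufferSupplier, mutable.Queue.empty[() => Unit])

  def handle(process: RequestContextSketch => Unit): Unit = {
    process(context)
    while (context.actions.nonEmpty) {
      val action = context.actions.dequeue()
      action()
    }
  }
}
```

Because the context is an ordinary value, a test can build one directly and inspect its queue after the call. This is the explicitness argued for earlier in the thread.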







[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479706303



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -37,7 +37,10 @@ class ActionQueue {
* picks up an action to complete.
*/
   def tryCompleteAction(): Unit = {
-val action = queue.poll()
-if (action != null) action()
+var action = queue.poll()
+while (action != null) {
+  action()

Review comment:
   Perhaps we could do the try/catch of each action here instead of 
KafkaApis. This way, we are guaranteed that all pending actions are processed 
in time.
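A sketch of that suggestion follows. `ActionQueueSketch` and its `onError` callback are hypothetical names. `onError` stands in for whatever logging the real code would use, such as the warning proposed earlier in the thread. The sketch also uses `ConcurrentLinkedQueue`, as discussed above, since nothing here needs blocking.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.util.control.NonFatal

// Hypothetical sketch: the queue catches failures per action, so one
// failing action cannot prevent the remaining actions from running.
final class ActionQueueSketch(onError: Throwable => Unit) {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  /** Drains the queue, reporting (not rethrowing) non-fatal failures. */
  def tryCompleteActions(): Unit = {
    var action = queue.poll()
    while (action != null) {
      try action()
      catch { case NonFatal(e) => onError(e) } // report and keep going
      action = queue.poll()
    }
  }
}
```

Matching on `NonFatal` lets truly fatal errors (for example `OutOfMemoryError`) still propagate, while ordinary exceptions are reported and the loop continues.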

##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()
+
+  /**
+   * add action to this queue.
+   * @param action action
+   */
+  def add(action: () => Unit): Unit = queue.put(action)
+
+  /**
+   * try to complete all delayed actions
+   */
+  def tryCompleteActions(): Unit = {
+var action = queue.poll()
+while (action != null) {

Review comment:
   If we are unlucky, a single thread could be held up in this loop for a 
long time. Perhaps we could let each thread only complete the number of actions 
that it sees when entering tryCompleteActions().
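One way to implement that bound is sketched below with a hypothetical `BoundedActionQueue`. It snapshots the queue size on entry and completes at most that many actions, so anything enqueued in the meantime (including by the actions themselves) is left for the next caller. `ConcurrentLinkedQueue.size()` traverses the queue and may be inaccurate under concurrent updates, which is acceptable for a soft bound like this. Per-action exception handling is omitted for brevity.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch: a caller completes at most the number of actions
// it saw on entry, so a single thread cannot be held in the loop indefinitely.
final class BoundedActionQueue {
  private val queue = new ConcurrentLinkedQueue[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  /** Returns how many actions this call completed. */
  def tryCompleteActions(): Int = {
    val limit = queue.size() // O(n), weakly consistent snapshot
    var completed = 0
    var action = if (limit > 0) queue.poll() else null
    while (action != null) {
      action()
      completed += 1
      action = if (completed < limit) queue.poll() else null
    }
    completed
  }
}
```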

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -562,6 +564,10 @@ class ReplicaManager(val config: KafkaConfig,
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+   * are expected to take up to 1 item from that queue and check the completeness for all affected partitions, without

Review comment:
   Since we are draining more than 1 item now, this comment is no longer 
accurate.









[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-29 Thread GitBox


chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683369221


   > One option would be for me to introduce a RequestContext case class and 
add the BufferSupplier as one of the fields. It would be easy to extend this 
class with request bound elements like ActionQueue. Thoughts?
   
   It is great! +1







[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-29 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479716882



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Maybe we can go with a single `ActionQueue` in this PR. We can submit a 
separate PR for reducing the contention by having one per thread.









[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-29 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature and functionality using them. Fitting user and client-id into the 
`DescribeConfigs` API was awkward, so I thought that the next best step would be 
to create a specialized set of APIs, similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and extensible interface for 
describing and altering client configs. I'm wondering if I should create a new 
KIP and branch so that the old implementation can be referenced without digging 
into commit or page history. Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
This connectionId includes ip:port of the client as well as the broker. The 
issue here is that even if registration were tied to just ip:port of a client 
without the ip:port of the broker, the client uses different ports when talking 
to different brokers. This leads me to believe that tying supported config 
registration to the ip:port of a client will not work. Would it be safe to 
assume that clients with the same ip address are all the same version? Do you 
have any suggestions for what identifier config registration should be tied to 
if this assumption cannot be made?







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-29 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature and functionality using them. Fitting user and client-id into the 
`DescribeConfigs` API was awkward, so I thought that the next best step would 
be to create a specialized set of APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and extensible interface for 
describing and altering client configs. I'm wondering if I should create a new 
KIP and branch so that the old implementation can be referenced without digging 
into commit or page history. Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since it can change while a client is active. 
Similarly, tying registration to ip:port will not work because a client talks 
to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?







[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-29 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature and functionality using them. Fitting user and client-id into the 
`DescribeConfigs` API was awkward, so I thought that the next best step would 
be to create a specialized set of APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and well-defined interface for 
describing and altering client configs. I'm wondering if I should create a new 
KIP and branch so that the old implementation can be referenced without digging 
into commit or page history. Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since it can change while a client is active. 
Similarly, tying registration to ip:port will not work because a client talks 
to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?


