[jira] [Created] (KAFKA-14424) Cancellation of an ongoing replica reassignment should have sanity checks

2022-11-29 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14424:
--

 Summary: Cancellation of an ongoing replica reassignment should 
have sanity checks
 Key: KAFKA-14424
 URL: https://issues.apache.org/jira/browse/KAFKA-14424
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


When reassigning replicas, Kafka runs a sanity check to ensure all of the 
target replicas are alive before allowing the reassignment request to proceed.
However, for an AlterPartitionReassignments request that cancels an ongoing 
reassignment, there is no such check.
As a result, if the original replicas are offline, the cancellation may leave partitions 
without any leader. We have observed this problem in our clusters.

 

There should be a similar sanity check to ensure the cancellation would also land 
the partitions in valid states, e.g. by ensuring all of the original replicas 
are alive.
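
Until such a server-side check exists, a client can approximate it before issuing the 
cancellation. Below is a minimal sketch (not part of Kafka; the flow is illustrative) 
using the Java Admin client: it treats the original replicas as the current replica set 
minus the replicas being added, and only cancels a reassignment if all of those original 
replicas are registered as live brokers.

{code:java}
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public class SafeCancelReassignment {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<Integer> liveBrokers = admin.describeCluster().nodes().get().stream()
                    .map(Node::id).collect(Collectors.toSet());

            Map<TopicPartition, PartitionReassignment> ongoing =
                    admin.listPartitionReassignments().reassignments().get();

            Map<TopicPartition, Optional<NewPartitionReassignment>> cancellations = new HashMap<>();
            ongoing.forEach((tp, reassignment) -> {
                // Original replicas = current replica set minus the replicas being added.
                List<Integer> originalReplicas = new ArrayList<>(reassignment.replicas());
                originalReplicas.removeAll(reassignment.addingReplicas());
                if (liveBrokers.containsAll(originalReplicas)) {
                    cancellations.put(tp, Optional.empty()); // an empty value means "cancel"
                } else {
                    System.out.println("Skipping cancellation for " + tp
                            + ": some original replicas are offline: " + originalReplicas);
                }
            });

            if (!cancellations.isEmpty()) {
                admin.alterPartitionReassignments(cancellations).all().get();
            }
        }
    }
}
{code}

Such a client-side check is inherently racy (a broker can fail between the check and the 
cancellation), which is why a server-side sanity check is still the better place for it.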





[jira] [Created] (KAFKA-14381) Support listing all partitions being reassigned in a cluster

2022-11-10 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14381:
--

 Summary: Support listing all partitions being reassigned in a 
cluster
 Key: KAFKA-14381
 URL: https://issues.apache.org/jira/browse/KAFKA-14381
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


The current implementation of kafka-topics.sh doesn't support listing all of 
the partitions that are being reassigned within a cluster.

Showing such information can be very useful during troubleshooting.
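
In the meantime, the information is available through the Java Admin client. A minimal 
sketch (assuming a cluster reachable at localhost:9092) that prints every partition 
currently being reassigned:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ListOngoingReassignments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // With no arguments, listPartitionReassignments() returns every
            // partition that is currently being reassigned in the cluster.
            Map<TopicPartition, PartitionReassignment> ongoing =
                    admin.listPartitionReassignments().reassignments().get();
            ongoing.forEach((tp, r) -> System.out.printf(
                    "%s replicas=%s adding=%s removing=%s%n",
                    tp, r.replicas(), r.addingReplicas(), r.removingReplicas()));
        }
    }
}
{code}

Having the same view directly in kafka-topics.sh would avoid the need for a custom tool 
during troubleshooting.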





[jira] [Created] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread

2022-09-09 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14213:
--

 Summary: Reduce lock contention between 
control-plane-kafka-request-handler and kafka-log-cleaner-thread
 Key: KAFKA-14213
 URL: https://issues.apache.org/jira/browse/KAFKA-14213
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


We found that the StopReplica request's local processing time may be quite long 
due to the reasons explained below. The impact of a time-consuming StopReplica 
request is that all subsequent requests from the controller are blocked, 
causing slow convergence on the metadata plane.

 

The long local processing time is caused by the 
control-plane-kafka-request-handler thread being blocked on a lock while trying to 
abort log cleaning, with the following stack trace:

 
{code:java}
"control-plane-kafka-request-handler-0"
java.lang.Thread.State: WAITING at 
java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at 
java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
 at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
 at 
java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
 at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at 
kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at 
kafka.log.LogManager.asyncDelete(LogManager.scala:1039)
{code}
 

 

In the meantime, the kafka-log-cleaner-thread is holding the lock and is busy 
figuring out the filthiest compacted log, which can be a very time-consuming 
task. Our periodic thread dumps captured many snapshots with the 
kafka-log-cleaner-thread in either of the following two stack traces:

 

 
{code:java}
"kafka-log-cleaner-thread-0"
java.lang.Thread.State: RUNNABLE at 
kafka.log.TimeIndex.(TimeIndex.scala:57) at 
kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at 
kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown Source) at 
kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at 
kafka.log.LazyIndex.get(LazyIndex.scala:60) at 
kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at 
kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at 
kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at 
kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617)
 at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616)
 at kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown 
Source) at scala.collection.Iterator.find(Iterator.scala:993) at 
scala.collection.Iterator.find$(Iterator.scala:990) at 
scala.collection.AbstractIterator.find(Iterator.scala:1429) at 
scala.collection.IterableLike.find(IterableLike.scala:81) at 
scala.collection.IterableLike.find$(IterableLike.scala:80) at 
scala.collection.AbstractIterable.find(Iterable.scala:56) at 
kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at 
kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186)
 at kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown 
Source) at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at 
scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown 
Source) at scala.collection.immutable.List.foreach(List.scala:392)
{code}
 

 

 
{code:java}
"kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54)
 at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) 
at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at 
java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811)
 at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) 
at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at 
org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69)
 at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
 at 
org.apache.kafka.common.utils.AbstractIterator.maybeComput

[jira] [Created] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-04-11 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13815:
--

 Summary: Avoid reinitialization for a replica that is being deleted
 Key: KAFKA-13815
 URL: https://issues.apache.org/jira/browse/KAFKA-13815
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


KAFKA-10002 (https://issues.apache.org/jira/browse/KAFKA-10002)

identified that the deletion of replicas can be slow while a StopReplica request is 
being processed, and implemented a change to improve the efficiency.
We found that the efficiency can be further improved by avoiding the 
reinitialization of the leader epoch cache and the partition metadata for a 
replica that is about to be deleted.





[jira] [Created] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate

2022-04-04 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13797:
--

 Summary: Adding metric to indicate metadata response outgoing 
bytes rate
 Key: KAFKA-13797
 URL: https://issues.apache.org/jira/browse/KAFKA-13797
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly checking whether a topic exists by sending the 
special type of Metadata request whose topics field is null, which retrieves all 
topics, before checking for the topic's existence.

A high rate of such Metadata requests generated a heavy load on the brokers in the 
cluster.

Yet there is currently no metric to indicate the outgoing bytes rate of Metadata 
responses.

We propose adding such a metric in order to make the troubleshooting of such 
cases easier.
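
To make the proposal concrete, the broker already uses the Yammer metrics library, so 
the new metric could be a Meter that is marked with the serialized size of every 
Metadata response. A rough sketch follows; the group/type/name used here are 
hypothetical placeholders, not the final metric names:

{code:java}
import java.util.concurrent.TimeUnit;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;

public class MetadataResponseBytesMetric {
    // Hypothetical metric name; the actual group/type/name would be settled in the PR.
    private static final Meter METADATA_RESPONSE_BYTES_RATE = Metrics.newMeter(
            new MetricName("kafka.network", "RequestMetrics", "MetadataResponseOutgoingBytesPerSec"),
            "bytes", TimeUnit.SECONDS);

    // Would be called wherever the broker serializes a Metadata response (illustrative only).
    public static void recordMetadataResponse(int responseSizeBytes) {
        METADATA_RESPONSE_BYTES_RATE.mark(responseSizeBytes);
    }
}
{code}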





[jira] [Created] (KAFKA-13188) Release the memory back into MemoryPool

2021-08-10 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13188:
--

 Summary: Release the memory back into MemoryPool
 Key: KAFKA-13188
 URL: https://issues.apache.org/jira/browse/KAFKA-13188
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to the 
linkedin/kafka repo hosting Apache Kafka 2.4.

The change releases memory back to the MemoryPool for the Kafka consumer, and his 
benchmark showed a significant reduction in the amount of memory graduating from the 
young generation and being promoted to the old generation.

Given the benefit, the change should also be added to trunk.
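
For context, the allocate/release pattern the hotfix relies on looks roughly like the 
sketch below, written against Kafka's org.apache.kafka.common.memory.MemoryPool 
interface. It illustrates the idea of returning buffers to the pool, not the actual 
consumer change:

{code:java}
import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;

public class MemoryPoolReleaseExample {
    // Illustrative only: allocate a buffer from the pool, use it, and always
    // hand it back so the pool can reuse the memory instead of letting the
    // buffer age in the young generation and get promoted.
    static void handleResponse(MemoryPool pool, int responseSizeBytes) {
        ByteBuffer buffer = pool.tryAllocate(responseSizeBytes);
        if (buffer == null) {
            return; // pool exhausted; the caller would retry later
        }
        try {
            // ... read the network response into the buffer and process it ...
        } finally {
            pool.release(buffer); // the step the hotfix adds for the consumer
        }
    }
}
{code}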





[jira] [Created] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-12315:
--

 Summary: Clearing the ZkReplicaStateMachine request batch state 
upon ControllerMovedException
 Key: KAFKA-12315
 URL: https://issues.apache.org/jira/browse/KAFKA-12315
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
 Attachments: controller_moved_left_over_state.png

As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen:

1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some 
entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown due to a zkVersion check failure

4. The ControllerMovedException is caught by the ZkReplicaStateMachine and an error 
such as the following is logged:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point the controller has resigned; however, the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
    at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?]
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?]
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]
```
Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.
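
The fix implied by the ticket title is to clear the batch state when the 
ControllerMovedException is hit, before the controller resigns. A simplified, 
hypothetical sketch of that pattern in Java (the real code is the Scala 
ZkReplicaStateMachine / AbstractControllerBrokerRequestBatch; the names and structure 
below are illustrative only):

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RequestBatchSketch {
    static class ControllerMovedException extends RuntimeException {}

    // Stand-in for AbstractControllerBrokerRequestBatch.stopReplicaRequestMap.
    private final Map<Integer, List<String>> stopReplicaRequestMap = new HashMap<>();

    void handleStateChanges(List<String> replicas) {
        try {
            stopReplicaRequestMap.put(6121, replicas); // step 1: batch state is populated
            updateLeaderAndIsrInZk();                  // step 3: may throw on the zkVersion check
            sendRequestsToBrokers();
        } catch (ControllerMovedException e) {
            // Proposed fix: clear the partially built batch before the exception
            // propagates and the controller resigns, so that a later failover does
            // not hit "batch is not empty while creating a new one".
            stopReplicaRequestMap.clear();
            throw e;
        }
    }

    void updateLeaderAndIsrInZk() { throw new ControllerMovedException(); }

    void sendRequestsToBrokers() {}
}
```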





[jira] [Created] (KAFKA-10751) Generate log to help estimate messages lost during ULE

2020-11-19 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10751:
--

 Summary: Generate log to help estimate messages lost during ULE
 Key: KAFKA-10751
 URL: https://issues.apache.org/jira/browse/KAFKA-10751
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


During Unclean Leader Election, there could be data loss due to truncation at 
the resigned leader.

Suppose there are 3 brokers that have replicas for a given partition:
Broker A (leader) with largest offset 9 (log end offset 10)
Broker B (follower) with largest offset 4 (log end offset 5)
Broker C (follower) with largest offset 1 (log end offset 2)

Only the leader A is in the ISR with B and C lagging behind.
Now an unclean leader election causes the leadership to be transferred to C. 
Broker A would need to truncate 8 messages, and Broker B 3 messages.

Case 1: if these messages have been produced with acks=0 or 1, then clients 
would experience 8 lost messages.
Case 2: if the client is using acks=all and the partition's minISR setting is 
2, and further let's assume broker B dropped out of the ISR after receiving the 
message with offset 4, then only the messages with offset<=4 have been acked to 
the client. The truncation effectively causes the client to lose 3 messages.

Knowing the exact amount of data loss involves knowing the client's acks 
setting when the messages are produced, and also whether the messages have been 
sufficiently replicated according to the MinISR setting.


If getting the exact data loss is too involved, at least there should be logs 
to help ESTIMATE the amount of data loss.
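
For the example above, the estimate boils down to simple offset arithmetic. A small 
sketch (example values only) of the two cases:

{code:java}
public class UleLossEstimate {
    public static void main(String[] args) {
        // Log end offsets from the example above: A=10 (old leader), B=5, C=2 (new leader).
        long leoA = 10, leoB = 5, leoC = 2;

        // Case 1: acks=0/1 - everything beyond the new leader's log end offset is lost.
        long lostAcksZeroOrOne = leoA - leoC; // 10 - 2 = 8 messages

        // Case 2: acks=all with minISR=2 - only messages replicated to B (offset <= 4)
        // were ever acked, so the acked-but-lost messages are those in [leoC, leoB).
        long lostAcksAll = leoB - leoC;       // 5 - 2 = 3 messages

        System.out.println("acks=0/1 loss: " + lostAcksZeroOrOne);
        System.out.println("acks=all loss: " + lostAcksAll);
    }
}
{code}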






[jira] [Created] (KAFKA-10734) Speedup the processing of LeaderAndIsr request

2020-11-17 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10734:
--

 Summary: Speedup the processing of LeaderAndIsr request
 Key: KAFKA-10734
 URL: https://issues.apache.org/jira/browse/KAFKA-10734
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


Consider the case where a LeaderAndIsr request contains many partitions for 
which the broker is asked to become the follower. Let's call these partitions 
*partitionsToMakeFollower*. Furthermore, let's assume the cluster has n 
brokers and each broker is configured with m replica fetchers (via the 
num.replica.fetchers config). 
The broker is likely to have (n-1) * m fetcher threads.
Processing the LeaderAndIsr request requires
1. removing the "partitionsToMakeFollower" from all of the fetcher threads 
sequentially so that they won't be fetching from obsolete leaders.

2. adding the "partitionsToMakeFollower" to all of the fetcher threads 
sequentially

3. shutting down the idle fetcher threads sequentially (by checking the number 
of partitions held by each fetcher thread)

On top of that, each of the 3 operations above is handled by the request handler 
thread (i.e. the io thread), and to complete each operation, the request handler 
thread needs to contend for the "partitionMapLock" with the corresponding fetcher 
thread. In the worst case, the request handler thread is blocked (n-1) * m times 
for removing the partitions, another (n-1) * m times for adding the partitions, 
and yet another (n-1) * m times for shutting down the idle fetcher threads.
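
To make the worst case concrete, here is a small sketch with example values for n and m; 
the counts follow directly from the three sequential operations described above:

{code:java}
public class LeaderAndIsrLockContention {
    public static void main(String[] args) {
        int n = 30; // brokers in the cluster (example value)
        int m = 4;  // num.replica.fetchers (example value)

        int fetcherThreads = (n - 1) * m; // fetcher threads on one broker
        // Remove partitions + add partitions + shut down idle fetchers, each done
        // sequentially across all fetcher threads while contending for partitionMapLock.
        int worstCaseLockAcquisitions = 3 * fetcherThreads;

        System.out.println("fetcher threads: " + fetcherThreads);                                  // 116
        System.out.println("worst-case blocking lock acquisitions: " + worstCaseLockAcquisitions); // 348
    }
}
{code}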

Overall, all of this blocking can result in a significant delay in processing 
the LeaderAndIsr request. The further implication is that if the follower 
delays its fetching from the leader, there could be under-MinISR partitions in 
the cluster, causing unavailability for clients.

This ticket is created to track speedup in the processing of the LeaderAndIsr 
request.







Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-10 Thread Lucas Wang
Thanks for your review, Joel and Dong.
I've updated the KIP according to Dong's last comments.

Cheers!
Lucas

On Tue, Oct 9, 2018 at 10:06 PM Dong Lin  wrote:

> Hey Lucas,
>
> Thanks for the KIP. Looks good overall. +1
>
> I have two trivial comments which may be a bit useful to reader.
>
> - Can we include the default value for the new config in Public Interface
> section? Typically the default value of the new config is an important part
> of public interface and we usually specify it in the KIP's public interface
> section.
> - Can we change "whose default capacity is 20" to  "whose capacity is 20"
> in the section "How are controller requests handled over the dedicated
> connections"? The use of word "default" seems to suggest that this is
> configurable.
>
> Thanks,
> Dong
>
> On Mon, Jun 18, 2018 at 1:04 PM Lucas Wang  wrote:
>
> > Hi All,
> >
> > I've addressed a couple of comments in the discussion thread for KIP-291,
> > and
> > got no objections after making the changes. Therefore I would like to
> start
> > the voting thread.
> >
> > KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Have+separate+queues+for+control+requests+and+data+requests
> >
> > Thanks for your time!
> > Lucas
> >
>


Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-09 Thread Lucas Wang
Thanks Jun, I've updated the KIP with the new names.

Hi Joel, Becket, Dong, Ismael,
Since you've reviewed this KIP in the past, can you please review it again?
Thanks a lot!

Lucas

On Mon, Oct 8, 2018 at 6:10 PM Jun Rao  wrote:

> Hi, Lucas,
>
> Yes, the new names sound good to me.
>
> Thanks,
>
> Jun
>
> On Fri, Oct 5, 2018 at 1:12 PM, Lucas Wang  wrote:
>
> > Thanks for the suggestion, Ismael. I like it.
> >
> > Jun,
> > I'm excited to get the +1, thanks a lot!
> > Meanwhile what do you feel about renaming the metrics and config to
> >
> > ControlPlaneRequestQueueSize
> >
> > ControlPlaneNetworkProcessorIdlePercent
> >
> > ControlPlaneRequestHandlerIdlePercent
> >
> > control.plane.listener.name
> >
> > ?
> >
> >
> > Thanks,
> >
> > Lucas
> >
> > On Thu, Oct 4, 2018 at 11:38 AM Ismael Juma  wrote:
> >
> > > Have we considered control plane if we think control by itself is
> > > ambiguous? I agree with the original concern that "controller" may be
> > > confusing for something that affects all brokers.
> > >
> > > Ismael
> > >
> > >
> > > On 4 Oct 2018 11:08 am, "Lucas Wang"  wrote:
> > >
> > > Thanks Jun. I've changed the KIP with the suggested 2 step upgrade.
> > > Please take a look again when you have time.
> > >
> > > Regards,
> > > Lucas
> > >
> > >
> > > On Thu, Oct 4, 2018 at 10:06 AM Jun Rao  wrote:
> > >
> > > > Hi, Lucas,
> > > >
> > > > 200. That's a valid concern. So, we can probably just keep the
> current
> > > > name.
> > > >
> > > > 201. I am thinking that you would upgrade in the same way as changing
> > > > inter.broker.listener.name. This requires 2 rounds of rolling
> restart.
> > > In
> > > > the first round, we add the controller endpoint to the listeners w/o
> > > > setting controller.listener.name. In the second round, every broker
> > sets
> > > > controller.listener.name. At that point, the controller listener is
> > > ready
> > > > in every broker.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Oct 2, 2018 at 10:38 AM, Lucas Wang 
> > > wrote:
> > > >
> > > > > Thanks for the further comments, Jun.
> > > > >
> > > > > 200. Currently in the code base, we have the term of "ControlBatch"
> > > > related
> > > > > to
> > > > > idempotent/transactional producing. Do you think it's a concern for
> > > > reusing
> > > > > the term "control"?
> > > > >
> > > > > 201. It's not clear to me how it would work by following the same
> > > > strategy
> > > > > for "controller.listener.name".
> > > > > Say the new controller has its "controller.listener.name" set to
> the
> > > > value
> > > > > "CONTROLLER", and broker 1
> > > > > has picked up this KIP by announcing
> > > > > "endpoints": [
> > > > > "CONTROLLER://broker1.example.com:9091",
> > > > > "INTERNAL://broker1.example.com:9092",
> > > > > "EXTERNAL://host1.example.com:9093"
> > > > > ],
> > > > >
> > > > > while broker2 has not picked up the change, and is announcing
> > > > > "endpoints": [
> > > > > "INTERNAL://broker2.example.com:9092",
> > > > > "EXTERNAL://host2.example.com:9093"
> > > > > ],
> > > > > to support both broker 1 for the new behavior and broker 2 for the
> > old
> > > > > behavior, it seems the controller must
> > > > > check their published endpoints. Am I missing something?
> > > > >
> > > > > Thanks!
> > > > > Lucas
> > > > >
> > > > > On Mon, Oct 1, 2018 at 6:29 PM Jun Rao  wrote:
> > > > >
> > > > > > Hi, Lucas,
> > > > > >
> > > > > > Sorry for the delay. The updated wiki looks good to me overall.
> > Just
> > > a
> > > > > > couple more minor comments.
> > > > > >
> > > > > > 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-05 Thread Lucas Wang
Thanks for the suggestion, Ismael. I like it.

Jun,
I'm excited to get the +1, thanks a lot!
Meanwhile what do you feel about renaming the metrics and config to

ControlPlaneRequestQueueSize

ControlPlaneNetworkProcessorIdlePercent

ControlPlaneRequestHandlerIdlePercent

control.plane.listener.name

?


Thanks,

Lucas

On Thu, Oct 4, 2018 at 11:38 AM Ismael Juma  wrote:

> Have we considered control plane if we think control by itself is
> ambiguous? I agree with the original concern that "controller" may be
> confusing for something that affects all brokers.
>
> Ismael
>
>
> On 4 Oct 2018 11:08 am, "Lucas Wang"  wrote:
>
> Thanks Jun. I've changed the KIP with the suggested 2 step upgrade.
> Please take a look again when you have time.
>
> Regards,
> Lucas
>
>
> On Thu, Oct 4, 2018 at 10:06 AM Jun Rao  wrote:
>
> > Hi, Lucas,
> >
> > 200. That's a valid concern. So, we can probably just keep the current
> > name.
> >
> > 201. I am thinking that you would upgrade in the same way as changing
> > inter.broker.listener.name. This requires 2 rounds of rolling restart.
> In
> > the first round, we add the controller endpoint to the listeners w/o
> > setting controller.listener.name. In the second round, every broker sets
> > controller.listener.name. At that point, the controller listener is
> ready
> > in every broker.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Oct 2, 2018 at 10:38 AM, Lucas Wang 
> wrote:
> >
> > > Thanks for the further comments, Jun.
> > >
> > > 200. Currently in the code base, we have the term of "ControlBatch"
> > related
> > > to
> > > idempotent/transactional producing. Do you think it's a concern for
> > reusing
> > > the term "control"?
> > >
> > > 201. It's not clear to me how it would work by following the same
> > strategy
> > > for "controller.listener.name".
> > > Say the new controller has its "controller.listener.name" set to the
> > value
> > > "CONTROLLER", and broker 1
> > > has picked up this KIP by announcing
> > > "endpoints": [
> > > "CONTROLLER://broker1.example.com:9091",
> > > "INTERNAL://broker1.example.com:9092",
> > > "EXTERNAL://host1.example.com:9093"
> > > ],
> > >
> > > while broker2 has not picked up the change, and is announcing
> > > "endpoints": [
> > > "INTERNAL://broker2.example.com:9092",
> > > "EXTERNAL://host2.example.com:9093"
> > > ],
> > > to support both broker 1 for the new behavior and broker 2 for the old
> > > behavior, it seems the controller must
> > > check their published endpoints. Am I missing something?
> > >
> > > Thanks!
> > > Lucas
> > >
> > > On Mon, Oct 1, 2018 at 6:29 PM Jun Rao  wrote:
> > >
> > > > Hi, Lucas,
> > > >
> > > > Sorry for the delay. The updated wiki looks good to me overall. Just
> a
> > > > couple more minor comments.
> > > >
> > > > 200.
> kafka.network:name=ControllerRequestQueueSize,type=RequestChannel:
> > > The
> > > > name ControllerRequestQueueSize gives the impression that it's only
> for
> > > the
> > > > controller broker. Perhaps we can just rename all metrics and configs
> > > from
> > > > controller to control. This indicates that the threads and the queues
> > are
> > > > for the control requests (as oppose to data requests).
> > > >
> > > > 201. ": In this scenario, the controller
> > will
> > > > have the "controller.listener.name" config set to a value like
> > > > "CONTROLLER", however the broker's exposed endpoints do not have an
> > entry
> > > > corresponding to the new listener name. Hence the controller should
> > > > preserve the existing behavior by determining the endpoint using
> > > > *inter-broker-listener-name *value. The end result should be the same
> > > > behavior as today." Currently, the controller makes connections based
> > on
> > > > its local inter.broker.listener.name config without checking the
> > target
> > > > broker's ZK registration. For consistency, perhaps we can just follow
> > the
> > > > same strategy for controller.listener.name. 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-04 Thread Lucas Wang
Thanks Jun. I've changed the KIP with the suggested 2 step upgrade.
Please take a look again when you have time.

Regards,
Lucas

On Thu, Oct 4, 2018 at 10:06 AM Jun Rao  wrote:

> Hi, Lucas,
>
> 200. That's a valid concern. So, we can probably just keep the current
> name.
>
> 201. I am thinking that you would upgrade in the same way as changing
> inter.broker.listener.name. This requires 2 rounds of rolling restart. In
> the first round, we add the controller endpoint to the listeners w/o
> setting controller.listener.name. In the second round, every broker sets
> controller.listener.name. At that point, the controller listener is ready
> in every broker.
>
> Thanks,
>
> Jun
>
> On Tue, Oct 2, 2018 at 10:38 AM, Lucas Wang  wrote:
>
> > Thanks for the further comments, Jun.
> >
> > 200. Currently in the code base, we have the term of "ControlBatch"
> related
> > to
> > idempotent/transactional producing. Do you think it's a concern for
> reusing
> > the term "control"?
> >
> > 201. It's not clear to me how it would work by following the same
> strategy
> > for "controller.listener.name".
> > Say the new controller has its "controller.listener.name" set to the
> value
> > "CONTROLLER", and broker 1
> > has picked up this KIP by announcing
> > "endpoints": [
> > "CONTROLLER://broker1.example.com:9091",
> > "INTERNAL://broker1.example.com:9092",
> > "EXTERNAL://host1.example.com:9093"
> > ],
> >
> > while broker2 has not picked up the change, and is announcing
> > "endpoints": [
> > "INTERNAL://broker2.example.com:9092",
> > "EXTERNAL://host2.example.com:9093"
> > ],
> > to support both broker 1 for the new behavior and broker 2 for the old
> > behavior, it seems the controller must
> > check their published endpoints. Am I missing something?
> >
> > Thanks!
> > Lucas
> >
> > On Mon, Oct 1, 2018 at 6:29 PM Jun Rao  wrote:
> >
> > > Hi, Lucas,
> > >
> > > Sorry for the delay. The updated wiki looks good to me overall. Just a
> > > couple more minor comments.
> > >
> > > 200. kafka.network:name=ControllerRequestQueueSize,type=RequestChannel:
> > The
> > > name ControllerRequestQueueSize gives the impression that it's only for
> > the
> > > controller broker. Perhaps we can just rename all metrics and configs
> > from
> > > controller to control. This indicates that the threads and the queues
> are
> > > for the control requests (as oppose to data requests).
> > >
> > > 201. ": In this scenario, the controller
> will
> > > have the "controller.listener.name" config set to a value like
> > > "CONTROLLER", however the broker's exposed endpoints do not have an
> entry
> > > corresponding to the new listener name. Hence the controller should
> > > preserve the existing behavior by determining the endpoint using
> > > *inter-broker-listener-name *value. The end result should be the same
> > > behavior as today." Currently, the controller makes connections based
> on
> > > its local inter.broker.listener.name config without checking the
> target
> > > broker's ZK registration. For consistency, perhaps we can just follow
> the
> > > same strategy for controller.listener.name. This existing behavior
> seems
> > > simpler to understand and has the benefit of catching inconsistent
> > configs
> > > across brokers.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Oct 1, 2018 at 8:43 AM, Lucas Wang 
> > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Sorry to bother you again. Can you please take a look at the wiki
> again
> > > > when you have time?
> > > >
> > > > Thanks a lot!
> > > > Lucas
> > > >
> > > > On Wed, Sep 19, 2018 at 3:57 PM Lucas Wang 
> > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks a lot for the detailed explanation.
> > > > > I've restored the wiki to a previous version that does not require
> > > config
> > > > > changes,
> > > > > and keeps the current behavior with the proposed changes turned off
> > by
> > > > > default.
> > > > > I'd appreciate it if 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-02 Thread Lucas Wang
Thanks for the further comments, Jun.

200. Currently in the code base, we have the term of "ControlBatch" related
to
idempotent/transactional producing. Do you think it's a concern for reusing
the term "control"?

201. It's not clear to me how it would work by following the same strategy
for "controller.listener.name".
Say the new controller has its "controller.listener.name" set to the value
"CONTROLLER", and broker 1
has picked up this KIP by announcing
"endpoints": [
"CONTROLLER://broker1.example.com:9091",
"INTERNAL://broker1.example.com:9092",
"EXTERNAL://host1.example.com:9093"
],

while broker2 has not picked up the change, and is announcing
"endpoints": [
"INTERNAL://broker2.example.com:9092",
"EXTERNAL://host2.example.com:9093"
],
to support both broker 1 for the new behavior and broker 2 for the old
behavior, it seems the controller must
check their published endpoints. Am I missing something?

Thanks!
Lucas

On Mon, Oct 1, 2018 at 6:29 PM Jun Rao  wrote:

> Hi, Lucas,
>
> Sorry for the delay. The updated wiki looks good to me overall. Just a
> couple more minor comments.
>
> 200. kafka.network:name=ControllerRequestQueueSize,type=RequestChannel: The
> name ControllerRequestQueueSize gives the impression that it's only for the
> controller broker. Perhaps we can just rename all metrics and configs from
> controller to control. This indicates that the threads and the queues are
> for the control requests (as oppose to data requests).
>
> 201. ": In this scenario, the controller will
> have the "controller.listener.name" config set to a value like
> "CONTROLLER", however the broker's exposed endpoints do not have an entry
> corresponding to the new listener name. Hence the controller should
> preserve the existing behavior by determining the endpoint using
> *inter-broker-listener-name *value. The end result should be the same
> behavior as today." Currently, the controller makes connections based on
> its local inter.broker.listener.name config without checking the target
> broker's ZK registration. For consistency, perhaps we can just follow the
> same strategy for controller.listener.name. This existing behavior seems
> simpler to understand and has the benefit of catching inconsistent configs
> across brokers.
>
> Thanks,
>
> Jun
>
> On Mon, Oct 1, 2018 at 8:43 AM, Lucas Wang  wrote:
>
> > Hi Jun,
> >
> > Sorry to bother you again. Can you please take a look at the wiki again
> > when you have time?
> >
> > Thanks a lot!
> > Lucas
> >
> > On Wed, Sep 19, 2018 at 3:57 PM Lucas Wang 
> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks a lot for the detailed explanation.
> > > I've restored the wiki to a previous version that does not require
> config
> > > changes,
> > > and keeps the current behavior with the proposed changes turned off by
> > > default.
> > > I'd appreciate it if you can review it again.
> > >
> > > Thanks!
> > > Lucas
> > >
> > > On Tue, Sep 18, 2018 at 1:48 PM Jun Rao  wrote:
> > >
> > >> Hi, Lucas,
> > >>
> > >> When upgrading to a minor release, I think the expectation is that a
> > user
> > >> wouldn't need to make any config changes, other than the usual
> > >> inter.broker.protocol. If we require other config changes during an
> > >> upgrade, then it's probably better to do that in a major release.
> > >>
> > >> Regarding your proposal, I think removing host/advertised_host in
> favor
> > of
> > >> listeners:advertised_listeners seems useful regardless of this KIP.
> > >> However, that can probably wait until a major release.
> > >>
> > >> As for the controller listener, I am not sure if one has to set it. To
> > >> make
> > >> a cluster healthy, one sort of have to make sure that the request
> queue
> > is
> > >> never full and no request will be sitting in the request queue for
> long.
> > >> If
> > >> one does that, setting the controller listener may not be necessary.
> On
> > >> the
> > >> flip side, even if one sets the controller listener, but the request
> > queue
> > >> and the request time for the data part are still high, the cluster may
> > >> still not be healthy. Given that we have already started the 2.1
> release
> > >> planning, perhaps we can start 

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-10-01 Thread Lucas Wang
Hi Jun,

Sorry to bother you again. Can you please take a look at the wiki again
when you have time?

Thanks a lot!
Lucas

On Wed, Sep 19, 2018 at 3:57 PM Lucas Wang  wrote:

> Hi Jun,
>
> Thanks a lot for the detailed explanation.
> I've restored the wiki to a previous version that does not require config
> changes,
> and keeps the current behavior with the proposed changes turned off by
> default.
> I'd appreciate it if you can review it again.
>
> Thanks!
> Lucas
>
> On Tue, Sep 18, 2018 at 1:48 PM Jun Rao  wrote:
>
>> Hi, Lucas,
>>
>> When upgrading to a minor release, I think the expectation is that a user
>> wouldn't need to make any config changes, other than the usual
>> inter.broker.protocol. If we require other config changes during an
>> upgrade, then it's probably better to do that in a major release.
>>
>> Regarding your proposal, I think removing host/advertised_host in favor of
>> listeners:advertised_listeners seems useful regardless of this KIP.
>> However, that can probably wait until a major release.
>>
>> As for the controller listener, I am not sure if one has to set it. To
>> make
>> a cluster healthy, one sort of have to make sure that the request queue is
>> never full and no request will be sitting in the request queue for long.
>> If
>> one does that, setting the controller listener may not be necessary. On
>> the
>> flip side, even if one sets the controller listener, but the request queue
>> and the request time for the data part are still high, the cluster may
>> still not be healthy. Given that we have already started the 2.1 release
>> planning, perhaps we can start with not requiring the controller listener.
>> If this is indeed something that everyone wants to set, we can make it a
>> required config in a major release.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 11, 2018 at 3:46 PM, Lucas Wang 
>> wrote:
>>
>> > @Jun Rao 
>> >
>> > I made the recent config changes after thinking about the default
>> behavior
>> > for adopting this KIP.
>> > I think there are basically two options:
>> > 1. By default, the behavior proposed in this KIP is turned off, and
>> > operators can turn it
>> > on by adding the "controller.listener.name" config and entries in the
>> > "listeners" and "advertised.listeners" list.
>> > If no "controller.listener.name" is added, it'll be the *same as* the "
>> > inter.broker.listener.name",
>> > and the proposed feature is effectively turned off.
>> > This has been the assumption in the KIP writeup before.
>> >
>> > 2. By default, the behavior proposed in this KIP is turned on, and
>> > operators are forced to
>> > recognize the proposed change if their "listeners" config is set (this
>> is
>> > most likely in production environments),
>> > by allocating a new port for controller connections, and adding a new
>> > endpoint to the "listeners" config.
>> > For cases where "listeners" is not set explicitly,
>> > there needs to be a default value for it that includes the controller
>> > listener name,
>> > e.g. "PLAINTEXT://:9092,CONTROLLER://:9091"
>> >
>> > I chose to go with option 2 since as author of this KIP,
>> > I naturally think in the long run, it's worth the effort to adopt this
>> > feature,
>> > in order to prevent issues under circumstances listed in the motivation
>> > section.
>> >
>> > 100, following the argument above, I want to enforce the separation
>> > between controller
>> > and data plane requests. Hence the "controller.listener.name" should
>> > never be the same
>> > as the "inter.broker.listener.name", which is intended for data plane
>> > requests.
>> >
>> > 101, the default value for "listeners" will be
>> > "PLAINTEXT://:9092,CONTROLLER://:9091",
>> > making values of "host", and "port" not being used under any config
>> > settings.
>> > And the default value for "advertised.listeners" will be derived from
>> > "listeners",
>> > making the values of "advertised.host", and "advertised.port" not being
>> > used any more.
>> >
>> > 102, for upgrading, a single broker that has "listeners" and/or
>> > "advertised.listeners" set,
>> > must add a

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-09-19 Thread Lucas Wang
Hi Jun,

Thanks a lot for the detailed explanation.
I've restored the wiki to a previous version that does not require config
changes,
and keeps the current behavior with the proposed changes turned off by
default.
I'd appreciate it if you can review it again.

Thanks!
Lucas

On Tue, Sep 18, 2018 at 1:48 PM Jun Rao  wrote:

> Hi, Lucas,
>
> When upgrading to a minor release, I think the expectation is that a user
> wouldn't need to make any config changes, other than the usual
> inter.broker.protocol. If we require other config changes during an
> upgrade, then it's probably better to do that in a major release.
>
> Regarding your proposal, I think removing host/advertised_host in favor of
> listeners:advertised_listeners seems useful regardless of this KIP.
> However, that can probably wait until a major release.
>
> As for the controller listener, I am not sure if one has to set it. To make
> a cluster healthy, one sort of have to make sure that the request queue is
> never full and no request will be sitting in the request queue for long. If
> one does that, setting the controller listener may not be necessary. On the
> flip side, even if one sets the controller listener, but the request queue
> and the request time for the data part are still high, the cluster may
> still not be healthy. Given that we have already started the 2.1 release
> planning, perhaps we can start with not requiring the controller listener.
> If this is indeed something that everyone wants to set, we can make it a
> required config in a major release.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 11, 2018 at 3:46 PM, Lucas Wang  wrote:
>
> > @Jun Rao 
> >
> > I made the recent config changes after thinking about the default
> behavior
> > for adopting this KIP.
> > I think there are basically two options:
> > 1. By default, the behavior proposed in this KIP is turned off, and
> > operators can turn it
> > on by adding the "controller.listener.name" config and entries in the
> > "listeners" and "advertised.listeners" list.
> > If no "controller.listener.name" is added, it'll be the *same as* the "
> > inter.broker.listener.name",
> > and the proposed feature is effectively turned off.
> > This has been the assumption in the KIP writeup before.
> >
> > 2. By default, the behavior proposed in this KIP is turned on, and
> > operators are forced to
> > recognize the proposed change if their "listeners" config is set (this is
> > most likely in production environments),
> > by allocating a new port for controller connections, and adding a new
> > endpoint to the "listeners" config.
> > For cases where "listeners" is not set explicitly,
> > there needs to be a default value for it that includes the controller
> > listener name,
> > e.g. "PLAINTEXT://:9092,CONTROLLER://:9091"
> >
> > I chose to go with option 2 since as author of this KIP,
> > I naturally think in the long run, it's worth the effort to adopt this
> > feature,
> > in order to prevent issues under circumstances listed in the motivation
> > section.
> >
> > 100, following the argument above, I want to enforce the separation
> > between controller
> > and data plane requests. Hence the "controller.listener.name" should
> > never be the same
> > as the "inter.broker.listener.name", which is intended for data plane
> > requests.
> >
> > 101, the default value for "listeners" will be
> > "PLAINTEXT://:9092,CONTROLLER://:9091",
> > making values of "host", and "port" not being used under any config
> > settings.
> > And the default value for "advertised.listeners" will be derived from
> > "listeners",
> > making the values of "advertised.host", and "advertised.port" not being
> > used any more.
> >
> > 102, for upgrading, a single broker that has "listeners" and/or
> > "advertised.listeners" set,
> > must add a new endpoint for the CONTROLLER listener name, or end up using
> > the default listeners "PLAINTEXT://:9092,CONTROLLER://:9091".
> > During rolling upgrade, in cases of  +  or
> >   + 
> > we still need to fall back to the current behavior. However after the
> > rolling upgrade is done,
> > it is guaranteed that the controller plane and data plane are separated,
> > given
> > the "controller.listener.name" must be different from "
> > inter.broker.listener.name".
> >
> > @Ismael Juma 
> > Tha

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-09-11 Thread Lucas Wang
@Jun Rao 

I made the recent config changes after thinking about the default behavior
for adopting this KIP.
I think there are basically two options:
1. By default, the behavior proposed in this KIP is turned off, and
operators can turn it
on by adding the "controller.listener.name" config and entries in the
"listeners" and "advertised.listeners" list.
If no "controller.listener.name" is added, it'll be the *same as* the "
inter.broker.listener.name",
and the proposed feature is effectively turned off.
This has been the assumption in the KIP writeup before.

2. By default, the behavior proposed in this KIP is turned on, and
operators are forced to
recognize the proposed change if their "listeners" config is set (this is
most likely in production environments),
by allocating a new port for controller connections, and adding a new
endpoint to the "listeners" config.
For cases where "listeners" is not set explicitly,
there needs to be a default value for it that includes the controller
listener name,
e.g. "PLAINTEXT://:9092,CONTROLLER://:9091"

I chose to go with option 2 since as author of this KIP,
I naturally think in the long run, it's worth the effort to adopt this
feature,
in order to prevent issues under circumstances listed in the motivation
section.

100, following the argument above, I want to enforce the separation between
controller
and data plane requests. Hence the "controller.listener.name" should never
be the same
as the "inter.broker.listener.name", which is intended for data plane
requests.

101, the default value for "listeners" will be
"PLAINTEXT://:9092,CONTROLLER://:9091",
making values of "host", and "port" not being used under any config
settings.
And the default value for "advertised.listeners" will be derived from
"listeners",
making the values of "advertised.host", and "advertised.port" not being
used any more.

102, for upgrading, a single broker that has "listeners" and/or
"advertised.listeners" set,
must add a new endpoint for the CONTROLLER listener name, or end up using
the default listeners "PLAINTEXT://:9092,CONTROLLER://:9091".
During rolling upgrade, in cases of  +  or   + 
we still need to fall back to the current behavior. However after the
rolling upgrade is done,
it is guaranteed that the controller plane and data plane are separated,
given
the "controller.listener.name" must be different from "
inter.broker.listener.name".

@Ismael Juma 
Thanks for pointing that out. I did not know that.
However, my question is: if the argument above makes sense, and my code change
causes the configs "host", "port", "advertised.host", and "advertised.port" to not
be used under any circumstance, then it's no different from removing them.
Anyway, if there is still a concern about removing them, is there a new major
version now or in the future where I can remove them?

Thanks!
Lucas

On Mon, Sep 10, 2018 at 1:30 PM Ismael Juma  wrote:

> To be clear, we can only remove configs in major new versions. Otherwise,
> we can only deprecate them.
>
> Ismael
>
> On Mon, Sep 10, 2018 at 10:47 AM Jun Rao  wrote:
>
> > Hi, Lucas,
> >
> > For the network idlePct, your understanding is correct. Currently,
> > networkIdlePct metric is calculated as the average of (1 - io-ratio) in
> the
> > selector of all network threads.
> >
> > The metrics part looks good to me in the updated KIP.
> >
> > I am not still not quite sure about the configs.
> >
> > 100. "Whenever the "controller.listener.name" is set, upon broker
> startup,
> > we will validate its value and make sure it's different from the
> > *inter-broker-listener-name *value." Does that mean that
> > controller.listener.name has to be different from
> > inter.broker.listener.name?
> > That seems limiting.
> >
> > 101. The KIP says that advertised.listeners and listeners will now have a
> > different default value including controller. Could you document what the
> > default value looks like?
> >
> > 102. About removing the the following configs. How does that affect the
> > upgrade path? Do we now expect a user to add a new config when upgrading
> > from an old version to a new one?
> > host
> > port
> > advertised.host
> > advertised.port
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Sep 6, 2018 at 5:14 PM, Lucas Wang 
> wrote:
> >
> > > @Jun Rao 
> > >
> > > One clarification, currently on the selector level, we have the
> > > io-wait-ratio metric.
> > > For the n

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-09-06 Thread Lucas Wang
@Jun Rao 

One clarification, currently on the selector level, we have the
io-wait-ratio metric.
For the new controller *network* thread, we can use it directly for
IdlePct, instead of using 1- io-ratio,
so that the logic is similar to the current average IdlePct for network
threads. Is that correct?

I've revised the KIP by adding two new metrics for measuring the IdlePct
for the two additional threads.
Please take a look again. Thanks!

Lucas





On Wed, Sep 5, 2018 at 5:01 PM Jun Rao  wrote:

> Hi, Lucas,
>
> Thanks for the updated KIP.
>
> For monitoring the network thread utilization for the control plane, we
> already have the metric io-ratio at the selector level (idlePct is 1 -
> io-ratio). So, we just need to give that selector a meaningful name.
>
> For monitoring the io thread utilization for the control plane, it's
> probably useful to have a separate metric for that. The controller request
> queue size may not reflect the history in a window.
>
> Jun
>
> On Wed, Sep 5, 2018 at 3:38 PM, Lucas Wang  wrote:
>
> > Thanks Jun for your quick response. It looks like I forgot to click the
> > "Update" button, :)
> > It's updated now.
> >
> > Regarding the idle ratio metrics for the additional threads, I discussed
> > with Joel,
> > and think they are not as useful, and I added our reasoning in the last
> > paragraph of the
> > "How are controller requests handled over the dedicated connections?"
> > section.
> > On the other hand, we don't strongly oppose adding them if you think they
> > are necessary.
> >
> > Thanks,
> > Lucas
> >
> >
> > On Wed, Sep 5, 2018 at 3:12 PM Jun Rao  wrote:
> >
> > > Hi, Lucas,
> > >
> > > Thanks for the reply. Have you actually updated the KIP? The wiki says
> > that
> > > it's last updated on Aug. 22. and some of the changes that you
> mentioned
> > > (#1 and #3) are not there.
> > >
> > > Also, regarding Joel's comment on network/request idle ratio metrics,
> > could
> > > you comment on whether they include the new controller listener? If
> not,
> > do
> > > we need additional metrics to measure the utilization of the io thread
> > for
> > > the control plane?
> > >
> > > Jun
> > >
> > > On Mon, Aug 27, 2018 at 6:26 PM, Lucas Wang 
> > wrote:
> > >
> > > > Thanks for the comments, Jun.
> > > >
> > > > 1. I think the answer should be no, since the "
> > > inter.broker.listener.name"
> > > > are also used
> > > > for replication traffic, and merging these two types of request to
> the
> > > > single threaded tunnel
> > > > would defeat the purpose of this KIP and also hurt replication
> > > throughput.
> > > > So I think that means
> > > > we should validate to make sure when the new config is set, it's
> > > different
> > > > from "inter.broker.listener.name"
> > > > or "security.inter.broker.protocol", whichever is set.
> > > >
> > > > 2. Normally all broker configs in a given cluster are changed at the
> > same
> > > > time. If there is a typo in the
> > > > controller.listener.name and it's not available in the endpoints
> list,
> > > we
> > > > could catch it, give an error
> > > > and block restart of the first broker in the cluster. With that, we
> > could
> > > > keep the current behavior
> > > > in the KIP write up that falls back to "inter.broker.listener.nam"
> when
> > > the
> > > > "controller.listener.name"
> > > > is not found during the migration phase of this KIP. Thoughts?
> > > >
> > > > 3. That makes sense, and I've changed it.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > > On Thu, Aug 23, 2018 at 3:46 PM Jun Rao  wrote:
> > > >
> > > > > Hi, Lucas,
> > > > >
> > > > > Sorry for the delay. The new proposal looks good to me overall. A
> few
> > > > minor
> > > > > comments below.
> > > > >
> > > > > 1. It's possible that listener.name.for.controller is set, but set
> to
> > > the
> > > > > same value as inter.broker.listener.name. In that case, should we
> > > have a
> > > > > single network thread and the request handling thread for that
> > > listener?
> > > > >

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-09-05 Thread Lucas Wang
Thanks Jun for your quick response. It looks like I forgot to click the
"Update" button, :)
It's updated now.

Regarding the idle ratio metrics for the additional threads, I discussed
with Joel,
and think they are not as useful, and I added our reasoning in the last
paragraph of the
"How are controller requests handled over the dedicated connections?"
section.
On the other hand, we don't strongly oppose adding them if you think they
are necessary.

Thanks,
Lucas


On Wed, Sep 5, 2018 at 3:12 PM Jun Rao  wrote:

> Hi, Lucas,
>
> Thanks for the reply. Have you actually updated the KIP? The wiki says that
> it's last updated on Aug. 22. and some of the changes that you mentioned
> (#1 and #3) are not there.
>
> Also, regarding Joel's comment on network/request idle ratio metrics, could
> you comment on whether they include the new controller listener? If not, do
> we need additional metrics to measure the utilization of the io thread for
> the control plane?
>
> Jun
>
> On Mon, Aug 27, 2018 at 6:26 PM, Lucas Wang  wrote:
>
> > Thanks for the comments, Jun.
> >
> > 1. I think the answer should be no, since the "
> inter.broker.listener.name"
> > are also used
> > for replication traffic, and merging these two types of request to the
> > single threaded tunnel
> > would defeat the purpose of this KIP and also hurt replication
> throughput.
> > So I think that means
> > we should validate to make sure when the new config is set, it's
> different
> > from "inter.broker.listener.name"
> > or "security.inter.broker.protocol", whichever is set.
> >
> > 2. Normally all broker configs in a given cluster are changed at the same
> > time. If there is a typo in the
> > controller.listener.name and it's not available in the endpoints list,
> we
> > could catch it, give an error
> > and block restart of the first broker in the cluster. With that, we could
> > keep the current behavior
> > in the KIP write up that falls back to "inter.broker.listener.nam" when
> the
> > "controller.listener.name"
> > is not found during the migration phase of this KIP. Thoughts?
> >
> > 3. That makes sense, and I've changed it.
> >
> > Thanks,
> > Lucas
> >
> > On Thu, Aug 23, 2018 at 3:46 PM Jun Rao  wrote:
> >
> > > Hi, Lucas,
> > >
> > > Sorry for the delay. The new proposal looks good to me overall. A few
> > minor
> > > comments below.
> > >
> > > 1. It's possible that listener.name.for.controller is set, but set to
> the
> > > same value as inter.broker.listener.name. In that case, should we
> have a
> > > single network thread and the request handling thread for that
> listener?
> > >
> > > 2. Currently, the controller always picks the listener specified by
> > > inter.broker.listener.name even if the listener name is not present in
> > the
> > > receiving broker. This KIP proposes a slightly different approach for
> > > picking listener.name.for.controller only when the receiving end has
> the
> > > listener and switches listener.name.for.controller otherwise. There are
> > > some tradeoffs between the two approaches. To change the inter broker
> > > listener, the former requires 2 steps: (1) adding the new listener to
> > > listener list in every broker and (2) changing
> > > listener.name.for.controller.
> > > The latter can do both changes in 1 step. On the hand, if
> > > listener.name.for.controller
> > > is mis-configured, the former will report an error and the latter will
> > hide
> > > it (so the user may not know the misconfiguration). It seems that we
> > should
> > > pick one approach to handle both listener.name.for.controller and
> > > inter.broker.listener.name consistently. To me, the former seems
> > slightly
> > > better.
> > >
> > > 3. To be consistent with the existing naming, should
> > > listener.name.for.controller
> > > be controller.listener.name?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Aug 9, 2018 at 3:21 PM, Lucas Wang 
> > wrote:
> > >
> > > > Hi Jun and Joel,
> > > >
> > > > The KIP writeup has changed by quite a bit since your +1.
> > > > Can you please take another review? Thanks a lot for your time!
> > > >
> > > > Lucas
> > > >
> > > > On Tue, Jul 17, 2018 at 10:33 AM, Joel Koshy 
> > > wrote:
> > > >
> > > > > +1 on the KIP.
> > > > >
> > > > > (I'm not sure we actually necessary to introduce the condition
> > > variables
> > > > > for the concern that Jun raised, but it's an implementation detail
> > that
> > > > we
> > > > > can defer to a discussion in the PR.)
> > > > >
> > > > > On Sat, Jul 14, 2018 at 10:45 PM, Lucas Wang <
> lucasatu...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > I agree by using the conditional variables, there is no need to
> add
> > > > such
> > > > > a
> > > > > > new config.
> > > > > > Also thanks for approving this KIP.
> > > > > >
> > > > > > Lucas
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-08-27 Thread Lucas Wang
Thanks for the comments, Jun.

1. I think the answer should be no, since the "inter.broker.listener.name"
is also used
for replication traffic, and merging these two types of requests into the
single-threaded tunnel
would defeat the purpose of this KIP and also hurt replication throughput.
So I think that means
we should validate that when the new config is set, it is different
from "inter.broker.listener.name"
or "security.inter.broker.protocol", whichever is set.

2. Normally all broker configs in a given cluster are changed at the same
time. If there is a typo in the
controller.listener.name and it's not available in the endpoints list, we
could catch it, give an error
and block restart of the first broker in the cluster. With that, we could
keep the current behavior
in the KIP write-up that falls back to "inter.broker.listener.name" when the
"controller.listener.name"
is not found during the migration phase of this KIP. Thoughts?

3. That makes sense, and I've changed it.

Thanks,
Lucas
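
For illustration, here is a minimal sketch of the kind of startup validation
described in points 1 and 2 above. The class and method names are assumptions
made for this sketch, not the actual broker code; only the config names come
from this thread.

{code:java}
import java.util.Set;

// Sketch of the validation discussed above (illustrative names, not broker code).
final class ControllerListenerValidator {

    static void validate(String controllerListenerName,
                         String interBrokerListenerName,
                         Set<String> configuredListenerNames) {
        if (controllerListenerName == null) {
            // Migration phase: fall back to the inter-broker listener (current behavior).
            return;
        }
        // Point 1: the controller listener must differ from the inter-broker listener,
        // otherwise controller and replication traffic share a single channel.
        if (controllerListenerName.equals(interBrokerListenerName)) {
            throw new IllegalArgumentException(
                "controller.listener.name must differ from inter.broker.listener.name");
        }
        // Point 2: catch typos early by requiring the name to appear in the configured
        // listeners, blocking restart of the first mis-configured broker.
        if (!configuredListenerNames.contains(controllerListenerName)) {
            throw new IllegalArgumentException(
                "controller.listener.name '" + controllerListenerName
                    + "' is not present in the configured listeners " + configuredListenerNames);
        }
    }

    private ControllerListenerValidator() { }
}
{code}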

On Thu, Aug 23, 2018 at 3:46 PM Jun Rao  wrote:

> Hi, Lucas,
>
> Sorry for the delay. The new proposal looks good to me overall. A few minor
> comments below.
>
> 1. It's possible that listener.name.for.controller is set, but set to the
> same value as inter.broker.listener.name. In that case, should we have a
> single network thread and the request handling thread for that listener?
>
> 2. Currently, the controller always picks the listener specified by
> inter.broker.listener.name even if the listener name is not present in the
> receiving broker. This KIP proposes a slightly different approach for
> picking listener.name.for.controller only when the receiving end has the
> listener and switches listener.name.for.controller otherwise. There are
> some tradeoffs between the two approaches. To change the inter broker
> listener, the former requires 2 steps: (1) adding the new listener to
> listener list in every broker and (2) changing
> listener.name.for.controller.
> The latter can do both changes in 1 step. On the other hand, if
> listener.name.for.controller
> is mis-configured, the former will report an error and the latter will hide
> it (so the user may not know the misconfiguration). It seems that we should
> pick one approach to handle both listener.name.for.controller and
> inter.broker.listener.name consistently. To me, the former seems slightly
> better.
>
> 3. To be consistent with the existing naming, should
> listener.name.for.controller
> be controller.listener.name?
>
> Thanks,
>
> Jun
>
>
> On Thu, Aug 9, 2018 at 3:21 PM, Lucas Wang  wrote:
>
> > Hi Jun and Joel,
> >
> > The KIP writeup has changed by quite a bit since your +1.
> > Can you please take another review? Thanks a lot for your time!
> >
> > Lucas
> >
> > On Tue, Jul 17, 2018 at 10:33 AM, Joel Koshy 
> wrote:
> >
> > > +1 on the KIP.
> > >
> > > (I'm not sure it's actually necessary to introduce the condition
> variables
> > > for the concern that Jun raised, but it's an implementation detail that
> > we
> > > can defer to a discussion in the PR.)
> > >
> > > On Sat, Jul 14, 2018 at 10:45 PM, Lucas Wang 
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > I agree by using the conditional variables, there is no need to add
> > such
> > > a
> > > > new config.
> > > > Also thanks for approving this KIP.
> > > >
> > > > Lucas
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-27 Thread Lucas Wang
Thanks for the comments, Joel.

I addressed all but the last one, where Jun also shared a comment in the
Vote thread to
change it to "controller.listener.name". I actually feel CONTROLLER is
better since it's a well-defined
concept in Kafka, while it's easier to confuse people with CONTROL, since
in the code we refer to some requests used for transactional producing as
CONTROL batches.

Thanks,
Lucas


[jira] [Created] (KAFKA-7350) Improve or remove the PreferredReplicaImbalanceCount metric

2018-08-27 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7350:
-

 Summary: Improve or remove the PreferredReplicaImbalanceCount 
metric
 Key: KAFKA-7350
 URL: https://issues.apache.org/jira/browse/KAFKA-7350
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


In KAFKA-6753, we identified that in the ControllerEventManager, updating two 
metrics after processing every controller event ends up consuming too much CPU. 
The first metric OfflinePartitionCount is resolved in KAFKA-6753, and this 
ticket is for tracking progress on the 2nd metric 
PreferredReplicaImbalanceCount. 

The options we have for this metric include:
1. Remove this metric, given that, if necessary, its value can be derived by 
getting the metadata of all topics in the cluster (see the sketch after this 
list for one way to do that).
2. Piggyback the update of the metric onto every run of the auto leader 
balancer. The benefit is keeping this metric. However, the downside is that 
the metric may then become stale or incorrect depending on when it's checked.
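
For option 1, a rough sketch of deriving the count from topic metadata with the
standard Admin client (the class here is illustrative and not part of Kafka):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

// Count partitions whose current leader is not the first assigned replica,
// i.e. the preferred-replica imbalance, derived from cluster metadata.
public final class PreferredReplicaImbalance {

    public static long count(String bootstrapServers) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            Map<String, TopicDescription> descriptions = admin.describeTopics(topics).all().get();
            long imbalanced = 0;
            for (TopicDescription description : descriptions.values()) {
                for (TopicPartitionInfo partition : description.partitions()) {
                    // The preferred leader is the first replica in the assignment.
                    if (!partition.replicas().isEmpty()
                            && partition.leader() != null
                            && partition.leader().id() != partition.replicas().get(0).id()) {
                        imbalanced++;
                    }
                }
            }
            return imbalanced;
        }
    }

    private PreferredReplicaImbalance() { }
}
{code}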



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6753) Speed up event processing on the controller

2018-08-27 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6753.
---
Resolution: Fixed

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Lucas Wang
>    Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.
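
For illustration only (this is not the actual fix), one way to avoid paying the
full O(#partitions) recount after every event is to cache the gauge value and
refresh it at most once per interval:

{code:java}
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustration of an alternative to the per-event recount described above:
// serve a cached value and run the expensive full scan at most once per interval.
final class RateLimitedGauge {
    private final LongSupplier expensiveRecount;   // e.g. scans all partitions
    private final long minIntervalMs;
    private final AtomicLong lastRefreshMs = new AtomicLong(0);
    private volatile long cachedValue = 0;

    RateLimitedGauge(LongSupplier expensiveRecount, long minIntervalMs) {
        this.expensiveRecount = expensiveRecount;
        this.minIntervalMs = minIntervalMs;
    }

    // Called after every controller event; the full scan runs only occasionally.
    long value() {
        long now = System.currentTimeMillis();
        long last = lastRefreshMs.get();
        if (now - last >= minIntervalMs && lastRefreshMs.compareAndSet(last, now)) {
            cachedValue = expensiveRecount.getAsLong();
        }
        return cachedValue;
    }
}
{code}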



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-21 Thread Lucas Wang
Hi Eno,

I fully agree with Becket here. If the motivation section makes sense, and
we know we can get burnt by this problem,
then the exact numbers (which vary case by case according to the config
settings and traffic pattern)
are no longer as important.

Thanks,
Lucas


On Tue, Aug 21, 2018 at 9:39 AM Becket Qin  wrote:

> Hi Eno,
>
> Thanks for the comments. This KIP is not really about improving the
> performance in general. It is about ensuring the cluster state can still be
> updated quickly even if the brokers are under heavy load.
>
> We have seen quite often that it took dozens of seconds for a broker to
> process the requests sent by the controller when the cluster is under heavy
> load. This leads to the issues Lucas mentioned in the motivation part.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> > On Aug 20, 2018, at 11:33 PM, Eno Thereska 
> wrote:
> >
> > Hi folks,
> >
> > I looked at the previous numbers that Lucas provided (thanks!) but it's
> > still not clear to me whether the performance benefits justify the added
> > complexity. I'm looking for some intuition here (a graph would be great
> but
> > not required): for a small/medium/large cluster, what are the expected
> > percentage of control requests today that will benefit from the change?
> > It's a bit hard to go through this level of detail without knowing the
> > expected end-to-end benefit. The best folks to answer this might be ones
> > running such clusters, and ideally should pitch in with some data.
> >
> > Thanks
> > Eno
> >
> > On Mon, Aug 20, 2018 at 7:29 AM, Becket Qin 
> wrote:
> >
> >> Hi Lucas,
> >>
> >> In KIP-103, we introduced a convention to define and look up the
> listeners.
> >> So it would be good if the later KIPs can follow the same convention.
> >>
> >> From what I understand, the advertised.listeners is actually designed
> for
> >> our purpose, i.e. providing a list of listeners that can be used in
> >> different cases. In KIP-103 it was used to separate internal traffic
> from
> >> the external traffic. It is not just for the user traffic or data
> >> only. So adding
> >> a controller listener is not repurposing the config. Also, ZK structure
> is
> >> only visible to brokers, the clients will still only see the listeners
> they
> >> are seeing today.
> >>
> >> For this KIP, we are essentially trying to separate the controller
> traffic
> >> from the inter-broker data traffic. So adding a new
> >> listener.name.for.controller config seems reasonable. The behavior would
> >> be:
> >> 1. If the listener.name.for.controller is set, the broker-controller
> >> communication will go through that listener.
> >> 2. Otherwise, the controller traffic falls back to use
> >> inter.broker.listener.name or inter.broker.security.protocol, which is
> the
> >> current behavior.
> >>
> >> Regarding updating the security protocol with a one-line change vs. a
> two-line
> >> change, I am a little confused, can you elaborate?
> >>
> >> Regarding the possibility of hurry and misreading. It is the system
> admin's
> >> responsibility to configure the right listener to ensure that different
> >> kinds of traffic are using the correct endpoints. So I think it is
> better
> >> that we always follow the same convention instead of doing it in
> >> different ways.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Fri, Aug 17, 2018 at 4:34 AM, Lucas Wang 
> wrote:
> >>
> >>> Thanks for the review, Becket.
> >>>
> >>> (1) After comparing the two approaches, I still feel the current
> writeup
> >> is
> >>> a little better.
> >>> a. The current writeup asks for an explicit endpoint while reusing the
> >>> existing "inter.broker.listener.name" with the exactly same semantic,
> >>> and your proposed change asks for a new listener name for controller
> >> while
> >>> reusing the existing "advertised.listeners" config with a slight
> semantic
> >>> change since a new controller endpoint needs to be added to it.
> >>> Hence conceptually the current writeup requires one config change
> instead
> >>> of two.
> >>> Also with one listener name, e.g. INTERNAL, for inter broker traffic,
> >>> instead of two, e.g. "INTERNAL" and "CONTROLLER",

[jira] [Reopened] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reopened KAFKA-6753:
---

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Lucas Wang
>    Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-21 Thread Lucas Wang
Thanks Becket. Following the convention of KIP-103 makes sense.
I've updated the KIP with your proposed changes. Please take another look.

Lucas
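
For illustration, a minimal sketch of the resolution order quoted below (the
class name and the Set-based lookup are assumptions made for this sketch; only
the config names come from the thread):

{code:java}
import java.util.Set;

// Sketch of the fallback behavior: use the controller listener when configured
// and advertised, otherwise keep today's behavior and use the inter-broker listener.
final class ControlPlaneListenerResolver {

    static String resolve(String controllerListenerName,
                          String interBrokerListenerName,
                          Set<String> advertisedListenerNames) {
        if (controllerListenerName != null
                && advertisedListenerNames.contains(controllerListenerName)) {
            return controllerListenerName;   // dedicated control-plane listener
        }
        return interBrokerListenerName;      // current behavior
    }

    private ControlPlaneListenerResolver() { }
}
{code}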

On Mon, Aug 20, 2018 at 7:29 AM Becket Qin  wrote:

> Hi Lucas,
>
> In KIP-103, we introduced a convention to define and look up the listeners.
> So it would be good if the later KIPs can follow the same convention.
>
> From what I understand, the advertised.listeners is actually designed for
> our purpose, i.e. providing a list of listeners that can be used in
> different cases. In KIP-103 it was used to separate internal traffic from
> the external traffic. It is not just for the user traffic or data
> only. So adding
> a controller listener is not repurposing the config. Also, ZK structure is
> only visible to brokers, the clients will still only see the listeners they
> are seeing today.
>
> For this KIP, we are essentially trying to separate the controller traffic
> from the inter-broker data traffic. So adding a new
> listener.name.for.controller config seems reasonable. The behavior would
> be:
> 1. If the listener.name.for.controller is set, the broker-controller
> communication will go through that listener.
> 2. Otherwise, the controller traffic falls back to use
> inter.broker.listener.name or inter.broker.security.protocol, which is the
> current behavior.
>
> Regarding updating the security protocol with a one-line change vs. a two-line
> change, I am a little confused, can you elaborate?
>
> Regarding the possibility of hurry and misreading. It is the system admin's
> responsibility to configure the right listener to ensure that different
> kinds of traffic are using the correct endpoints. So I think it is better
> that we always follow the same convention instead of doing it in
> different ways.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Fri, Aug 17, 2018 at 4:34 AM, Lucas Wang  wrote:
>
> > Thanks for the review, Becket.
> >
> > (1) After comparing the two approaches, I still feel the current writeup
> is
> > a little better.
> > a. The current writeup asks for an explicit endpoint while reusing the
> > existing "inter.broker.listener.name" with the exactly same semantic,
> > and your proposed change asks for a new listener name for controller
> while
> > reusing the existing "advertised.listeners" config with a slight semantic
> > change since a new controller endpoint needs to be added to it.
> > Hence conceptually the current writeup requires one config change instead
> > of two.
> > Also with one listener name, e.g. INTERNAL, for inter broker traffic,
> > instead of two, e.g. "INTERNAL" and "CONTROLLER",
> > if an operator decides to switch from PLAINTEXT to SSL for internal
> > traffic, chances are that she wants to upgrade
> > both controller connections and data connections, she only needs to
> update
> > one line in
> > the "listener.security.protocol.map" config, and avoids possible
> mistakes.
> >
> >
> > b. When this KIP is picked up by an operator who is in a hurry without
> > reading the docs, if she sees a
> > new listener name for controller is required, and chances are there is
> > already a list of listeners,
> > it's possible for her to simply choose an existing listener name, without
> > explicitly creating
> > the new CONTROLLER listener and endpoints. If this is done, Kafka will be
> > run with the existing
> > behavior, defeating the purpose of this KIP.
> > In comparison, if she sees a separate endpoint is being asked, I feel
> it's
> > unlikely for her to
> > copy and paste an existing endpoint.
> >
> > Please let me know your comments.
> >
> > (2) Good catch, it's a typo, and it's been fixed.
> >
> > Thanks,
> > Lucas
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-16 Thread Lucas Wang
Thanks for the review, Becket.

(1) After comparing the two approaches, I still feel the current writeup is
a little better.
a. The current writeup asks for an explicit endpoint while reusing the
existing "inter.broker.listener.name" with the exactly same semantic,
and your proposed change asks for a new listener name for controller while
reusing the existing "advertised.listeners" config with a slight semantic
change since a new controller endpoint needs to be added to it.
Hence conceptually the current writeup requires one config change instead
of two.
Also with one listener name, e.g. INTERNAL, for inter broker traffic,
instead of two, e.g. "INTERNAL" and "CONTROLLER",
if an operator decides to switch from PLAINTEXT to SSL for internal
traffic, chances are that she wants to upgrade
both controller connections and data connections, she only needs to update
one line in
the "listener.security.protocol.map" config, and avoids possible mistakes.


b. When this KIP is picked up by an operator who is in a hurry without
reading the docs, if she sees a
new listener name for controller is required, and chances are there is
already a list of listeners,
it's possible for her to simply choose an existing listener name, without
explicitly creating
the new CONTROLLER listener and endpoints. If this is done, Kafka will be
run with the existing
behavior, defeating the purpose of this KIP.
In comparison, if she sees a separate endpoint is being asked, I feel it's
unlikely for her to
copy and paste an existing endpoint.

Please let me know your comments.

(2) Good catch, it's a typo, and it's been fixed.

Thanks,
Lucas
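
To make the one-line vs. two-line argument above concrete, here is an
illustrative comparison of the two config shapes written as Java Properties
(listener names and ports are made up for the example; "listener.name.for.controller"
is the config name being discussed in this thread, not an existing config):

{code:java}
import java.util.Properties;

// With a single INTERNAL listener shared by controller and inter-broker traffic,
// switching internal traffic from PLAINTEXT to SSL touches one map entry; with
// separate INTERNAL and CONTROLLER names, two entries must be changed in sync.
final class ListenerConfigExample {

    static Properties singleInternalListener() {
        Properties props = new Properties();
        props.put("listeners", "EXTERNAL://:9092,INTERNAL://:9093");
        // One entry to flip for SSL: INTERNAL:PLAINTEXT -> INTERNAL:SSL
        props.put("listener.security.protocol.map", "EXTERNAL:SSL,INTERNAL:PLAINTEXT");
        props.put("inter.broker.listener.name", "INTERNAL");
        return props;
    }

    static Properties separateControllerListener() {
        Properties props = new Properties();
        props.put("listeners", "EXTERNAL://:9092,INTERNAL://:9093,CONTROLLER://:9094");
        // Two entries to flip for SSL: INTERNAL and CONTROLLER must both change.
        props.put("listener.security.protocol.map",
                  "EXTERNAL:SSL,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
        props.put("inter.broker.listener.name", "INTERNAL");
        props.put("listener.name.for.controller", "CONTROLLER");  // proposed config name
        return props;
    }

    private ListenerConfigExample() { }
}
{code}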


[jira] [Resolved] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

2018-08-15 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6974.
---
Resolution: Won't Fix

> Changes the interaction between request handler threads and fetcher threads 
> into an ASYNC model
> ---
>
> Key: KAFKA-6974
> URL: https://issues.apache.org/jira/browse/KAFKA-6974
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> At LinkedIn, occasionally our clients complain about receiving constant 
> NotLeaderForPartition exceptions.
> Investigations:
> For one investigated case, the cluster was going through a rolling bounce. 
> And we saw there was a ~8 minutes delay between an old partition leader 
> resigning and the new leader becoming active, based on entries of "Broker xxx 
> handling LeaderAndIsr request" in the state change log.
> Our monitoring shows the LeaderAndISR request local time during the incident 
> went up to ~4 minutes.
> Explanations:
> One possible explanation of the ~8 minutes of delay is:
> During controlled shutdown of a broker, the partitions whose leaders lie on 
> the shutting down broker need to go through leadership transitions. And the 
> controller process partitions in batches with each batch having 
> config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
> If the 1st LeaderAndISR request sent to a new leader broker takes too long, e.g. 4 
> minutes, then the subsequent LeaderAndISR requests can have an accumulated 
> delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that 
> subsequent LeaderAndISR requests are blocked in a muted channel, given only 
> one LeaderAndISR request can be processed at a time with a 
> maxInFlightRequestsPerConnection setting of 1. When that happens, no existing 
> metric would show the total delay of 8 or 12 minutes for muted requests.
> Now the question is why it took ~4 minutes for the 1st LeaderAndISR 
> request to finish.
> Explanation for the ~4 minutes of local time for LeaderAndISR request:
> During processing of an LeaderAndISR request, the request handler thread 
> needs to add partitions to or remove partitions from partitionStates field of 
> the ReplicaFetcherThread, also shutdown idle fetcher threads by checking the 
> size of the partitionStates field. On the other hand, background fetcher 
> threads need to iterate through all the partitions in partitionStates in 
> order to build fetch request, and process fetch responses. The 
> synchronization between request handler thread and the fetcher threads is 
> done through a partitionMapLock. 
> Specifically, the fetcher threads may acquire the partitionMapLock, and then 
> call the following functions for processing the fetch response:
> (1) processPartitionData, which in turn calls 
> (2) Replica.maybeIncrementLogStartOffset, which calls 
> (3) Log.maybeIncrementLogStartOffset, which calls 
> (4) LeaderEpochCache.clearAndFlushEarliest.
> Now two factors contribute to the long holding of the partitionMapLock,
> 1. function (4) above entails calling sync() to make sure data gets 
> persistent to the disk, which may potentially have a long latency
> 2. All the 4 functions above can potentially be called for each partition in 
> the fetch response, multiplying the sync() latency by a factor of n.
> The end result is that the request handler thread got blocked for a long time 
> trying to acquire the partitionMapLock of some fetcher inside 
> AbstractFetcherManager.shutdownIdleFetcherThreads since checking each 
> fetcher's partitionCount requires getting the partitionMapLock.
> In our testing environment, we reproduced the problem and confirmed the 
> explanation above with a request handler thread getting blocked for 10 
> seconds trying to acquire the partitionMapLock of one particular fetcher 
> thread, while there are many log entries showing "Incrementing log start 
> offset of partition..."
> Proposed change:
> We propose to change the interaction between the request handler threads and 
> the fetcher threads to an ASYNC model by using an event queue. All requests 
> to add or remove partitions, or shutdown idle fetcher threads are modeled as 
> items in the event queue. And only the fetcher threads can take items out of 
> the event queue and actually process them.
> In the new ASYNC model, in order to be able to process an infinite sequence 
> of FetchRequests, a fetcher thread initially has one FetchRequest, and after 
> it's done processing one FetchRequest, it en
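
A rough sketch of the proposed ASYNC hand-off (class and event names are
illustrative, not the actual Kafka code): request handler threads only enqueue
events, and the fetcher thread is the sole owner of its partition state, so no
partitionMapLock is needed.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AsyncFetcherSketch implements Runnable {

    interface FetcherEvent { void apply(Set<String> partitionStates); }

    private final BlockingQueue<FetcherEvent> eventQueue = new LinkedBlockingQueue<>();
    private final Set<String> partitionStates = new HashSet<>();  // touched by fetcher thread only
    private volatile boolean running = true;

    // Called by request handler threads, e.g. while processing LeaderAndIsr.
    void addPartitions(Set<String> partitions) {
        eventQueue.add(states -> states.addAll(partitions));
    }

    // Called by request handler threads, e.g. while processing StopReplica.
    void removePartitions(Set<String> partitions) {
        eventQueue.add(states -> states.removeAll(partitions));
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Drain pending events before building the next fetch request.
                FetcherEvent event;
                while ((event = eventQueue.poll(10, TimeUnit.MILLISECONDS)) != null) {
                    event.apply(partitionStates);
                }
                buildAndSendFetchRequest(partitionStates);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    private void buildAndSendFetchRequest(Set<String> partitions) {
        // Placeholder for the actual fetch logic.
    }
}
{code}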

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-13 Thread Lucas Wang
@Becket

Makes sense. I've updated the KIP by adding the following paragraph to the
motivation section

> Today there is no separation between controller requests and regular data
> plane requests. Specifically (1) a controller in a cluster uses the same
> advertised endpoints to connect to brokers as what clients and regular
> brokers use for exchanging data (2) on the broker side, the same network
> (processor) thread could be multiplexed by handling a controller connection
> and many other data plane connections (3) after a controller request is
> read from the socket, it is enqueued into the single FIFO requestQueue,
> which is used for all types of requests (4) request handler threads poll
> requests from the requestQueue and handle the controller requests with the
> same priority as regular data requests.
>
> Because of the multiplexing at every stage of request handling, controller
> requests could be significantly delayed under the following scenarios:
>
>1. The requestQueue is full, and therefore blocks a network
>(processor) thread that has read a controller request from the socket.
>2. A controller request is enqueued into the requestQueue after a
>backlog of data requests, and experiences a long queuing time in the
>requestQueue.
>
>
Please let me know if that looks ok or any other change you'd like to make.
Thanks!

Lucas
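
For illustration, a tiny standalone example of scenario 1 above: with one
bounded FIFO shared by all request types, the network (processor) thread
blocks on put() once data requests fill the queue, even though a controller
request has already been read off the socket.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Not broker code; a minimal demonstration of head-of-line blocking on a shared queue.
final class SharedRequestQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(2);
        requestQueue.put("data-request-1");
        requestQueue.put("data-request-2");        // queue is now full

        Thread processor = new Thread(() -> {
            try {
                // Blocks until a request handler frees a slot, delaying the
                // controller request even though it is already off the socket.
                requestQueue.put("controller-request");
                System.out.println("controller request finally enqueued");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "network-processor");
        processor.start();

        Thread.sleep(1000);                        // simulate slow request handlers
        requestQueue.take();                       // a handler frees one slot
        processor.join();
    }
}
{code}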

On Mon, Aug 13, 2018 at 6:33 AM, Becket Qin  wrote:

> Hi Lucas,
>
> Thanks for the explanation. It might be a nitpick, but it seems better to
> mention in the motivation part that today the client requests and
> controller requests are not only sharing the same queue, but also a bunch
> of other things, so that we can avoid asking people to read the rejected
> alternatives.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
>
>
> On Fri, Aug 10, 2018 at 6:23 AM, Lucas Wang  wrote:
>
> > @Becket,
> >
> > I've asked for review by Jun and Joel in the vote thread.
> > Regarding the separate thread and port, I did talk about it in the
> rejected
> > alternative design 1.
> > Please let me know if you'd like more elaboration or moving it to the
> > motivation, etc.
> >
> > Thanks,
> > Lucas
> >
> > On Wed, Aug 8, 2018 at 3:59 PM, Becket Qin  wrote:
> >
> > > Hi Lucas,
> > >
> > > Yes, a separate Jira is OK.
> > >
> > > Since the proposal has significantly changed since the initial vote
> > > started. We probably should let the others who have already voted know
> > and
> > > ensure they are happy with the updated proposal.
> > > Also, it seems the motivation part of the KIP wiki is still just
> talking
> > > about the separate queue and does not fully cover the changes we make now,
> > e.g.
> > > separate thread, port, etc. We might want to explain a bit more so that
> > > people who did not follow the discussion mail thread can also understand
> the
> > > whole proposal.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Aug 8, 2018 at 12:44 PM, Lucas Wang 
> > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > Thanks for the review. The current write up in the KIP won’t change
> the
> > > > ordering behavior. Are you ok with addressing that as a separate
> > > > independent issue (I’ll create a separate ticket for it)?
> > > > If so, can you please give me a +1 on the vote thread?
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > > On Tue, Aug 7, 2018 at 7:34 PM Becket Qin 
> > wrote:
> > > >
> > > > > Thanks for the updated KIP wiki, Lucas. Looks good to me overall.
> > > > >
> > > > > It might be an implementation detail, but do we still plan to use
> the
> > > > > correlation id to ensure the request processing order?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Tue, Jul 31, 2018 at 3:39 AM, Lucas Wang  >
> > > > wrote:
> > > > >
> > > > > > Thanks for your review, Dong.
> > > > > > Ack that these configs will have a bigger impact for users.
> > > > > >
> > > > > > On the other hand, I would argue that the request queue becoming
> > full
> > > > > > may or may not be a rare scenario.
> > > > > > How often the request queue gets full depends on the request
>

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-09 Thread Lucas Wang
@Becket,

I've asked for review by Jun and Joel in the vote thread.
Regarding the separate thread and port, I did talk about it in the rejected
alternative design 1.
Please let me know if you'd like more elaboration or moving it to the
motivation, etc.

Thanks,
Lucas

On Wed, Aug 8, 2018 at 3:59 PM, Becket Qin  wrote:

> Hi Lucas,
>
> Yes, a separate Jira is OK.
>
> Since the proposal has significantly changed since the initial vote
> started. We probably should let the others who have already voted know and
> ensure they are happy with the updated proposal.
> Also, it seems the motivation part of the KIP wiki is still just talking
> about the separate queue and does not fully cover the changes we make now, e.g.
> separate thread, port, etc. We might want to explain a bit more so that
> people who did not follow the discussion mail thread can also understand the
> whole proposal.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Aug 8, 2018 at 12:44 PM, Lucas Wang  wrote:
>
> > Hi Becket,
> >
> > Thanks for the review. The current write up in the KIP won’t change the
> > ordering behavior. Are you ok with addressing that as a separate
> > independent issue (I’ll create a separate ticket for it)?
> > If so, can you please give me a +1 on the vote thread?
> >
> > Thanks,
> > Lucas
> >
> > On Tue, Aug 7, 2018 at 7:34 PM Becket Qin  wrote:
> >
> > > Thanks for the updated KIP wiki, Lucas. Looks good to me overall.
> > >
> > > It might be an implementation detail, but do we still plan to use the
> > > correlation id to ensure the request processing order?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Jul 31, 2018 at 3:39 AM, Lucas Wang 
> > wrote:
> > >
> > > > Thanks for your review, Dong.
> > > > Ack that these configs will have a bigger impact for users.
> > > >
> > > > On the other hand, I would argue that the request queue becoming full
> > > > may or may not be a rare scenario.
> > > > How often the request queue gets full depends on the request incoming
> > > rate,
> > > > the request processing rate, and the size of the request queue.
> > > > When that happens, the dedicated endpoints design can better handle
> > > > it than any of the previously discussed options.
> > > >
> > > > Another reason I made the change was that I have the same taste
> > > > as Becket that it's a better separation of the control plane from the
> > > data
> > > > plane.
> > > >
> > > > Finally, I want to clarify that this change is NOT motivated by the
> > > > out-of-order
> > > > processing discussion. The latter problem is orthogonal to this KIP,
> > and
> > > it
> > > > can happen in any of the design options we discussed for this KIP so
> > far.
> > > > So I'd like to address out-of-order processing separately in another
> > > > thread,
> > > > and avoid mentioning it in this KIP.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > > On Fri, Jul 27, 2018 at 7:51 PM, Dong Lin 
> wrote:
> > > >
> > > > > Hey Lucas,
> > > > >
> > > > > Thanks for the update.
> > > > >
> > > > > The current KIP proposes new broker configs
> "listeners.for.controller"
> > > and
> > > > > "advertised.listeners.for.controller". This is going to be a big
> > change
> > > > > since listeners are among the most important configs that every
> user
> > > > needs
> > > > > to change. According to the rejected alternative section, it seems
> > that
> > > > the
> > > > > reason to add these two configs is to improve performance when the
> > data
> > > > > request queue is full rather than for correctness. It should be a
> > very
> > > > rare
> > > > > scenario and I am not sure we should add configs for all users just
> > to
> > > > > improve the performance in such rare scenario.
> > > > >
> > > > > Also, if the new design is based on the issues which are discovered
> > in
> > > > the
> > > > > recent discussion, e.g. out of order processing if we don't use a
> > > > dedicated
> > > > > thread for controller request, it may be useful to explain the
> >

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-08-09 Thread Lucas Wang
Hi Jun and Joel,

The KIP writeup has changed by quite a bit since your +1.
Can you please take another review? Thanks a lot for your time!

Lucas

On Tue, Jul 17, 2018 at 10:33 AM, Joel Koshy  wrote:

> +1 on the KIP.
>
> (I'm not sure it's actually necessary to introduce the condition variables
> for the concern that Jun raised, but it's an implementation detail that we
> can defer to a discussion in the PR.)
>
> On Sat, Jul 14, 2018 at 10:45 PM, Lucas Wang 
> wrote:
>
> > Hi Jun,
> >
> > I agree by using the conditional variables, there is no need to add such
> a
> > new config.
> > Also thanks for approving this KIP.
> >
> > Lucas
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-08-07 Thread Lucas Wang
Hi Becket,

Thanks for the review. The current write up in the KIP won’t change the
ordering behavior. Are you ok with addressing that as a separate
independent issue (I’ll create a separate ticket for it)?
If so, can you please give me a +1 on the vote thread?

Thanks,
Lucas

On Tue, Aug 7, 2018 at 7:34 PM Becket Qin  wrote:

> Thanks for the updated KIP wiki, Lucas. Looks good to me overall.
>
> It might be an implementation detail, but do we still plan to use the
> correlation id to ensure the request processing order?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 31, 2018 at 3:39 AM, Lucas Wang  wrote:
>
> > Thanks for your review, Dong.
> > Ack that these configs will have a bigger impact for users.
> >
> > On the other hand, I would argue that the request queue becoming full
> > may or may not be a rare scenario.
> > How often the request queue gets full depends on the request incoming
> rate,
> > the request processing rate, and the size of the request queue.
> > When that happens, the dedicated endpoints design can better handle
> > it than any of the previously discussed options.
> >
> > Another reason I made the change was that I have the same taste
> > as Becket that it's a better separation of the control plane from the
> data
> > plane.
> >
> > Finally, I want to clarify that this change is NOT motivated by the
> > out-of-order
> > processing discussion. The latter problem is orthogonal to this KIP, and
> it
> > can happen in any of the design options we discussed for this KIP so far.
> > So I'd like to address out-of-order processing separately in another
> > thread,
> > and avoid mentioning it in this KIP.
> >
> > Thanks,
> > Lucas
> >
> > On Fri, Jul 27, 2018 at 7:51 PM, Dong Lin  wrote:
> >
> > > Hey Lucas,
> > >
> > > Thanks for the update.
> > >
> > > The current KIP proposes new broker configs "listeners.for.controller"
> and
> > > "advertised.listeners.for.controller". This is going to be a big change
> > > since listeners are among the most important configs that every user
> > needs
> > > to change. According to the rejected alternative section, it seems that
> > the
> > > reason to add these two configs is to improve performance when the data
> > > request queue is full rather than for correctness. It should be a very
> > rare
> > > scenario and I am not sure we should add configs for all users just to
> > > improve the performance in such rare scenario.
> > >
> > > Also, if the new design is based on the issues which are discovered in
> > the
> > > recent discussion, e.g. out of order processing if we don't use a
> > dedicated
> > > thread for controller request, it may be useful to explain the problem
> in
> > > the motivation section.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Fri, Jul 27, 2018 at 1:28 PM, Lucas Wang 
> > wrote:
> > >
> > > > A kind reminder for review of this KIP.
> > > >
> > > > Thank you very much!
> > > > Lucas
> > > >
> > > > On Wed, Jul 25, 2018 at 10:23 PM, Lucas Wang 
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I've updated the KIP by adding the dedicated endpoints for
> controller
> > > > > connections,
> > > > > and pinning threads for controller requests.
> > > > > Also I've updated the title of this KIP. Please take a look and let
> > me
> > > > > know your feedback.
> > > > >
> > > > > Thanks a lot for your time!
> > > > > Lucas
> > > > >
> > > > > On Tue, Jul 24, 2018 at 10:19 AM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com> wrote:
> > > > >
> > > > >> Hi Lucas,
> > > > >> I agree, if we want to go forward with a separate controller plane
> > and
> > > > >> data
> > > > >> plane and completely isolate them, having a separate port for
> > > controller
> > > > >> with a separate Acceptor and a Processor sounds ideal to me.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Mayuresh
> > > > >>
> > > > >>
> > > > >> On Mon, Jul 23, 2018 at 11:04 PM Becket Qin  >
> > > > wrote:

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-30 Thread Lucas Wang
Thanks for your review, Dong.
Ack that these configs will have a bigger impact for users.

On the other hand, I would argue that the request queue becoming full
may or may not be a rare scenario.
How often the request queue gets full depends on the request incoming rate,
the request processing rate, and the size of the request queue.
When that happens, the dedicated endpoints design can better handle
it than any of the previously discussed options.

Another reason I made the change was that I have the same taste
as Becket that it's a better separation of the control plane from the data
plane.

Finally, I want to clarify that this change is NOT motivated by the
out-of-order
processing discussion. The latter problem is orthogonal to this KIP, and it
can happen in any of the design options we discussed for this KIP so far.
So I'd like to address out-of-order processing separately in another thread,
and avoid mentioning it in this KIP.

Thanks,
Lucas

On Fri, Jul 27, 2018 at 7:51 PM, Dong Lin  wrote:

> Hey Lucas,
>
> Thanks for the update.
>
> The current KIP proposes new broker configs "listeners.for.controller" and
> "advertised.listeners.for.controller". This is going to be a big change
> since listeners are among the most important configs that every user needs
> to change. According to the rejected alternative section, it seems that the
> reason to add these two configs is to improve performance when the data
> request queue is full rather than for correctness. It should be a very rare
> scenario and I am not sure we should add configs for all users just to
> improve the performance in such rare scenario.
>
> Also, if the new design is based on the issues which are discovered in the
> recent discussion, e.g. out of order processing if we don't use a dedicated
> thread for controller request, it may be useful to explain the problem in
> the motivation section.
>
> Thanks,
> Dong
>
> On Fri, Jul 27, 2018 at 1:28 PM, Lucas Wang  wrote:
>
> > A kind reminder for review of this KIP.
> >
> > Thank you very much!
> > Lucas
> >
> > On Wed, Jul 25, 2018 at 10:23 PM, Lucas Wang 
> > wrote:
> >
> > > Hi All,
> > >
> > > I've updated the KIP by adding the dedicated endpoints for controller
> > > connections,
> > > and pinning threads for controller requests.
> > > Also I've updated the title of this KIP. Please take a look and let me
> > > know your feedback.
> > >
> > > Thanks a lot for your time!
> > > Lucas
> > >
> > > On Tue, Jul 24, 2018 at 10:19 AM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > >> Hi Lucas,
> > >> I agree, if we want to go forward with a separate controller plane and
> > >> data
> > >> plane and completely isolate them, having a separate port for
> controller
> > >> with a separate Acceptor and a Processor sounds ideal to me.
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >>
> > >> On Mon, Jul 23, 2018 at 11:04 PM Becket Qin 
> > wrote:
> > >>
> > >> > Hi Lucas,
> > >> >
> > >> > Yes, I agree that a dedicated end to end control flow would be
> ideal.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jiangjie (Becket) Qin
> > >> >
> > >> > On Tue, Jul 24, 2018 at 1:05 PM, Lucas Wang 
> > >> wrote:
> > >> >
> > >> > > Thanks for the comment, Becket.
> > >> > > So far, we've been trying to avoid making any request handler
> thread
> > >> > > special.
> > >> > > But if we were to follow that path in order to make the two planes
> > >> more
> > >> > > isolated,
> > >> > > what do you think about also having a dedicated processor thread,
> > >> > > and dedicated port for the controller?
> > >> > >
> > >> > > Today one processor thread can handle multiple connections, let's
> > say
> > >> 100
> > >> > > connections
> > >> > >
> > >> > > represented by connection0, ... connection99, among which
> > >> connection0-98
> > >> > > are from clients, while connection99 is from
> > >> > >
> > >> > > the controller. Further let's say after one selector polling,
> there
> > >> are
> > >> > > incoming requests on all conne

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-27 Thread Lucas Wang
A kind reminder for review of this KIP.

Thank you very much!
Lucas

On Wed, Jul 25, 2018 at 10:23 PM, Lucas Wang  wrote:

> Hi All,
>
> I've updated the KIP by adding the dedicated endpoints for controller
> connections,
> and pinning threads for controller requests.
> Also I've updated the title of this KIP. Please take a look and let me
> know your feedback.
>
> Thanks a lot for your time!
> Lucas
>
> On Tue, Jul 24, 2018 at 10:19 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Hi Lucas,
>> I agree, if we want to go forward with a separate controller plane and
>> data
>> plane and completely isolate them, having a separate port for controller
>> with a separate Acceptor and a Processor sounds ideal to me.
>>
>> Thanks,
>>
>> Mayuresh
>>
>>
>> On Mon, Jul 23, 2018 at 11:04 PM Becket Qin  wrote:
>>
>> > Hi Lucas,
>> >
>> > Yes, I agree that a dedicated end to end control flow would be ideal.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Tue, Jul 24, 2018 at 1:05 PM, Lucas Wang 
>> wrote:
>> >
>> > > Thanks for the comment, Becket.
>> > > So far, we've been trying to avoid making any request handler thread
>> > > special.
>> > > But if we were to follow that path in order to make the two planes
>> more
>> > > isolated,
>> > > what do you think about also having a dedicated processor thread,
>> > > and dedicated port for the controller?
>> > >
>> > > Today one processor thread can handle multiple connections, let's say
>> 100
>> > > connections
>> > >
>> > > represented by connection0, ... connection99, among which
>> connection0-98
>> > > are from clients, while connection99 is from
>> > >
>> > > the controller. Further let's say after one selector polling, there
>> are
>> > > incoming requests on all connections.
>> > >
>> > > When the request queue is full, (either the data request being full in
>> > the
>> > > two queue design, or
>> > >
>> > > the one single queue being full in the deque design), the processor
>> > thread
>> > > will be blocked first
>> > >
>> > > when trying to enqueue the data request from connection0, then
>> possibly
>> > > blocked for the data request
>> > >
>> > > from connection1, ... etc even though the controller request is ready
>> to
>> > be
>> > > enqueued.
>> > >
>> > > To solve this problem, it seems we would need to have a separate port
>> > > dedicated to
>> > >
>> > > the controller, a dedicated processor thread, a dedicated controller
>> > > request queue,
>> > >
>> > > and pinning of one request handler thread for controller requests.
>> > >
>> > > Thanks,
>> > > Lucas
>> > >
>> > >
>> > > On Mon, Jul 23, 2018 at 6:00 PM, Becket Qin 
>> > wrote:
>> > >
>> > > > Personally I am not fond of the dequeue approach simply because it
>> is
>> > > > against the basic idea of isolating the controller plane and data
>> > plane.
>> > > > With a single dequeue, theoretically speaking the controller
>> requests
>> > can
>> > > > starve the clients requests. I would prefer the approach with a
>> > separate
>> > > > controller request queue and a dedicated controller request handler
>> > > thread.
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > > On Tue, Jul 24, 2018 at 8:16 AM, Lucas Wang 
>> > > wrote:
>> > > >
>> > > > > Sure, I can summarize the usage of correlation id. But before I do
>> > > that,
>> > > > it
>> > > > > seems
>> > > > > the same out-of-order processing can also happen to Produce
>> requests
>> > > sent
>> > > > > by producers,
>> > > > > following the same example you described earlier.
>> > > > > If that's the case, I think this probably deserves a separate doc
>> and
>> > > > > design independent of this KIP.
>> > > > >
>> > > > > 

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-25 Thread Lucas Wang
Hi All,

I've updated the KIP by adding the dedicated endpoints for controller
connections,
and pinning threads for controller requests.
Also I've updated the title of this KIP. Please take a look and let me know
your feedback.

Thanks a lot for your time!
Lucas

On Tue, Jul 24, 2018 at 10:19 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi Lucas,
> I agree, if we want to go forward with a separate controller plane and data
> plane and completely isolate them, having a separate port for controller
> with a separate Acceptor and a Processor sounds ideal to me.
>
> Thanks,
>
> Mayuresh
>
>
> On Mon, Jul 23, 2018 at 11:04 PM Becket Qin  wrote:
>
> > Hi Lucas,
> >
> > Yes, I agree that a dedicated end to end control flow would be ideal.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Jul 24, 2018 at 1:05 PM, Lucas Wang 
> wrote:
> >
> > > Thanks for the comment, Becket.
> > > So far, we've been trying to avoid making any request handler thread
> > > special.
> > > But if we were to follow that path in order to make the two planes more
> > > isolated,
> > > what do you think about also having a dedicated processor thread,
> > > and dedicated port for the controller?
> > >
> > > Today one processor thread can handle multiple connections, let's say
> 100
> > > connections
> > >
> > > represented by connection0, ... connection99, among which
> connection0-98
> > > are from clients, while connection99 is from
> > >
> > > the controller. Further let's say after one selector polling, there are
> > > incoming requests on all connections.
> > >
> > > When the request queue is full, (either the data request being full in
> > the
> > > two queue design, or
> > >
> > > the one single queue being full in the deque design), the processor
> > thread
> > > will be blocked first
> > >
> > > when trying to enqueue the data request from connection0, then possibly
> > > blocked for the data request
> > >
> > > from connection1, ... etc even though the controller request is ready
> to
> > be
> > > enqueued.
> > >
> > > To solve this problem, it seems we would need to have a separate port
> > > dedicated to
> > >
> > > the controller, a dedicated processor thread, a dedicated controller
> > > request queue,
> > >
> > > and pinning of one request handler thread for controller requests.
> > >
> > > Thanks,
> > > Lucas
> > >
> > >
> > > On Mon, Jul 23, 2018 at 6:00 PM, Becket Qin 
> > wrote:
> > >
> > > > Personally I am not fond of the dequeue approach simply because it is
> > > > against the basic idea of isolating the controller plane and data
> > plane.
> > > > With a single dequeue, theoretically speaking the controller requests
> > can
> > > > starve the clients requests. I would prefer the approach with a
> > separate
> > > > controller request queue and a dedicated controller request handler
> > > thread.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Tue, Jul 24, 2018 at 8:16 AM, Lucas Wang 
> > > wrote:
> > > >
> > > > > Sure, I can summarize the usage of correlation id. But before I do
> > > that,
> > > > it
> > > > > seems
> > > > > the same out-of-order processing can also happen to Produce
> requests
> > > sent
> > > > > by producers,
> > > > > following the same example you described earlier.
> > > > > If that's the case, I think this probably deserves a separate doc
> and
> > > > > design independent of this KIP.
> > > > >
> > > > > Lucas
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jul 23, 2018 at 12:39 PM, Dong Lin 
> > > wrote:
> > > > >
> > > > > > Hey Lucas,
> > > > > >
> > > > > > Could you update the KIP if you are confident with the approach
> > which
> > > > > uses
> > > > > > correlation id? The idea around correlation id is kind of
> scattered
> > > > > across
> > > > > > multiple emails. It will be useful if other reviews can read the
> > KIP
> > > to
>

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-23 Thread Lucas Wang
Thanks for the comment, Becket.
So far, we've been trying to avoid making any request handler thread
special.
But if we were to follow that path in order to make the two planes more
isolated,
what do you think about also having a dedicated processor thread,
and dedicated port for the controller?

Today one processor thread can handle multiple connections, let's say 100
connections

represented by connection0, ... connection99, among which connection0-98
are from clients, while connection99 is from

the controller. Further let's say after one selector polling, there are
incoming requests on all connections.

When the request queue is full (either the data request queue being full in the
two-queue design, or

the single queue being full in the deque design), the processor thread
will be blocked first

when trying to enqueue the data request from connection0, then possibly
blocked for the data request

from connection1, ... etc even though the controller request is ready to be
enqueued.

To solve this problem, it seems we would need to have a separate port
dedicated to

the controller, a dedicated processor thread, a dedicated controller
request queue,

and pinning of one request handler thread for controller requests.

Thanks,
Lucas
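
For illustration, a minimal sketch of that separation (names are made up for
the sketch): controller requests get their own bounded queue drained by one
pinned handler thread, so a full data request queue cannot delay them.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Not the actual broker code; a sketch of a dedicated controller request queue
// plus a pinned handler thread, alongside the regular data-plane handlers.
final class ControlPlaneQueuesSketch {

    final BlockingQueue<String> dataRequestQueue = new ArrayBlockingQueue<>(500);
    final BlockingQueue<String> controllerRequestQueue = new ArrayBlockingQueue<>(20);

    void startHandlers(int numDataHandlers) {
        // One pinned handler thread serves only controller requests.
        new Thread(() -> drain(controllerRequestQueue), "controller-request-handler").start();
        for (int i = 0; i < numDataHandlers; i++) {
            new Thread(() -> drain(dataRequestQueue), "data-request-handler-" + i).start();
        }
    }

    private void drain(BlockingQueue<String> queue) {
        try {
            while (true) {
                handle(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void handle(String request) {
        // Placeholder for actual request handling.
    }
}
{code}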


On Mon, Jul 23, 2018 at 6:00 PM, Becket Qin  wrote:

> Personally I am not fond of the dequeue approach simply because it is
> against the basic idea of isolating the controller plane and data plane.
> With a single dequeue, theoretically speaking the controller requests can
> starve the clients requests. I would prefer the approach with a separate
> controller request queue and a dedicated controller request handler thread.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 24, 2018 at 8:16 AM, Lucas Wang  wrote:
>
> > Sure, I can summarize the usage of correlation id. But before I do that,
> it
> > seems
> > the same out-of-order processing can also happen to Produce requests sent
> > by producers,
> > following the same example you described earlier.
> > If that's the case, I think this probably deserves a separate doc and
> > design independent of this KIP.
> >
> > Lucas
> >
> >
> >
> > On Mon, Jul 23, 2018 at 12:39 PM, Dong Lin  wrote:
> >
> > > Hey Lucas,
> > >
> > > Could you update the KIP if you are confident with the approach which
> > uses
> > > correlation id? The idea around correlation id is kind of scattered
> > across
> > > multiple emails. It will be useful if other reviews can read the KIP to
> > > understand the latest proposal.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Jul 23, 2018 at 12:32 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > > > I like the idea of the dequeue implementation by Lucas. This will
> help
> > us
> > > > avoid additional queue for controller and additional configs in
> Kafka.
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Sun, Jul 22, 2018 at 2:58 AM Becket Qin 
> > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > The usage of correlation ID might still be useful to address the
> > cases
> > > > > that the controller epoch and leader epoch check are not sufficient
> > to
> > > > > guarantee correct behavior. For example, if the controller sends a
> > > > > LeaderAndIsrRequest followed by a StopReplicaRequest, and the
> broker
> > > > > processes it in the reverse order, the replica may still be wrongly
> > > > > recreated, right?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > > On Jul 22, 2018, at 11:47 AM, Jun Rao  wrote:
> > > > > >
> > > > > > Hmm, since we already use controller epoch and leader epoch for
> > > > properly
> > > > > > caching the latest partition state, do we really need correlation
> > id
> > > > for
> > > > > > ordering the controller requests?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Jul 20, 2018 at 2:18 PM, Becket Qin <
> becket@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Lucas and Mayuresh,
> > > > > >>
> > > > > >> Good idea. The co

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-23 Thread Lucas Wang
Sure, I can summarize the usage of correlation id. But before I do that, it
seems
the same out-of-order processing can also happen to Produce requests sent
by producers,
following the same example you described earlier.
If that's the case, I think this probably deserves a separate doc and
design independent of this KIP.

Lucas



On Mon, Jul 23, 2018 at 12:39 PM, Dong Lin  wrote:

> Hey Lucas,
>
> Could you update the KIP if you are confident with the approach which uses
> correlation id? The idea around correlation id is kind of scattered across
> multiple emails. It will be useful if other reviews can read the KIP to
> understand the latest proposal.
>
> Thanks,
> Dong
>
> On Mon, Jul 23, 2018 at 12:32 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > I like the idea of the dequeue implementation by Lucas. This will help us
> > avoid additional queue for controller and additional configs in Kafka.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Sun, Jul 22, 2018 at 2:58 AM Becket Qin  wrote:
> >
> > > Hi Jun,
> > >
> > > The usage of correlation ID might still be useful to address the cases
> > > that the controller epoch and leader epoch check are not sufficient to
> > > guarantee correct behavior. For example, if the controller sends a
> > > LeaderAndIsrRequest followed by a StopReplicaRequest, and the broker
> > > processes it in the reverse order, the replica may still be wrongly
> > > recreated, right?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > > On Jul 22, 2018, at 11:47 AM, Jun Rao  wrote:
> > > >
> > > > Hmm, since we already use controller epoch and leader epoch for
> > properly
> > > > caching the latest partition state, do we really need correlation id
> > for
> > > > ordering the controller requests?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Jul 20, 2018 at 2:18 PM, Becket Qin 
> > > wrote:
> > > >
> > > >> Lucas and Mayuresh,
> > > >>
> > > >> Good idea. The correlation id should work.
> > > >>
> > > >> In the ControllerChannelManager, a request will be resent until a
> > > response
> > > >> is received. So if the controller to broker connection disconnects
> > after
> > > >> controller sends R1_a, but before the response of R1_a is received,
> a
> > > >> disconnection may cause the controller to resend R1_b. i.e. until R1
> > is
> > > >> acked, R2 won't be sent by the controller.
> > > >> This gives two guarantees:
> > > >> 1. Correlation id wise: R1_a < R1_b < R2.
> > > >> 2. On the broker side, when R2 is seen, R1 must have been processed
> at
> > > >> least once.
> > > >>
> > > >> So on the broker side, with a single thread controller request
> > handler,
> > > the
> > > >> logic should be:
> > > >> 1. Process what ever request seen in the controller request queue
> > > >> 2. For the given epoch, drop request if its correlation id is
> smaller
> > > than
> > > >> that of the last processed request.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Fri, Jul 20, 2018 at 8:07 AM, Jun Rao  wrote:
> > > >>
> > > >>> I agree that there is no strong ordering when there are more than
> one
> > > >>> socket connections. Currently, we rely on controllerEpoch and
> > > leaderEpoch
> > > >>> to ensure that the receiving broker picks up the latest state for
> > each
> > > >>> partition.
> > > >>>
> > > >>> One potential issue with the dequeue approach is that if the queue
> is
> > > >> full,
> > > >>> there is no guarantee that the controller requests will be enqueued
> > > >>> quickly.
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Jun
> > > >>>
> > > >>> On Fri, Jul 20, 2018 at 5:25 AM, Mayuresh Gharat <
> > > >>> gharatmayures...@gmail.com
> > > >>>> wrote:
> > > >>>
> > > >>>> Yea, the correlationId is only set to 0 in t

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-19 Thread Lucas Wang
@Dong,
Great example and explanation, thanks!

@All
Regarding the example given by Dong, it seems even if we use a queue, and a
dedicated controller request handling thread,
the same result can still happen because R1_a will be sent on one
connection, and R1_b & R2 will be sent on a different connection,
and there is no ordering between different connections on the broker side.
I was discussing with Mayuresh offline, and it seems correlation id within
the same NetworkClient object is monotonically increasing and never reset,
hence a broker can leverage that to properly reject obsolete requests.
Thoughts?

Thanks,
Lucas
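
[Editor's note] A rough illustration of the idea being discussed (a minimal sketch with made-up names, not the actual broker code, and assuming the single controller-request handler thread mentioned in this thread): track the controller epoch and correlation id of the last processed request and drop anything that is not newer.

class StaleControllerRequestFilter {
  // epoch and correlation id of the most recently processed controller request
  private var lastEpoch = -1
  private var lastCorrelationId = -1

  // Returns true if the request should be processed, false if it is obsolete.
  def shouldProcess(controllerEpoch: Int, correlationId: Int): Boolean = {
    if (controllerEpoch < lastEpoch) {
      false // from an older controller generation
    } else if (controllerEpoch == lastEpoch && correlationId <= lastCorrelationId) {
      false // not newer than the last processed request in this generation
    } else {
      lastEpoch = controllerEpoch
      lastCorrelationId = correlationId
      true
    }
  }
}

Since the check itself is unsynchronized, it relies on the single-handler-thread assumption.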

On Thu, Jul 19, 2018 at 12:11 PM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Actually nvm, correlationId is reset in case of connection loss, I think.
>
> Thanks,
>
> Mayuresh
>
> On Thu, Jul 19, 2018 at 11:11 AM Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > I agree with Dong that out-of-order processing can happen with having 2
> > separate queues as well and it can even happen today.
> > Can we use the correlationId in the request from the controller to the
> > broker to handle ordering ?
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Thu, Jul 19, 2018 at 6:41 AM Becket Qin  wrote:
> >
> >> Good point, Joel. I agree that a dedicated controller request handling
> >> thread would be a better isolation. It also solves the reordering issue.
> >>
> >> On Thu, Jul 19, 2018 at 2:23 PM, Joel Koshy 
> wrote:
> >>
> >> > Good example. I think this scenario can occur in the current code as
> >> well
> >> > but with even lower probability given that there are other
> >> non-controller
> >> > requests interleaved. It is still sketchy though and I think a safer
> >> > approach would be separate queues and pinning controller request
> >> handling
> >> > to one handler thread.
> >> >
> >> > On Wed, Jul 18, 2018 at 11:12 PM, Dong Lin 
> wrote:
> >> >
> >> > > Hey Becket,
> >> > >
> >> > > I think you are right that there may be out-of-order processing.
> >> However,
> >> > > it seems that out-of-order processing may also happen even if we
> use a
> >> > > separate queue.
> >> > >
> >> > > Here is the example:
> >> > >
> >> > > - Controller sends R1 and got disconnected before receiving
> response.
> >> > Then
> >> > > it reconnects and sends R2. Both requests now stay in the controller
> >> > > request queue in the order they are sent.
> >> > > - thread1 takes R1_a from the request queue and then thread2 takes
> R2
> >> > from
> >> > > the request queue almost at the same time.
> >> > > - So R1_a and R2 are processed in parallel. There is chance that
> R2's
> >> > > processing is completed before R1.
> >> > >
> >> > > If out-of-order processing can happen for both approaches with very
> >> low
> >> > > probability, it may not be worthwhile to add the extra queue. What
> do
> >> you
> >> > > think?
> >> > >
> >> > > Thanks,
> >> > > Dong
> >> > >
> >> > >
> >> > > On Wed, Jul 18, 2018 at 6:17 PM, Becket Qin 
> >> > wrote:
> >> > >
> >> > > > Hi Mayuresh/Joel,
> >> > > >
> >> > > > Using the request channel as a deque was brought up some time ago
> >> when
> >> > > we
> >> > > > were initially thinking of prioritizing the requests. The concern was
> that
> >> > the
> >> > > > controller requests are supposed to be processed in order. If we
> can
> >> > > ensure
> >> > > > that there is one controller request in the request channel, the
> >> order
> >> > is
> >> > > > not a concern. But in cases that there are more than one
> controller
> >> > > request
> >> > > > inserted into the queue, the controller request order may change
> and
> >> > > cause
> >> > > > problem. For example, think about the following sequence:
> >> > > > 1. Controller successfully sent a request R1 to broker
> >> > > > 2. Broker receives R1 and put the request to the head of the
> request
> >> > > queue.
> >> > >

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-18 Thread Lucas Wang
Hi Dong,

Sure. Regarding the 2nd case you mentioned
"- If the controller has not received response for R1 before it is
disconnected, it will re-send R1 followed by R2 after it is re-connected to
the broker."

with the max inflight request set to 1, after the connection is
re-established, the controller won't send R2
before it gets a response for R1, right? Plus the controller is using
blocking calls for each request, i.e.
NetworkClientUtils.sendAndReceive, with infinite retries for each request
within the same instance of RequestSendThread.
So within the same instance of RequestSendThread, sending out multiple
different requests seems impossible.

However, based on the comments in the source code, it seems multiple
in-flight requests can happen if
the broker loses its ZK session and then reconnects with ZooKeeper:
multiple generations of RequestSendThreads can trigger multiple different
requests.
In that case, we cannot prevent out-of-order processing even with the queue
since those multiple requests are from different connections.
Broker generations can help in those cases, but I won't dive into that
discussion.
Is that right?

Lucas
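
[Editor's note] To picture the "blocking call with infinite retries" behavior described above, here is a small, self-contained sketch (placeholder types and method names, not the real RequestSendThread or NetworkClientUtils code): each request is retried until a response arrives, so a single sender thread never has two different requests outstanding.

object ControllerSendLoopSketch {
  trait Request
  trait Response

  // Placeholder for the blocking network call: returns a response or throws
  // on a connection problem.
  def blockingSendAndReceive(request: Request): Response =
    throw new java.io.IOException("placeholder")

  // Retry the same request until it is acknowledged; the caller only moves on
  // to the next request after this returns, so at most one request is in flight.
  def sendUntilAcked(request: Request, isRunning: () => Boolean): Option[Response] = {
    var response: Option[Response] = None
    while (response.isEmpty && isRunning()) {
      try {
        response = Some(blockingSendAndReceive(request))
      } catch {
        case _: java.io.IOException => Thread.sleep(100) // back off, reconnect, resend
      }
    }
    response
  }
}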

On Wed, Jul 18, 2018 at 9:08 PM, Dong Lin  wrote:

> Hey Lucas,
>
> I think for now we can probably discuss based on the existing Kafka's
> design where controller to a broker is hard coded to be 1. It looks like
> Becket has provided a good example in which requests from the same
> controller can be processed out of order.
>
> Thanks,
> Dong
>
> On Wed, Jul 18, 2018 at 8:35 PM, Lucas Wang  wrote:
>
> > @Becket and Dong,
> > I think currently the ordering guarantee is achieved because
> > the max inflight request from the controller to a broker is hard coded to
> > be 1.
> >
> > If let's hypothetically say the max inflight requests is > 1, then I
> think
> > Dong
> > is right to say that even the separate queue cannot guarantee ordered
> > processing,
> > For example, Req1 and Req2 are sent to a broker, and after a connection
> > reconnection,
> > both requests are sent again, causing the broker to have 4 requests in
> the
> > following order
> > Req2 > Req1 > Req2 > Req1.
> >
> > In summary, it seems using the deque should not cause problems with
> > out-of-order processing.
> > Is that right?
> >
> > Lucas
> >
> > On Wed, Jul 18, 2018 at 6:24 PM, Dong Lin  wrote:
> >
> > > Hey Becket,
> > >
> > > It seems that the requests from the old controller will be discarded
> due
> > to
> > > old controller epoch. It is not clear whether this is a problem.
> > >
> > > And if this out-of-order processing of controller requests is a
> problem,
> > it
> > > seems like an existing problem which also applies to the multi-queue
> > based
> > > design. So it is probably not a concern specific to the use of deque.
> > Does
> > > that sound reasonable?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Wed, 18 Jul 2018 at 6:17 PM Becket Qin 
> wrote:
> > >
> > > > Hi Mayuresh/Joel,
> > > >
> > > > Using the request channel as a deque was brought up some time ago
> when
> > > we
> > > > were initially thinking of prioritizing the requests. The concern was that
> > the
> > > > controller requests are supposed to be processed in order. If we can
> > > ensure
> > > > that there is one controller request in the request channel, the
> order
> > is
> > > > not a concern. But in cases that there are more than one controller
> > > request
> > > > inserted into the queue, the controller request order may change and
> > > cause
> > > > problem. For example, think about the following sequence:
> > > > 1. Controller successfully sent a request R1 to broker
> > > > 2. Broker receives R1 and put the request to the head of the request
> > > queue.
> > > > 3. Controller to broker connection failed and the controller
> > reconnected
> > > to
> > > > the broker.
> > > > 4. Controller sends a request R2 to the broker
> > > > 5. Broker receives R2 and add it to the head of the request queue.
> > > > Now on the broker side, R2 will be processed before R1 is processed,
> > > which
> > > > may cause problem.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Thu, Jul 19, 2018 at 3:23 AM, Joel Koshy 
> > wrote:
> 

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-18 Thread Lucas Wang
@Becket and Dong,
I think currently the ordering guarantee is achieved because
the max inflight request from the controller to a broker is hard coded to
be 1.

If, hypothetically, the max in-flight requests were > 1, then I think
Dong
is right to say that even the separate queue cannot guarantee ordered
processing.
For example, Req1 and Req2 are sent to a broker, and after a connection
reconnection,
both requests are sent again, causing the broker to have 4 requests in the
following order
Req2 > Req1 > Req2 > Req1.

In summary, it seems using the deque should not cause problems with
out-of-order processing.
Is that right?

Lucas

On Wed, Jul 18, 2018 at 6:24 PM, Dong Lin  wrote:

> Hey Becket,
>
> It seems that the requests from the old controller will be discarded due to
> old controller epoch. It is not clear whether this is a problem.
>
> And if this out-of-order processing of controller requests is a problem, it
> seems like an existing problem which also applies to the multi-queue based
> design. So it is probably not a concern specific to the use of deque. Does
> that sound reasonable?
>
> Thanks,
> Dong
>
>
> On Wed, 18 Jul 2018 at 6:17 PM Becket Qin  wrote:
>
> > Hi Mayuresh/Joel,
> >
> > Using the request channel as a deque was brought up some time ago when
> we
> > were initially thinking of prioritizing the requests. The concern was that the
> > controller requests are supposed to be processed in order. If we can
> ensure
> > that there is one controller request in the request channel, the order is
> > not a concern. But in cases that there are more than one controller
> request
> > inserted into the queue, the controller request order may change and
> cause
> > problem. For example, think about the following sequence:
> > 1. Controller successfully sent a request R1 to broker
> > 2. Broker receives R1 and put the request to the head of the request
> queue.
> > 3. Controller to broker connection failed and the controller reconnected
> to
> > the broker.
> > 4. Controller sends a request R2 to the broker
> > 5. Broker receives R2 and add it to the head of the request queue.
> > Now on the broker side, R2 will be processed before R1 is processed,
> which
> > may cause problem.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Thu, Jul 19, 2018 at 3:23 AM, Joel Koshy  wrote:
> >
> > > @Mayuresh - I like your idea. It appears to be a simpler less invasive
> > > alternative and it should work. Jun/Becket/others, do you see any
> > pitfalls
> > > with this approach?
> > >
> > > On Wed, Jul 18, 2018 at 12:03 PM, Lucas Wang 
> > > wrote:
> > >
> > > > @Mayuresh,
> > > > That's a very interesting idea that I haven't thought before.
> > > > It seems to solve our problem at hand pretty well, and also
> > > > avoids the need to have a new size metric and capacity config
> > > > for the controller request queue. In fact, if we were to adopt
> > > > this design, there is no public interface change, and we
> > > > probably don't need a KIP.
> > > > Also implementation wise, it seems
> > > > the java class LinkedBlockingDeque can readily satisfy the
> requirement
> > > > by supporting a capacity and allowing insertion at both ends.
> > > >
> > > > My only concern is that this design is tied to the coincidence that
> > > > we have two request priorities and there are two ends to a deque.
> > > > Hence by using the proposed design, it seems the network layer is
> > > > more tightly coupled with upper layer logic, e.g. if we were to add
> > > > an extra priority level in the future for some reason, we would
> > probably
> > > > need to go back to the design of separate queues, one for each
> priority
> > > > level.
> > > >
> > > > In summary, I'm ok with both designs and lean toward your suggested
> > > > approach.
> > > > Let's hear what others think.
> > > >
> > > > @Becket,
> > > > In light of Mayuresh's suggested new design, I'm answering your
> > question
> > > > only in the context
> > > > of the current KIP design: I think your suggestion makes sense, and
> I'm
> > > ok
> > > > with removing the capacity config and
> > > > just relying on the default value of 20 being sufficient enough.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > >
> > >

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-18 Thread Lucas Wang
@Mayuresh,
That's a very interesting idea that I haven't thought before.
It seems to solve our problem at hand pretty well, and also
avoids the need to have a new size metric and capacity config
for the controller request queue. In fact, if we were to adopt
this design, there is no public interface change, and we
probably don't need a KIP.
Also, implementation-wise, it seems
the java class LinkedBlockingDeque can readily satisfy the requirement
by supporting a capacity and allowing insertion at both ends.

My only concern is that this design is tied to the coincidence that
we have two request priorities and there are two ends to a deque.
Hence by using the proposed design, it seems the network layer is
more tightly coupled with upper layer logic, e.g. if we were to add
an extra priority level in the future for some reason, we would probably
need to go back to the design of separate queues, one for each priority
level.

In summary, I'm ok with both designs and lean toward your suggested
approach.
Let's hear what others think.

@Becket,
In light of Mayuresh's suggested new design, I'm answering your question
only in the context
of the current KIP design: I think your suggestion makes sense, and I'm ok
with removing the capacity config and
just relying on the default value of 20 being sufficient enough.

Thanks,
Lucas
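
[Editor's note] To make the deque idea concrete, here is a minimal sketch (illustrative names, not the actual RequestChannel code) of a single bounded deque, assuming java.util.concurrent.LinkedBlockingDeque: controller requests are inserted at the head, data requests at the tail, and handlers always take from the head.

import java.util.concurrent.LinkedBlockingDeque

trait Request { def isControllerRequest: Boolean }

class DequeRequestChannel(capacity: Int) {
  private val deque = new LinkedBlockingDeque[Request](capacity)

  // Network threads call this; it blocks if the deque is full.
  def send(request: Request): Unit =
    if (request.isControllerRequest) deque.putFirst(request) // jump to the head
    else deque.putLast(request)                              // normal requests go to the tail

  // Request handler threads always take from the head, so any queued
  // controller request is handled before queued data requests.
  def receive(): Request = deque.takeFirst()
}

As noted in the message above, this ties the design to having exactly two priority levels; a third level would require going back to one queue per priority.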










On Wed, Jul 18, 2018 at 9:57 AM, Mayuresh Gharat  wrote:

> Hi Lucas,
>
> Seems like the main intent here is to prioritize the controller request
> over any other requests.
> In that case, we can change the request queue to a dequeue, where you
> always insert the normal requests (produce, consume,..etc) to the end of
> the dequeue, but if its a controller request, you insert it to the head of
> the queue. This ensures that the controller request will be given higher
> priority over other requests.
>
> Also since we only read one request from the socket and mute it and only
> unmute it after handling the request, this would ensure that we don't
> handle controller requests out of order.
>
> With this approach we can avoid the second queue and the additional config
> for the size of the queue.
>
> What do you think ?
>
> Thanks,
>
> Mayuresh
>
>
> On Wed, Jul 18, 2018 at 3:05 AM Becket Qin  wrote:
>
> > Hey Joel,
> >
> > Thank for the detail explanation. I agree the current design makes sense.
> > My confusion is about whether the new config for the controller queue
> > capacity is necessary. I cannot think of a case in which users would
> change
> > it.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Jul 18, 2018 at 6:00 PM, Becket Qin 
> wrote:
> >
> > > Hi Lucas,
> > >
> > > I guess my question can be rephrased to "do we expect user to ever
> change
> > > the controller request queue capacity"? If we agree that 20 is already
> a
> > > very generous default number and we do not expect user to change it, is
> > it
> > > still necessary to expose this as a config?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Jul 18, 2018 at 2:29 AM, Lucas Wang 
> > wrote:
> > >
> > >> @Becket
> > >> 1. Thanks for the comment. You are right that normally there should be
> > >> just
> > >> one controller request because of muting,
> > >> and I had NOT intended to say there would be many enqueued controller
> > >> requests.
> > >> I went through the KIP again, and I'm not sure which part conveys that
> > >> info.
> > >> I'd be happy to revise it if you point out the section.
> > >>
> > >> 2. Though it should not happen in normal conditions, the current
> design
> > >> does not preclude multiple controllers running
> > >> at the same time, hence if we don't have the controller queue capacity
> > >> config and simply make its capacity to be 1,
> > >> network threads handling requests from different controllers will be
> > >> blocked during those troublesome times,
> > >> which is probably not what we want. On the other hand, adding the
> extra
> > >> config with a default value, say 20, guards us from issues in those
> > >> troublesome times, and IMO there isn't much downside of adding the
> extra
> > >> config.
> > >>
> > >> @Mayuresh
> > >> Good catch, this sentence is an obsolete statement based on a previous
> > >> design. I've revised the wording in the KIP.
> > >>
> > >> Thanks,
> > >> Lucas
> > >>
> > >

[jira] [Created] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2

2018-07-18 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7180:
-

 Summary: In testHWCheckpointWithFailuresSingleLogSegment, wait 
until server1 has joined the ISR before shutting down server2
 Key: KAFKA-7180
 URL: https://issues.apache.org/jira/browse/KAFKA-7180
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang
Assignee: Lucas Wang


In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is 
1. shut down server1 and then capture the leadership of a partition in the 
variable "leader", which should be server2
2. shut down server2 and wait until the leadership has changed to a broker other 
than server2
through the line 
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = 
Some(leader))

However, when we execute step 2 and shut down server2, it's possible that server1 
has not caught up with the partition and has not joined the ISR. With unclean 
leader election turned off, the leadership cannot be transferred to server1, 
causing the condition waited on in step 2 to never be met. 

The obvious fix is to wait until server1 has joined the ISR before shutting 
down server2.
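
A minimal sketch of the fix direction (the helper below is hypothetical and is parameterized on an ISR lookup; it is not the actual test utility): block until server1 shows up in the partition's ISR, and only then shut down server2.

object IsrWaitSketch {
  // currentIsr is a placeholder for however the test reads the partition's ISR
  // (e.g. via the ZK client).
  def waitUntilInIsr(brokerId: Int, currentIsr: () => Set[Int], timeoutMs: Long = 30000L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!currentIsr().contains(brokerId)) {
      if (System.currentTimeMillis() > deadline)
        throw new AssertionError(s"broker $brokerId did not join the ISR within $timeoutMs ms")
      Thread.sleep(100)
    }
  }
}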



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-17 Thread Lucas Wang
@Becket
1. Thanks for the comment. You are right that normally there should be just
one controller request because of muting,
and I had NOT intended to say there would be many enqueued controller
requests.
I went through the KIP again, and I'm not sure which part conveys that
info.
I'd be happy to revise it if you point out the section.

2. Though it should not happen in normal conditions, the current design
does not preclude multiple controllers running
at the same time, hence if we don't have the controller queue capacity
config and simply set its capacity to 1,
network threads handling requests from different controllers will be
blocked during those troublesome times,
which is probably not what we want. On the other hand, adding the extra
config with a default value, say 20, guards us from issues in those
troublesome times, and IMO there isn't much downside of adding the extra
config.

@Mayuresh
Good catch, this sentence is an obsolete statement based on a previous
design. I've revised the wording in the KIP.

Thanks,
Lucas

On Tue, Jul 17, 2018 at 10:33 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi Lucas,
>
> Thanks for the KIP.
> I am trying to understand why you think "The memory consumption can rise
> given the total number of queued requests can go up to 2x" in the impact
> section. Normally the requests from controller to a Broker are not high
> volume, right ?
>
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jul 17, 2018 at 5:06 AM Becket Qin  wrote:
>
> > Thanks for the KIP, Lucas. Separating the control plane from the data
> plane
> > makes a lot of sense.
> >
> > In the KIP you mentioned that the controller request queue may have many
> > requests in it. Will this be a common case? The controller requests still
> > goes through the SocketServer. The SocketServer will mute the channel
> once
> > a request is read and put into the request channel. So assuming there is
> > only one connection between controller and each broker, on the broker
> side,
> > there should be only one controller request in the controller request
> queue
> > at any given time. If that is the case, do we need a separate controller
> > request queue capacity config? The default value 20 means that we expect
> > there are 20 controller switches to happen in a short period of time. I
> am
> > not sure whether someone should increase the controller request queue
> > capacity to handle such case, as it seems indicating something very wrong
> > has happened.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Jul 13, 2018 at 1:10 PM, Dong Lin  wrote:
> >
> > > Thanks for the update Lucas.
> > >
> > > I think the motivation section is intuitive. It will be good to learn
> > more
> > > about the comments from other reviewers.
> > >
> > > On Thu, Jul 12, 2018 at 9:48 PM, Lucas Wang 
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > > I've updated the motivation section of the KIP by explaining the
> cases
> > > that
> > > > would have user impacts.
> > > > Please take a look and let me know your comments.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > > On Mon, Jul 9, 2018 at 5:53 PM, Lucas Wang 
> > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > The simulation of disk being slow is merely for me to easily
> > construct
> > > a
> > > > > testing scenario
> > > > > with a backlog of produce requests. In production, other than the
> > disk
> > > > > being slow, a backlog of
> > > > > produce requests may also be caused by high produce QPS.
> > > > > In that case, we may not want to kill the broker and that's when
> this
> > > KIP
> > > > > can be useful, both for JBOD
> > > > > and non-JBOD setup.
> > > > >
> > > > > Going back to your previous question about each ProduceRequest
> > covering
> > > > 20
> > > > > partitions that are randomly
> > > > > distributed, let's say a LeaderAndIsr request is enqueued that
> tries
> > to
> > > > > switch the current broker, say broker0, from leader to follower
> > > > > *for one of the partitions*, say *test-0*. For the sake of
> argument,
> > > > > let's also assume the other brokers, say broker1, have *stopped*
> > > fetching
> > > > > from
> > > > > the current broke

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-14 Thread Lucas Wang
Hi Jun,

I agree by using the conditional variables, there is no need to add such a
new config.
Also thanks for approving this KIP.

Lucas


[jira] [Created] (KAFKA-7162) Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs

2018-07-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7162:
-

 Summary: Flaky unit tests caused by record creation timestamps 
differ from validation time by more than timestampDiffMaxMs
 Key: KAFKA-7162
 URL: https://issues.apache.org/jira/browse/KAFKA-7162
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


While running gradle unit tests, we found the test method 
LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it 
turns out the test method uses one set of timestamps, say t0, t1 and t2, for 
the records, while using a separate timestamp, say t3, for the "now" parameter 
when invoking the LogValidator.validateMessagesAndAssignOffsets method. The 
validateMessagesAndAssignOffsets validation method also takes a parameter 
timestampDiffMaxMs=1 second, which specifies the maximum allowed time difference 
between t3 and the timestamps in records, i.e. t0, t1, and t2. While running 
unit tests, especially when multiple tests are run simultaneously, there is no 
guarantee that the time difference between t3 and t0 is within 1 second, 
causing the test method to be flaky. Many other test methods in the 
LogValidatorTest can suffer from the same problem.
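
One way to make such tests deterministic (a hypothetical sketch of the fix direction, not the actual patch; argument names are approximate) is to derive the record timestamps and the "now" passed to the validator from the same clock reading, so their difference can never exceed timestampDiffMaxMs regardless of how slowly the test runs.

// Inside the test method: anchor t0, t1, t2 and "now" to the same clock reading.
val now = System.currentTimeMillis()
val recordTimestamps = Seq(now - 2L, now - 1L, now) // t0, t1, t2, all within 1 second of "now"
// ...build the records with recordTimestamps, then pass the same value as the
// "now" argument of LogValidator.validateMessagesAndAssignOffsets(...), keeping
// timestampDiffMaxMs at 1 second.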



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-12 Thread Lucas Wang
Hi Jun,

About 3, thanks for the clarification. I like your proposal in that it
avoids the delay for controller requests when the data request queue is
empty.
In comparison, the approach I described earlier is simpler to understand
and implement.

Between the two, I actually like your suggested approach better because
the data request queue becoming empty can be a common scenario
when the request handler average idle percent is high. Therefore the extra
complexity is worth
the effort.

Whichever approach we choose, it does not affect public interfaces or the
write-up in the KIP,
and we can discuss further in the PR.

Thanks,
Lucas
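
[Editor's note] For readers following along, here is a minimal sketch of the coordination Jun describes (placeholder type parameter, unbounded queues for brevity, not the actual RequestChannel): one lock and one condition let a blocked request handler wake up as soon as a request of either type arrives, and controller requests are always dequeued first.

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

class TwoQueueRequestChannel[Request] {
  private val lock = new ReentrantLock()
  private val requestAvailable = lock.newCondition()
  private val controllerQueue = mutable.Queue.empty[Request]
  private val dataQueue = mutable.Queue.empty[Request]

  def sendControllerRequest(r: Request): Unit = enqueue(controllerQueue, r)
  def sendDataRequest(r: Request): Unit = enqueue(dataQueue, r)

  private def enqueue(q: mutable.Queue[Request], r: Request): Unit = {
    lock.lock()
    try {
      q.enqueue(r)
      requestAvailable.signal() // wake one blocked handler immediately
    } finally lock.unlock()
  }

  def receiveRequest(): Request = {
    lock.lock()
    try {
      while (controllerQueue.isEmpty && dataQueue.isEmpty)
        requestAvailable.await() // no 300 ms polling timeout needed
      if (controllerQueue.nonEmpty) controllerQueue.dequeue() // controller requests first
      else dataQueue.dequeue()
    } finally lock.unlock()
  }
}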



On Wed, Jul 11, 2018 at 8:46 AM, Jun Rao  wrote:

> Hi, Lucas,
>
> 2. Good point about not knowing the request type in memory pool. Looking at
> the implementation. It seems that queued.max.request.bytes is orthogonal to
> queued.max.requests. So, this seems fine.
>
> 3. The implementation that you suggested sounds good. It would be useful
> not to unnecessarily delay the processing of a request up to 300ms. I was
> thinking that we could have RequestChannel manage a Lock and a couple of
> Conditions and have sendRequest()/receiveRequest() coordinate on the lock
> and the conditions (similar to how ArrayBlockingQueue is implemented). This
> way, any new request can wake up the blocked request handling threads
> immediately.
>
> Thanks,
>
> Jun
>
>
> On Fri, Jun 29, 2018 at 4:53 PM, Lucas Wang  wrote:
>
> > Hi Jun,
> >
> > Thanks for your comments.
> > 1. I just replied in the discussion thread about the positive change this
> > KIP can still bring
> > if implemented on the latest trunk, which includes the async ZK
> operations
> > for KAFKA-5642.
> > The evaluation is done using an integration test.
> > In production, we have not upgraded to Kafka 1.1 yet, and the code we are
> > currently running does
> > not include async ZK operations, therefore I don't have any real usage
> > result.
> >
> > 2. Thanks for bringing this up. I haven't considered this setting, and
> the
> > existing proposal in this KIP
> > would make data requests and controller requests share a memory poll of
> > size specified by the config
> > queued.max.request.bytes. The downside is that if there is memory
> pressure,
> > controller requests may be blocked
> > from being read from a socket and does not get prioritized at the socket
> > layer.
> >
> > If we have a separate bytes limit for the controller requests, I imagine
> > there would be a separate memory pool
> > dedicated to controller requests. Also it requires the processors to tell
> > connections from a controller apart
> > from connections from other brokers or clients, which would probably
> > require a dedicated port for the controller?
> > IMO, this change is mainly driven by the memory pressure, kind of an
> > orthogonal issue, and we can address it with a separate KIP
> > if desired. Please let me know what you think.
> >
> > 3. I plans to change the implementation of the method
> > receiveRequest(timeout: Long) in the RequestChannel class as follows:
> >
> > val controllerRequest = controllerRequestQueue.poll()
> > if (controllerRequest != null) {
> >   controllerRequest
> > } else {
> >   dataRequestQueue.poll(timeout, TimeUnit.MILLISECONDS)
> > }
> >
> > with this implementation, there is no need to explicitly choose a request
> > handler thread to wake up depending on
> > the types of request enqueued, and if a controller request arrives while
> > some request handler threads are blocked on an empty data request queue,
> > they will simply timeout and call the receiveRequest method again.
> >
> > In terms of performance, it means that in the worst case, for a
> controller
> > request that just missed the receiveRequest call, it can be delayed for
> as
> > long as
> > the timeout parameter, which is hard coded to be 300 milliseconds. If
> there
> > is just one request handler thread, the average delay is
> > 150 milliseconds assuming the chance of a controller request arriving at
> > any particular time is the same. With N request handler threads,
> > the average delay is 150/N milliseconds, which does not seem to be a
> > problem.
> >
> > We have considered waking up of request handler threads based on which
> > queue the request handler threads are blocked,
> > and that design was turned down because of its complexity. The design can
> > be found at here
> > <https://cwiki.apache.org/confluence/display/KAFKA/Old+contr
> > oller+request+queue+design>
> >

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-12 Thread Lucas Wang
Hi Dong,

I've updated the motivation section of the KIP by explaining the cases that
would have user impacts.
Please take a look and let me know your comments.

Thanks,
Lucas

On Mon, Jul 9, 2018 at 5:53 PM, Lucas Wang  wrote:

> Hi Dong,
>
> The simulation of disk being slow is merely for me to easily construct a
> testing scenario
> with a backlog of produce requests. In production, other than the disk
> being slow, a backlog of
> produce requests may also be caused by high produce QPS.
> In that case, we may not want to kill the broker and that's when this KIP
> can be useful, both for JBOD
> and non-JBOD setup.
>
> Going back to your previous question about each ProduceRequest covering 20
> partitions that are randomly
> distributed, let's say a LeaderAndIsr request is enqueued that tries to
> switch the current broker, say broker0, from leader to follower
> *for one of the partitions*, say *test-0*. For the sake of argument,
> let's also assume the other brokers, say broker1, have *stopped* fetching
> from
> the current broker, i.e. broker0.
> 1. If the enqueued produce requests have acks =  -1 (ALL)
>   1.1 without this KIP, the ProduceRequests ahead of LeaderAndISR will be
> put into the purgatory,
> and since they'll never be replicated to other brokers (because of
> the assumption made above), they will
> be completed either when the LeaderAndISR request is processed or
> when the timeout happens.
>   1.2 With this KIP, broker0 will immediately transition the partition
> test-0 to become a follower,
> after the current broker sees the replication of the remaining 19
> partitions, it can send a response indicating that
> it's no longer the leader for the "test-0".
>   To see the latency difference between 1.1 and 1.2, let's say there are
> 24K produce requests ahead of the LeaderAndISR, and there are 8 io threads,
>   so each io thread will process approximately 3000 produce requests. Now
> let's investigate the io thread that finally processed the LeaderAndISR.
>   For the 3000 produce requests, if we model the time when their remaining
> 19 partitions catch up as t0, t1, ...t2999, and the LeaderAndISR request is
> processed at time t3000.
>   Without this KIP, the 1st produce request would have waited an extra
> t3000 - t0 time in the purgatory, the 2nd an extra time of t3000 - t1, etc.
>   Roughly speaking, the latency difference is bigger for the earlier
> produce requests than for the later ones. For the same reason, the more
> ProduceRequests queued
>   before the LeaderAndISR, the bigger benefit we get (capped by the
> produce timeout).
> 2. If the enqueued produce requests have acks=0 or acks=1
>   There will be no latency differences in this case, but
>   2.1 without this KIP, the records of partition test-0 in the
> ProduceRequests ahead of the LeaderAndISR will be appended to the local log,
> and eventually be truncated after processing the LeaderAndISR.
> This is what's referred to as
> "some unofficial definition of data loss in terms of messages
> beyond the high watermark".
>   2.2 with this KIP, we can mitigate the effect since if the LeaderAndISR
> is immediately processed, the response to producers will have
> the NotLeaderForPartition error, causing producers to retry
>
> This explanation above is the benefit for reducing the latency of a broker
> becoming the follower,
> closely related is reducing the latency of a broker becoming the leader.
> In this case, the benefit is even more obvious, if other brokers have
> resigned leadership, and the
> current broker should take leadership. Any delay in processing the
> LeaderAndISR will be perceived
> by clients as unavailability. In extreme cases, this can cause failed
> produce requests if the retries are
> exhausted.
>
> Another two types of controller requests are UpdateMetadata and
> StopReplica, which I'll briefly discuss as follows:
> For UpdateMetadata requests, delayed processing means clients receiving
> stale metadata, e.g. with the wrong leadership info
> for certain partitions, and the effect is more retries or even fatal
> failure if the retries are exhausted.
>
> For StopReplica requests, a long queuing time may degrade the performance
> of topic deletion.
>
> Regarding your last question of the delay for DescribeLogDirsRequest, you
> are right
> that this KIP cannot help with the latency in getting the log dirs info,
> and it's only relevant
> when controller requests are involved.
>
> Regards,
> Lucas
>
>
> On Tue, Jul 3, 2018 at 5:11 PM, Dong Lin  wrote:
>
>> Hey Jun,
>>
>> Thanks much for the comments. It is good point. So the feature may be
>> usef
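
[Editor's note] For the latency argument quoted above, a compact restatement of the numbers already given (not a new measurement): if the LeaderAndISR request is processed at time t3000, the i-th of the ~3000 queued produce requests on that io thread waits an extra (t3000 - t_i) in the purgatory without this KIP, so the total extra wait is roughly the sum of (t3000 - t_i) for i = 0..2999, largest for the earliest requests and capped per request by the produce timeout.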

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-09 Thread Lucas Wang
broker. So the overall
> time to move leaders away from the failed disk may still be long even with
> this KIP. What do you think?
>
> Thanks,
> Dong
>
>
> On Tue, Jul 3, 2018 at 4:38 PM, Lucas Wang  wrote:
>
> > Thanks for the insightful comment, Jun.
> >
> > @Dong,
> > Since both of the two comments in your previous email are about the
> > benefits of this KIP and whether it's useful,
> > in light of Jun's last comment, do you agree that this KIP can be
> > beneficial in the case mentioned by Jun?
> > Please let me know, thanks!
> >
> > Regards,
> > Lucas
> >
> > On Tue, Jul 3, 2018 at 2:07 PM, Jun Rao  wrote:
> >
> > > Hi, Lucas, Dong,
> > >
> > > If all disks on a broker are slow, one probably should just kill the
> > > broker. In that case, this KIP may not help. If only one of the disks
> on
> > a
> > > broker is slow, one may want to fail that disk and move the leaders on
> > that
> > > disk to other brokers. In that case, being able to process the
> > LeaderAndIsr
> > > requests faster will potentially help the producers recover quicker.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Jul 2, 2018 at 7:56 PM, Dong Lin  wrote:
> > >
> > > > Hey Lucas,
> > > >
> > > > Thanks for the reply. Some follow up questions below.
> > > >
> > > > Regarding 1, if each ProduceRequest covers 20 partitions that are
> > > randomly
> > > > distributed across all partitions, then each ProduceRequest will
> likely
> > > > cover some partitions for which the broker is still leader after it
> > > quickly
> > > > processes the
> > > > LeaderAndIsrRequest. Then broker will still be slow in processing
> these
> > > > ProduceRequest and request latency will still be very high with this KIP. It
> > > seems
> > > > that most ProduceRequest will still timeout after 30 seconds. Is this
> > > > understanding correct?
> > > >
> > > > Regarding 2, if most ProduceRequest will still timeout after 30
> > seconds,
> > > > then it is less clear how this KIP reduces average produce latency.
> Can
> > > you
> > > > clarify what metrics can be improved by this KIP?
> > > >
> > > > Not sure why system operator directly cares number of truncated
> > messages.
> > > > Do you mean this KIP can improve average throughput or reduce message
> > > > duplication? It will be good to understand this.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, 3 Jul 2018 at 7:12 AM Lucas Wang 
> > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for your valuable comments. Please see my reply below.
> > > > >
> > > > > 1. The Google doc showed only 1 partition. Now let's consider a
> more
> > > > common
> > > > > scenario
> > > > > where broker0 is the leader of many partitions. And let's say for
> > some
> > > > > reason its IO becomes slow.
> > > > > The number of leader partitions on broker0 is so large, say 10K,
> that
> > > the
> > > > > cluster is skewed,
> > > > > and the operator would like to shift the leadership for a lot of
> > > > > partitions, say 9K, to other brokers,
> > > > > either manually or through some service like cruise control.
> > > > > With this KIP, not only will the leadership transitions finish more
> > > > > quickly, helping the cluster itself becoming more balanced,
> > > > > but all existing producers corresponding to the 9K partitions will
> > get
> > > > the
> > > > > errors relatively quickly
> > > > > rather than relying on their timeout, thanks to the batched async
> ZK
> > > > > operations.
> > > > > To me it's a useful feature to have during such troublesome times.
> > > > >
> > > > >
> > > > > 2. The experiments in the Google Doc have shown that with this KIP
> > many
> > > > > producers
> > > > > receive an explicit error NotLeaderForPartition, based on which
> they
> > > > retry
> > > > > immediately.
> > > > > Therefore the la

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-03 Thread Lucas Wang
Thanks for the insightful comment, Jun.

@Dong,
Since both of the two comments in your previous email are about the
benefits of this KIP and whether it's useful,
in light of Jun's last comment, do you agree that this KIP can be
beneficial in the case mentioned by Jun?
Please let me know, thanks!

Regards,
Lucas

On Tue, Jul 3, 2018 at 2:07 PM, Jun Rao  wrote:

> Hi, Lucas, Dong,
>
> If all disks on a broker are slow, one probably should just kill the
> broker. In that case, this KIP may not help. If only one of the disks on a
> broker is slow, one may want to fail that disk and move the leaders on that
> disk to other brokers. In that case, being able to process the LeaderAndIsr
> requests faster will potentially help the producers recover quicker.
>
> Thanks,
>
> Jun
>
> On Mon, Jul 2, 2018 at 7:56 PM, Dong Lin  wrote:
>
> > Hey Lucas,
> >
> > Thanks for the reply. Some follow up questions below.
> >
> > Regarding 1, if each ProduceRequest covers 20 partitions that are
> randomly
> > distributed across all partitions, then each ProduceRequest will likely
> > cover some partitions for which the broker is still leader after it
> quickly
> > processes the
> > LeaderAndIsrRequest. Then broker will still be slow in processing these
> > ProduceRequest and request latency will still be very high with this KIP. It
> seems
> > that most ProduceRequest will still timeout after 30 seconds. Is this
> > understanding correct?
> >
> > Regarding 2, if most ProduceRequest will still timeout after 30 seconds,
> > then it is less clear how this KIP reduces average produce latency. Can
> you
> > clarify what metrics can be improved by this KIP?
> >
> > Not sure why system operator directly cares number of truncated messages.
> > Do you mean this KIP can improve average throughput or reduce message
> > duplication? It will be good to understand this.
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> > On Tue, 3 Jul 2018 at 7:12 AM Lucas Wang  wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for your valuable comments. Please see my reply below.
> > >
> > > 1. The Google doc showed only 1 partition. Now let's consider a more
> > common
> > > scenario
> > > where broker0 is the leader of many partitions. And let's say for some
> > > reason its IO becomes slow.
> > > The number of leader partitions on broker0 is so large, say 10K, that
> the
> > > cluster is skewed,
> > > and the operator would like to shift the leadership for a lot of
> > > partitions, say 9K, to other brokers,
> > > either manually or through some service like cruise control.
> > > With this KIP, not only will the leadership transitions finish more
> > > quickly, helping the cluster itself becoming more balanced,
> > > but all existing producers corresponding to the 9K partitions will get
> > the
> > > errors relatively quickly
> > > rather than relying on their timeout, thanks to the batched async ZK
> > > operations.
> > > To me it's a useful feature to have during such troublesome times.
> > >
> > >
> > > 2. The experiments in the Google Doc have shown that with this KIP many
> > > producers
> > > receive an explicit error NotLeaderForPartition, based on which they
> > retry
> > > immediately.
> > > Therefore the latency (~14 seconds+quick retry) for their single
> message
> > is
> > > much smaller
> > > compared with the case of timing out without the KIP (30 seconds for
> > timing
> > > out + quick retry).
> > > One might argue that reducing the timing out on the producer side can
> > > achieve the same result,
> > > yet reducing the timeout has its own drawbacks[1].
> > >
> > > Also *IF* there were a metric to show the number of truncated messages
> on
> > > brokers,
> > > with the experiments done in the Google Doc, it should be easy to see
> > that
> > > a lot fewer messages need
> > > to be truncated on broker0 since the up-to-date metadata avoids
> appending
> > > of messages
> > > in subsequent PRODUCE requests. If we talk to a system operator and ask
> > > whether
> > > they prefer fewer wasteful IOs, I bet most likely the answer is yes.
> > >
> > > 3. To answer your question, I think it might be helpful to construct
> some
> > > formulas.
> > > To simplify the modeling, I'm going back to the case where there is
> only
> > > ONE partition

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-03 Thread Lucas Wang
@Ted
For #1, it's probably hard to predict M since it also depends on the
hardware.
I'm not sure how to use the suggested formula for the default value if we
don't know M.
Also TO is the default timeout we want to figure out, and the formula seems
to be recursive.
I'd suggest we stay with the current default value of 300 milliseconds, and
address it separately
if it turns out to be a problem. What do you think?

#2, please try this link and see if it works now:
https://drive.google.com/file/d/1QbPDqfT59A2X4To2p3OfD5YeJR8aWDK7/view?usp=sharing

Regards,
Lucas


On Mon, Jul 2, 2018 at 5:52 PM, Ted Yu  wrote:

> For #1, I don't know what would be good approximation for M.
> Maybe use max((TO / 2) / N, M / N) as default value for poll timeout ?
>
> For #2, I don't see the picture in email :-)
> Can you use third party website ?
>
> Thanks
>
> On Mon, Jul 2, 2018 at 5:17 PM, Lucas Wang  wrote:
>
> > Hi Ted,
> >
> > 1. I'm neutral on making the poll timeout parameter configurable.
> > Mainly because as a config, it could be confusing for operators who try
> to
> > choose a value for it.
> >
> > To understand the implication of this value better,
> > let's use TO to represent the timeout value under discussion,
> > M to denote the processing time of data requests,
> > and N to be the number of io threads.
> >
> > - If the data request queue is empty and there is no incoming data
> > requests,
> >   all io threads should be blocked on the data request queue, and
> >   the average delay for a controller request is (TO / 2) / N, and the
> > worst case delay is TO.
> > - If all IO threads are busy processing data requests, then the average
> > latency for a controller request is M / N.
> > - In the worst case, a controller request can just miss the train, and IO
> > threads get blocked on data request queue
> >   for TO, at the end of which they all receive a new incoming data
> > request, the latency for the
> >   controller request can be TO + M.
> >
> > Given the intricacies, what do you think about choosing a relatively
> > meaningful value and stick with it,
> > rather than exposing it as a config?
> >
> > 2. Sorry for losing the format of the table, I've attached it below as a
> > picture
> >
> >
> > Regards,
> > Lucas
> >
> > On Fri, Jun 29, 2018 at 5:28 PM, Ted Yu  wrote:
> >
> >> bq. which is hard coded to be 300 milliseconds
> >>
> >> Have you considered making the duration configurable ?
> >>
> >> The comparison at the end of your email seems to be copied where tabular
> >> form is lost.
> >> Do you mind posting that part again ?
> >>
> >> Thanks
> >>
> >> On Fri, Jun 29, 2018 at 4:53 PM, Lucas Wang 
> >> wrote:
> >>
> >> > Hi Jun,
> >> >
> >> > Thanks for your comments.
> >> > 1. I just replied in the discussion thread about the positive change
> >> this
> >> > KIP can still bring
> >> > if implemented on the latest trunk, which includes the async ZK
> >> operations
> >> > for KAFKA-5642.
> >> > The evaluation is done using an integration test.
> >> > In production, we have not upgraded to Kafka 1.1 yet, and the code we
> >> are
> >> > currently running does
> >> > not include async ZK operations, therefore I don't have any real usage
> >> > result.
> >> >
> >> > 2. Thanks for bringing this up. I haven't considered this setting, and
> >> the
> >> > existing proposal in this KIP
> >> > would make data requests and controller requests share a memory poll
> of
> >> > size specified by the config
> >> > queued.max.request.bytes. The downside is that if there is memory
> >> pressure,
> >> > controller requests may be blocked
> >> > from being read from a socket and does not get prioritized at the
> socket
> >> > layer.
> >> >
> >> > If we have a separate bytes limit for the controller requests, I
> imagine
> >> > there would be a separate memory pool
> >> > dedicated to controller requests. Also it requires the processors to
> >> tell
> >> > connections from a controller apart
> >> > from connections from other brokers or clients, which would probably
> >> > require a dedicated port for the controller?
> >> > IMO, this change is mainly driven by the memory pressure, kind of an
> >> > orthogonal issue, and we can a

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-07-02 Thread Lucas Wang
Hi Ted,

1. I'm neutral on making the poll timeout parameter configurable,
mainly because as a config it could be confusing for operators who try to
choose a value for it.

To understand the implication of this value better,
let's use TO to represent the timeout value under discussion,
M to denote the processing time of data requests,
and N to be the number of io threads.

- If the data request queue is empty and there is no incoming data requests,
  all io threads should be blocked on the data request queue, and
  the average delay for a controller request is (TO / 2) / N, and the worst
case delay is TO.
- If all IO threads are busy processing data requests, then the average
latency for a controller request is M / N.
- In the worst case, a controller request can just miss the train, and IO
threads get blocked on data request queue
  for TO, at the end of which they all receive a new incoming data request,
the latency for the
  controller request can be TO + M.

Given the intricacies, what do you think about choosing a relatively
meaningful value and sticking with it,
rather than exposing it as a config?

2. Sorry for losing the format of the table, I've attached it below as a
picture


Regards,
Lucas
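
[Editor's note] To make the trade-off concrete, an illustrative calculation using the formulas above (not a measurement): with the hard-coded TO = 300 ms and N = 8 request handler threads, an idle broker delays a controller request by (300/2)/8, about 19 ms, on average and by at most 300 ms; if a data request takes M = 10 ms to process, a fully busy broker delays it by only 10/8, about 1.3 ms, on average, while the worst "just missed the poll" case is about 300 + 10 = 310 ms.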

On Fri, Jun 29, 2018 at 5:28 PM, Ted Yu  wrote:

> bq. which is hard coded to be 300 milliseconds
>
> Have you considered making the duration configurable ?
>
> The comparison at the end of your email seems to be copied where tabular
> form is lost.
> Do you mind posting that part again ?
>
> Thanks
>
> On Fri, Jun 29, 2018 at 4:53 PM, Lucas Wang  wrote:
>
> > Hi Jun,
> >
> > Thanks for your comments.
> > 1. I just replied in the discussion thread about the positive change this
> > KIP can still bring
> > if implemented on the latest trunk, which includes the async ZK
> operations
> > for KAFKA-5642.
> > The evaluation is done using an integration test.
> > In production, we have not upgraded to Kafka 1.1 yet, and the code we are
> > currently running does
> > not include async ZK operations, therefore I don't have any real usage
> > result.
> >
> > 2. Thanks for bringing this up. I haven't considered this setting, and
> the
> > existing proposal in this KIP
> > would make data requests and controller requests share a memory poll of
> > size specified by the config
> > queued.max.request.bytes. The downside is that if there is memory
> pressure,
> > controller requests may be blocked
> > from being read from a socket and does not get prioritized at the socket
> > layer.
> >
> > If we have a separate bytes limit for the controller requests, I imagine
> > there would be a separate memory pool
> > dedicated to controller requests. Also it requires the processors to tell
> > connections from a controller apart
> > from connections from other brokers or clients, which would probably
> > require a dedicated port for the controller?
> > IMO, this change is mainly driven by the memory pressure, kind of an
> > orthogonal issue, and we can address it with a separate KIP
> > if desired. Please let me know what you think.
> >
> > 3. I plans to change the implementation of the method
> > receiveRequest(timeout: Long) in the RequestChannel class as follows:
> >
> > val controllerRequest = controllerRequestQueue.poll()
> > if (controllerRequest != null) {
> >   controllerRequest
> > } else {
> >   dataRequestQueue.poll(timeout, TimeUnit.MILLISECONDS)
> > }
> >
> > with this implementation, there is no need to explicitly choose a request
> > handler thread to wake up depending on
> > the types of request enqueued, and if a controller request arrives while
> > some request handler threads are blocked on an empty data request queue,
> > they will simply timeout and call the receiveRequest method again.
> >
> > In terms of performance, it means that in the worst case, for a
> controller
> > request that just missed the receiveRequest call, it can be delayed for
> as
> > long as
> > the timeout parameter, which is hard coded to be 300 milliseconds. If
> there
> > is just one request handler thread, the average delay is
> > 150 milliseconds assuming the chance of a controller request arriving at
> > any particular time is the same. With N request handler threads,
> > the average delay is 150/N milliseconds, which does not seem to be a
> > problem.
> >
> > We have considered waking up of request handler threads based on which
> > queue the request handler threads are blocked,
> > and that design was turned down because of its complexity. The design can
> > be found at here
> > <https://cwiki.apach

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-07-02 Thread Lucas Wang
nt etc. It
> is probably useful to clarify this.
>
> 3) Does this KIP help improve user experience only when there is issue with
> broker, e.g. significant backlog in the request queue due to slow disk as
> described in the Google doc? Or is this KIP also useful when there is no
> ongoing issue in the cluster? It might be helpful to clarify this to
> understand the benefit of this KIP.
>
>
> Thanks much,
> Dong
>
>
>
>
> On Fri, Jun 29, 2018 at 2:58 PM, Lucas Wang  wrote:
>
> > Hi Eno,
> >
> > Sorry for the delay in getting the experiment results.
> > Here is a link to the positive impact achieved by implementing the
> proposed
> > change:
> > https://docs.google.com/document/d/1ge2jjp5aPTBber6zaIT9AdhW
> > FWUENJ3JO6Zyu4f9tgQ/edit?usp=sharing
> > Please take a look when you have time and let me know your feedback.
> >
> > Regards,
> > Lucas
> >
> > On Tue, Jun 26, 2018 at 9:52 AM, Harsha  wrote:
> >
> > > Thanks for the pointer. Will take a look might suit our requirements
> > > better.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Jun 25th, 2018 at 2:52 PM, Lucas Wang 
> > > wrote:
> > >
> > > >
> > > >
> > > >
> > > > Hi Harsha,
> > > >
> > > > If I understand correctly, the replication quota mechanism proposed
> in
> > > > KIP-73 can be helpful in that scenario.
> > > > Have you tried it out?
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > >
> > > >
> > > > On Sun, Jun 24, 2018 at 8:28 AM, Harsha < ka...@harsha.io > wrote:
> > > >
> > > > > Hi Lucas,
> > > > > One more question, any thoughts on making this configurable
> > > > > and also allowing subset of data requests to be prioritized. For
> > > example
> > > >
> > > > > ,we notice in our cluster when we take out a broker and bring new
> one
> > > it
> > > >
> > > > > will try to become follower and have lot of fetch requests to other
> > > > leaders
> > > > > in clusters. This will negatively effect the application/client
> > > > requests.
> > > > > We are also exploring the similar solution to de-prioritize if a
> new
> > > > > replica comes in for fetch requests, we are ok with the replica to
> be
> > > > > taking time but the leaders should prioritize the client requests.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Harsha
> > > > >
> > > > > On Fri, Jun 22nd, 2018 at 11:35 AM Lucas Wang wrote:
> > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > Hi Eno,
> > > > > >
> > > > > > Sorry for the delayed response.
> > > > > > - I haven't implemented the feature yet, so no experimental
> results
> > > so
> > > >
> > > > > > far.
> > > > > > And I plan to test in out in the following days.
> > > > > >
> > > > > > - You are absolutely right that the priority queue does not
> > > completely
> > > >
> > > > > > prevent
> > > > > > data requests being processed ahead of controller requests.
> > > > > > That being said, I expect it to greatly mitigate the effect of
> > stable
> > > > > > metadata.
> > > > > > In any case, I'll try it out and post the results when I have it.
> > > > > >
> > > > > > Regards,
> > > > > > Lucas
> > > > > >
> > > > > > On Wed, Jun 20, 2018 at 5:44 AM, Eno Thereska <
> > > eno.there...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Lucas,
> > > > > > >
> > > > > > > Sorry for the delay, just had a look at this. A couple of
> > > questions:
> > > >
> > > > > > > - did you notice any positive change after implementing this
> KIP?
> > > > I'm
> > > > > > > wondering if you have any experimental results that show the
> > > benefit
> > > > of
> > > > > > the
> > > > > > > two queues.
> > > &g

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-06-29 Thread Lucas Wang
uest handler thread blocked on one queue, when
> another queue gets a request? Have we considered just making the request
> queue a priority queue?
>
> 4. Related to 3, currently we have 2
> metrics  NetworkProcessorAvgIdlePercent and RequestHandlerAvgIdlePercent
> that measure the utilization of the network and the request handler thread
> pools. They are computed by measuring the amount of time waiting on the
> request queue. Will these 2 metrics be extended to support 2 request
> queues.
>
> Jun
>
>
> On Mon, Jun 18, 2018 at 1:04 PM, Lucas Wang  wrote:
>
> > Hi All,
> >
> > I've addressed a couple of comments in the discussion thread for KIP-291,
> > and
> > got no objections after making the changes. Therefore I would like to
> start
> > the voting thread.
> >
> > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%
> > 3A+Have+separate+queues+for+control+requests+and+data+requests
> >
> > Thanks for your time!
> > Lucas
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-29 Thread Lucas Wang
Hi Eno,

Sorry for the delay in getting the experiment results.
Here is a link to the positive impact achieved by implementing the proposed
change:
https://docs.google.com/document/d/1ge2jjp5aPTBber6zaIT9AdhWFWUENJ3JO6Zyu4f9tgQ/edit?usp=sharing
Please take a look when you have time and let me know your feedback.

Regards,
Lucas

On Tue, Jun 26, 2018 at 9:52 AM, Harsha  wrote:

> Thanks for the pointer. Will take a look might suit our requirements
> better.
>
> Thanks,
> Harsha
>
> On Mon, Jun 25th, 2018 at 2:52 PM, Lucas Wang 
> wrote:
>
> >
> >
> >
> > Hi Harsha,
> >
> > If I understand correctly, the replication quota mechanism proposed in
> > KIP-73 can be helpful in that scenario.
> > Have you tried it out?
> >
> > Thanks,
> > Lucas
> >
> >
> >
> > On Sun, Jun 24, 2018 at 8:28 AM, Harsha < ka...@harsha.io > wrote:
> >
> > > Hi Lucas,
> > > One more question, any thoughts on making this configurable
> > > and also allowing subset of data requests to be prioritized. For
> example
> >
> > > ,we notice in our cluster when we take out a broker and bring new one
> it
> >
> > > will try to become follower and have lot of fetch requests to other
> > leaders
> > > in clusters. This will negatively effect the application/client
> > requests.
> > > We are also exploring the similar solution to de-prioritize if a new
> > > replica comes in for fetch requests, we are ok with the replica to be
> > > taking time but the leaders should prioritize the client requests.
> > >
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Fri, Jun 22nd, 2018 at 11:35 AM Lucas Wang wrote:
> > >
> > > >
> > > >
> > > >
> > > > Hi Eno,
> > > >
> > > > Sorry for the delayed response.
> > > > - I haven't implemented the feature yet, so no experimental results
> so
> >
> > > > far.
> > > > And I plan to test it out in the following days.
> > > >
> > > > - You are absolutely right that the priority queue does not
> completely
> >
> > > > prevent
> > > > data requests being processed ahead of controller requests.
> > > > That being said, I expect it to greatly mitigate the effect of stale
> > > > metadata.
> > > > In any case, I'll try it out and post the results when I have it.
> > > >
> > > > Regards,
> > > > Lucas
> > > >
> > > > On Wed, Jun 20, 2018 at 5:44 AM, Eno Thereska <
> eno.there...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Lucas,
> > > > >
> > > > > Sorry for the delay, just had a look at this. A couple of
> questions:
> >
> > > > > - did you notice any positive change after implementing this KIP?
> > I'm
> > > > > wondering if you have any experimental results that show the
> benefit
> > of
> > > > the
> > > > > two queues.
> > > > >
> > > > > - priority is usually not sufficient in addressing the problem the
> > KIP
> > > > > identifies. Even with priority queues, you will sometimes (often?)
> > have
> > > > the
> > > > > case that data plane requests will be ahead of the control plane
> > > > requests.
> > > > > This happens because the system might have already started
> > processing
> > > > the
> > > > > data plane requests before the control plane ones arrived. So it
> > would
> > > > be
> > > > > good to know what % of the problem this KIP addresses.
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > >
> > > >
> > > >
> > > > > On Fri, Jun 15, 2018 at 4:44 PM, Ted Yu < yuzhih...@gmail.com >
> > wrote:
> > > > >
> > > > > > Change looks good.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On Fri, Jun 15, 2018 at 8:42 AM, Lucas Wang <
> lucasatu...@gmail.com
> >
> > > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Ted,
> > > > > > >
> > > > > > > Thanks for the suggestion. I've updated the KIP. Please take
> > > another
> > > >

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-25 Thread Lucas Wang
Hi Harsha,

If I understand correctly, the replication quota mechanism proposed in
KIP-73 can be helpful in that scenario.
Have you tried it out?

Thanks,
Lucas

On Sun, Jun 24, 2018 at 8:28 AM, Harsha  wrote:

> Hi Lucas,
>  One more question: any thoughts on making this configurable,
> and also allowing a subset of data requests to be prioritized? For example,
> we notice in our cluster that when we take out a broker and bring in a new one,
> it will try to become a follower and issue a lot of fetch requests to the other
> leaders in the cluster. This will negatively affect the application/client requests.
> We are also exploring a similar solution to de-prioritize fetch requests when a new
> replica comes in; we are ok with the replica taking time, but the leaders should
> prioritize the client requests.
>
>
> Thanks,
> Harsha
>
> On Fri, Jun 22nd, 2018 at 11:35 AM Lucas Wang wrote:
>
> >
> >
> >
> > Hi Eno,
> >
> > Sorry for the delayed response.
> > - I haven't implemented the feature yet, so no experimental results so
> > far.
> > And I plan to test it out in the following days.
> >
> > - You are absolutely right that the priority queue does not completely
> > prevent
> > data requests being processed ahead of controller requests.
> > That being said, I expect it to greatly mitigate the effect of stale
> > metadata.
> > In any case, I'll try it out and post the results when I have it.
> >
> > Regards,
> > Lucas
> >
> > On Wed, Jun 20, 2018 at 5:44 AM, Eno Thereska < eno.there...@gmail.com >
> > wrote:
> >
> > > Hi Lucas,
> > >
> > > Sorry for the delay, just had a look at this. A couple of questions:
> > > - did you notice any positive change after implementing this KIP? I'm
> > > wondering if you have any experimental results that show the benefit of
> > the
> > > two queues.
> > >
> > > - priority is usually not sufficient in addressing the problem the KIP
> > > identifies. Even with priority queues, you will sometimes (often?) have
> > the
> > > case that data plane requests will be ahead of the control plane
> > requests.
> > > This happens because the system might have already started processing
> > the
> > > data plane requests before the control plane ones arrived. So it would
> > be
> > > good to know what % of the problem this KIP addresses.
> > >
> > > Thanks
> > > Eno
> > >
> >
> >
> > > On Fri, Jun 15, 2018 at 4:44 PM, Ted Yu < yuzhih...@gmail.com > wrote:
> > >
> > > > Change looks good.
> > > >
> > > > Thanks
> > > >
> > > > On Fri, Jun 15, 2018 at 8:42 AM, Lucas Wang < lucasatu...@gmail.com
> >
> > > wrote:
> > > >
> > > > > Hi Ted,
> > > > >
> > > > > Thanks for the suggestion. I've updated the KIP. Please take
> another
> >
> > > > look.
> > > > >
> > > > > Lucas
> > > > >
> > > > > On Thu, Jun 14, 2018 at 6:34 PM, Ted Yu < yuzhih...@gmail.com >
> > wrote:
> > > > >
> > > > > > Currently in KafkaConfig.scala :
> > > > > >
> > > > > > val QueuedMaxRequests = 500
> > > > > >
> > > > > > It would be good if you can include the default value for this
> new
> >
> > > > config
> > > > > > in the KIP.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On Thu, Jun 14, 2018 at 4:28 PM, Lucas Wang <
> lucasatu...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Ted, Dong
> > > > > > >
> > > > > > > I've updated the KIP by adding a new config, instead of reusing
> > the
> > > > > > > existing one.
> > > > > > > Please take another look when you have time. Thanks a lot!
> > > > > > >
> > > > > > > Lucas
> > > > > > >
> > > > > > > On Thu, Jun 14, 2018 at 2:33 PM, Ted Yu < yuzhih...@gmail.com
> >
> > > wrote:
> > > > > > >
> > > > > > > > bq. that's a waste of resource if control request rate is low
> > > > > > > >
> > > > > > > > I don't know if control request rate can get to 100,000

Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-22 Thread Lucas Wang
Hi Eno,

Sorry for the delayed response.
- I haven't implemented the feature yet, so no experimental results so far.
And I plan to test it out in the following days.

- You are absolutely right that the priority queue does not completely prevent
data requests from being processed ahead of controller requests.
That being said, I expect it to greatly mitigate the effect of stale
metadata.
In any case, I'll try it out and post the results when I have it.
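To make that concrete, here is a minimal sketch (illustrative types only, not the KIP-291 implementation) of a handler loop that always drains a dedicated control queue before the data queue; it shows why the approach mitigates, but cannot fully eliminate, the case Eno describes, since a data request that has already been dequeued still runs to completion first:

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

sealed trait Request
case class ControlRequest(payload: String) extends Request
case class DataRequest(payload: String) extends Request

// A request handler always checks the control queue first and only blocks briefly
// on the data queue, so a newly arrived control request is picked up quickly.
// It still cannot preempt a data request that is already being processed.
class TwoQueueSelector(controlQueue: LinkedBlockingQueue[ControlRequest],
                       dataQueue: LinkedBlockingQueue[DataRequest]) {
  def nextRequest(): Request = {
    val control = controlQueue.poll()
    if (control != null) control
    else {
      val data = dataQueue.poll(300, TimeUnit.MILLISECONDS)
      if (data != null) data else nextRequest() // loop back to re-check the control queue
    }
  }
}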

Regards,
Lucas

On Wed, Jun 20, 2018 at 5:44 AM, Eno Thereska 
wrote:

> Hi Lucas,
>
> Sorry for the delay, just had a look at this. A couple of questions:
> - did you notice any positive change after implementing this KIP? I'm
> wondering if you have any experimental results that show the benefit of the
> two queues.
>
> - priority is usually not sufficient in addressing the problem the KIP
> identifies. Even with priority queues, you will sometimes (often?) have the
> case that data plane requests will be ahead of the control plane requests.
> This happens because the system might have already started processing the
> data plane requests before the control plane ones arrived. So it would be
> good to know what % of the problem this KIP addresses.
>
> Thanks
> Eno
>
> On Fri, Jun 15, 2018 at 4:44 PM, Ted Yu  wrote:
>
> > Change looks good.
> >
> > Thanks
> >
> > On Fri, Jun 15, 2018 at 8:42 AM, Lucas Wang 
> wrote:
> >
> > > Hi Ted,
> > >
> > > Thanks for the suggestion. I've updated the KIP. Please take another
> > look.
> > >
> > > Lucas
> > >
> > > On Thu, Jun 14, 2018 at 6:34 PM, Ted Yu  wrote:
> > >
> > > > Currently in KafkaConfig.scala :
> > > >
> > > >   val QueuedMaxRequests = 500
> > > >
> > > > It would be good if you can include the default value for this new
> > config
> > > > in the KIP.
> > > >
> > > > Thanks
> > > >
> > > > On Thu, Jun 14, 2018 at 4:28 PM, Lucas Wang 
> > > wrote:
> > > >
> > > > > Hi Ted, Dong
> > > > >
> > > > > I've updated the KIP by adding a new config, instead of reusing the
> > > > > existing one.
> > > > > Please take another look when you have time. Thanks a lot!
> > > > >
> > > > > Lucas
> > > > >
> > > > > On Thu, Jun 14, 2018 at 2:33 PM, Ted Yu 
> wrote:
> > > > >
> > > > > > bq.  that's a waste of resource if control request rate is low
> > > > > >
> > > > > > I don't know if control request rate can get to 100,000, likely
> > not.
> > > > Then
> > > > > > using the same bound as that for data requests seems high.
> > > > > >
> > > > > > On Wed, Jun 13, 2018 at 10:13 PM, Lucas Wang <
> > lucasatu...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Ted,
> > > > > > >
> > > > > > > Thanks for taking a look at this KIP.
> > > > > > > Let's say today the setting of "queued.max.requests" in
> cluster A
> > > is
> > > > > > 1000,
> > > > > > > while the setting in cluster B is 100,000.
> > > > > > > The 100 times difference might have indicated that machines in
> > > > cluster
> > > > > B
> > > > > > > have larger memory.
> > > > > > >
> > > > > > > By reusing the "queued.max.requests", the controlRequestQueue
> in
> > > > > cluster
> > > > > > B
> > > > > > > automatically
> > > > > > > gets a 100x capacity without explicitly bothering the
> operators.
> > > > > > > I understand the counter argument can be that maybe that's a
> > waste
> > > of
> > > > > > > resource if control request
> > > > > > > rate is low and operators may want to fine tune the capacity of
> > the
> > > > > > > controlRequestQueue.
> > > > > > >
> > > > > > > I'm ok with either approach, and can change it if you or anyone
> > > else
> > > > > > feels
> > > > > > > strong about adding the extra config.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Lucas
> > > > > > >

Re: [VOTE] KIP-291: Have separate queues for control requests and data requests

2018-06-19 Thread Lucas Wang
Hi Jun, Ismael,

Can you please take a look when you get a chance? Thanks!

Lucas

On Mon, Jun 18, 2018 at 1:47 PM, Ted Yu  wrote:

> +1
>
> On Mon, Jun 18, 2018 at 1:04 PM, Lucas Wang  wrote:
>
> > Hi All,
> >
> > I've addressed a couple of comments in the discussion thread for KIP-291,
> > and
> > got no objections after making the changes. Therefore I would like to
> start
> > the voting thread.
> >
> > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 291%3A+Have+separate+queues+for+control+requests+and+data+requests
> >
> > Thanks for your time!
> > Lucas
> >
>


[VOTE] KIP-291: Have separate queues for control requests and data requests

2018-06-18 Thread Lucas Wang
Hi All,

I've addressed a couple of comments in the discussion thread for KIP-291,
and
got no objections after making the changes. Therefore I would like to start
the voting thread.

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Have+separate+queues+for+control+requests+and+data+requests

Thanks for your time!
Lucas


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-15 Thread Lucas Wang
Hi Ted,

Thanks for the suggestion. I've updated the KIP. Please take another look.

Lucas

On Thu, Jun 14, 2018 at 6:34 PM, Ted Yu  wrote:

> Currently in KafkaConfig.scala :
>
>   val QueuedMaxRequests = 500
>
> It would be good if you can include the default value for this new config
> in the KIP.
>
> Thanks
>
> On Thu, Jun 14, 2018 at 4:28 PM, Lucas Wang  wrote:
>
> > Hi Ted, Dong
> >
> > I've updated the KIP by adding a new config, instead of reusing the
> > existing one.
> > Please take another look when you have time. Thanks a lot!
> >
> > Lucas
> >
> > On Thu, Jun 14, 2018 at 2:33 PM, Ted Yu  wrote:
> >
> > > bq.  that's a waste of resource if control request rate is low
> > >
> > > I don't know if control request rate can get to 100,000, likely not.
> Then
> > > using the same bound as that for data requests seems high.
> > >
> > > On Wed, Jun 13, 2018 at 10:13 PM, Lucas Wang 
> > > wrote:
> > >
> > > > Hi Ted,
> > > >
> > > > Thanks for taking a look at this KIP.
> > > > Let's say today the setting of "queued.max.requests" in cluster A is
> > > 1000,
> > > > while the setting in cluster B is 100,000.
> > > > The 100 times difference might have indicated that machines in
> cluster
> > B
> > > > have larger memory.
> > > >
> > > > By reusing the "queued.max.requests", the controlRequestQueue in
> > cluster
> > > B
> > > > automatically
> > > > gets a 100x capacity without explicitly bothering the operators.
> > > > I understand the counter argument can be that maybe that's a waste of
> > > > resource if control request
> > > > rate is low and operators may want to fine tune the capacity of the
> > > > controlRequestQueue.
> > > >
> > > > I'm ok with either approach, and can change it if you or anyone else
> > > feels
> > > > strong about adding the extra config.
> > > >
> > > > Thanks,
> > > > Lucas
> > > >
> > > >
> > > > On Wed, Jun 13, 2018 at 3:11 PM, Ted Yu  wrote:
> > > >
> > > > > Lucas:
> > > > > Under Rejected Alternatives, #2, can you elaborate a bit more on
> why
> > > the
> > > > > separate config has bigger impact ?
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Jun 13, 2018 at 2:00 PM, Dong Lin 
> > wrote:
> > > > >
> > > > > > Hey Lucas,
> > > > > >
> > > > > > Thanks for the KIP. Looks good overall. Some comments below:
> > > > > >
> > > > > > - We usually specify the full mbean for the new metrics in the
> KIP.
> > > Can
> > > > > you
> > > > > > specify it in the Public Interface section similar to KIP-237
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 237%3A+More+Controller+Health+Metrics>
> > > > > > ?
> > > > > >
> > > > > > - Maybe we could follow the same pattern as KIP-153
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 153%3A+Include+only+client+traffic+in+BytesOutPerSec+metric>,
> > > > > > where we keep the existing sensor name "BytesInPerSec" and add a
> > new
> > > > > sensor
> > > > > > "ReplicationBytesInPerSec", rather than replacing the sensor
> name "
> > > > > > BytesInPerSec" with e.g. "ClientBytesInPerSec".
> > > > > >
> > > > > > - It seems that the KIP changes the semantics of the broker
> config
> > > > > > "queued.max.requests" because the number of total requests queued
> > in
> > > > the
> > > > > > broker will be no longer bounded by "queued.max.requests". This
> > > > probably
> > > > > > needs to be specified in the Public Interfaces section for
> > > discussion.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 13, 2018 at 12:45 PM, Lucas Wang <
> > lucasatu...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Kafka experts,
> > > > > > >
> > > > > > > I created KIP-291 to add a separate queue for controller
> > requests:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%
> > > > > > > 3A+Have+separate+queues+for+control+requests+and+data+requests
> > > > > > >
> > > > > > > Can you please take a look and let me know your feedback?
> > > > > > >
> > > > > > > Thanks a lot for your time!
> > > > > > > Regards,
> > > > > > > Lucas
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-14 Thread Lucas Wang
Hi Ted, Dong

I've updated the KIP by adding a new config, instead of reusing the
existing one.
Please take another look when you have time. Thanks a lot!

Lucas

On Thu, Jun 14, 2018 at 2:33 PM, Ted Yu  wrote:

> bq.  that's a waste of resource if control request rate is low
>
> I don't know if control request rate can get to 100,000, likely not. Then
> using the same bound as that for data requests seems high.
>
> On Wed, Jun 13, 2018 at 10:13 PM, Lucas Wang 
> wrote:
>
> > Hi Ted,
> >
> > Thanks for taking a look at this KIP.
> > Let's say today the setting of "queued.max.requests" in cluster A is
> 1000,
> > while the setting in cluster B is 100,000.
> > The 100 times difference might have indicated that machines in cluster B
> > have larger memory.
> >
> > By reusing the "queued.max.requests", the controlRequestQueue in cluster
> B
> > automatically
> > gets a 100x capacity without explicitly bothering the operators.
> > I understand the counter argument can be that maybe that's a waste of
> > resource if control request
> > rate is low and operators may want to fine tune the capacity of the
> > controlRequestQueue.
> >
> > I'm ok with either approach, and can change it if you or anyone else
> feels
> > strong about adding the extra config.
> >
> > Thanks,
> > Lucas
> >
> >
> > On Wed, Jun 13, 2018 at 3:11 PM, Ted Yu  wrote:
> >
> > > Lucas:
> > > Under Rejected Alternatives, #2, can you elaborate a bit more on why
> the
> > > separate config has bigger impact ?
> > >
> > > Thanks
> > >
> > > On Wed, Jun 13, 2018 at 2:00 PM, Dong Lin  wrote:
> > >
> > > > Hey Lucas,
> > > >
> > > > Thanks for the KIP. Looks good overall. Some comments below:
> > > >
> > > > - We usually specify the full mbean for the new metrics in the KIP.
> Can
> > > you
> > > > specify it in the Public Interface section similar to KIP-237
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 237%3A+More+Controller+Health+Metrics>
> > > > ?
> > > >
> > > > - Maybe we could follow the same pattern as KIP-153
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 153%3A+Include+only+client+traffic+in+BytesOutPerSec+metric>,
> > > > where we keep the existing sensor name "BytesInPerSec" and add a new
> > > sensor
> > > > "ReplicationBytesInPerSec", rather than replacing the sensor name "
> > > > BytesInPerSec" with e.g. "ClientBytesInPerSec".
> > > >
> > > > - It seems that the KIP changes the semantics of the broker config
> > > > "queued.max.requests" because the number of total requests queued in
> > the
> > > > broker will be no longer bounded by "queued.max.requests". This
> > probably
> > > > needs to be specified in the Public Interfaces section for
> discussion.
> > > >
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Wed, Jun 13, 2018 at 12:45 PM, Lucas Wang 
> > > > wrote:
> > > >
> > > > > Hi Kafka experts,
> > > > >
> > > > > I created KIP-291 to add a separate queue for controller requests:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%
> > > > > 3A+Have+separate+queues+for+control+requests+and+data+requests
> > > > >
> > > > > Can you please take a look and let me know your feedback?
> > > > >
> > > > > Thanks a lot for your time!
> > > > > Regards,
> > > > > Lucas
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-13 Thread Lucas Wang
Hi Ted,

Thanks for taking a look at this KIP.
Let's say today the setting of "queued.max.requests" in cluster A is 1000,
while the setting in cluster B is 100,000.
The 100 times difference might have indicated that machines in cluster B
have larger memory.

By reusing the "queued.max.requests", the controlRequestQueue in cluster B
automatically
gets a 100x capacity without explicitly bothering the operators.
I understand the counterargument: that may be a waste of resources if the control
request rate is low, and operators may want to fine-tune the capacity of the
controlRequestQueue.

I'm ok with either approach, and can change it if you or anyone else feels
strongly about adding the extra config.
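To make the sizing question concrete, here is a rough sketch (hypothetical names and defaults, not the final KIP-291 config) of what a separately bounded control request queue could look like next to the existing data request queue:

import java.util.concurrent.ArrayBlockingQueue

object QueueDefaults {
  val QueuedMaxRequests = 500        // existing default for the data request queue
  val QueuedMaxControlRequests = 20  // hypothetical default for a dedicated control queue
}

// Two independently bounded queues, so the control queue capacity can be tuned
// (or left at a small default) without inheriting whatever queued.max.requests
// happens to be on a given cluster.
class RequestQueues(dataCapacity: Int = QueueDefaults.QueuedMaxRequests,
                    controlCapacity: Int = QueueDefaults.QueuedMaxControlRequests) {
  val dataQueue = new ArrayBlockingQueue[AnyRef](dataCapacity)
  val controlQueue = new ArrayBlockingQueue[AnyRef](controlCapacity)
}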

Thanks,
Lucas


On Wed, Jun 13, 2018 at 3:11 PM, Ted Yu  wrote:

> Lucas:
> Under Rejected Alternatives, #2, can you elaborate a bit more on why the
> separate config has bigger impact ?
>
> Thanks
>
> On Wed, Jun 13, 2018 at 2:00 PM, Dong Lin  wrote:
>
> > Hey Lucas,
> >
> > Thanks for the KIP. Looks good overall. Some comments below:
> >
> > - We usually specify the full mbean for the new metrics in the KIP. Can
> you
> > specify it in the Public Interface section similar to KIP-237
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 237%3A+More+Controller+Health+Metrics>
> > ?
> >
> > - Maybe we could follow the same pattern as KIP-153
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 153%3A+Include+only+client+traffic+in+BytesOutPerSec+metric>,
> > where we keep the existing sensor name "BytesInPerSec" and add a new
> sensor
> > "ReplicationBytesInPerSec", rather than replacing the sensor name "
> > BytesInPerSec" with e.g. "ClientBytesInPerSec".
> >
> > - It seems that the KIP changes the semantics of the broker config
> > "queued.max.requests" because the number of total requests queued in the
> > broker will be no longer bounded by "queued.max.requests". This probably
> > needs to be specified in the Public Interfaces section for discussion.
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Jun 13, 2018 at 12:45 PM, Lucas Wang 
> > wrote:
> >
> > > Hi Kafka experts,
> > >
> > > I created KIP-291 to add a separate queue for controller requests:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%
> > > 3A+Have+separate+queues+for+control+requests+and+data+requests
> > >
> > > Can you please take a look and let me know your feedback?
> > >
> > > Thanks a lot for your time!
> > > Regards,
> > > Lucas
> > >
> >
>


Re: [DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-13 Thread Lucas Wang
Thanks for the comments, Dong.

I've updated the KIP and addressed your 3 comments.
Please take another look when you get a chance.

Lucas

On Wed, Jun 13, 2018 at 2:00 PM, Dong Lin  wrote:

> Hey Lucas,
>
> Thanks for the KIP. Looks good overall. Some comments below:
>
> - We usually specify the full mbean for the new metrics in the KIP. Can you
> specify it in the Public Interface section similar to KIP-237
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 237%3A+More+Controller+Health+Metrics>
> ?
>
> - Maybe we could follow the same pattern as KIP-153
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 153%3A+Include+only+client+traffic+in+BytesOutPerSec+metric>,
> where we keep the existing sensor name "BytesInPerSec" and add a new sensor
> "ReplicationBytesInPerSec", rather than replacing the sensor name "
> BytesInPerSec" with e.g. "ClientBytesInPerSec".
>
> - It seems that the KIP changes the semantics of the broker config
> "queued.max.requests" because the number of total requests queued in the
> broker will be no longer bounded by "queued.max.requests". This probably
> needs to be specified in the Public Interfaces section for discussion.
>
>
> Thanks,
> Dong
>
>
> On Wed, Jun 13, 2018 at 12:45 PM, Lucas Wang 
> wrote:
>
> > Hi Kafka experts,
> >
> > I created KIP-291 to add a separate queue for controller requests:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%
> > 3A+Have+separate+queues+for+control+requests+and+data+requests
> >
> > Can you please take a look and let me know your feedback?
> >
> > Thanks a lot for your time!
> > Regards,
> > Lucas
> >
>


[DISCUSS] KIP-291: Have separate queues for control requests and data requests

2018-06-13 Thread Lucas Wang
Hi Kafka experts,

I created KIP-291 to add a separate queue for controller requests:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Have+separate+queues+for+control+requests+and+data+requests

Can you please take a look and let me know your feedback?

Thanks a lot for your time!
Regards,
Lucas


[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7040:
-

 Summary: The replica fetcher thread may truncate accepted messages 
during multiple fast leadership transitions
 Key: KAFKA-7040
 URL: https://issues.apache.org/jira/browse/KAFKA-7040
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


Problem Statement:
Consider the scenario where there are two brokers, broker0 and broker1, and 
there are two partitions, "t1p0" and "t1p1" [1], both of which have broker1 as 
the leader and broker0 as the follower. The following sequence of events 
happened on broker0

1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
broker1, and waits for the response
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts some messages from a producer
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives a response for the 
LeaderEpoch request issued in step 1, and truncates the accepted messages in 
step 3.

The issue can be reproduced with the test from 
https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of 
just one, to avoid shutting down the replica fetcher thread when it 
becomes idle.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

2018-05-31 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6974:
-

 Summary: Changes the interaction between request handler threads 
and fetcher threads into an ASYNC model
 Key: KAFKA-6974
 URL: https://issues.apache.org/jira/browse/KAFKA-6974
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Problem Statement:
At LinkedIn, occasionally our clients complain about receiving constant 
NotLeaderForPartition exceptions 

Investigations:
For one investigated case, the cluster was going through a rolling bounce, and 
we saw a delay of ~8 minutes between an old partition leader resigning 
and the new leader becoming active, based on entries of "Broker xxx handling 
LeaderAndIsr request" in the state change log.
Our monitoring shows the LeaderAndISR request local time during the incident 
went up to ~4 minutes.


Explanations:
One possible explanation of the ~8 minutes of delay is:
During controlled shutdown of a broker, the partitions whose leaders lie on the 
shutting down broker need to go through leadership transitions. And the 
controller processes partitions in batches, with each batch having 
config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
If the 1st LeaderAndISR sent to a new leader broker takes too long, e.g. 4 
minutes, then the subsequent LeaderAndISR requests can have an accumulated 
delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that 
subsequent LeaderAndISR requests are blocked in a muted channel, given only one 
LeaderAndISR request can be processed at a time with a 
maxInFlightRequestsPerConnection setting of 1. When that happens, no existing 
metric would show the total delay of 8 or 12 minutes for muted requests.
Now the question is why it took ~4 minutes for the 1st LeaderAndISR request 
to finish.

Explanation for the ~4 minutes of local time for LeaderAndISR request:
During processing of an LeaderAndISR request, the request handler thread needs 
to add partitions to or remove partitions from partitionStates field of the 
ReplicaFetcherThread, and also shut down idle fetcher threads by checking the size 
of the partitionStates field. On the other hand, background fetcher threads 
need to iterate through all the partitions in partitionStates in order to build 
fetch request, and process fetch responses. The synchronization between request 
handler thread and the fetcher threads is done through a partitionMapLock. 
Specifically, the fetcher threads may acquire the partitionMapLock, and then 
calls the following functions for processing the fetch response
(1) processPartitionData, which in turn calls 
(2) Replica.maybeIncrementLogStartOffset, which calls 
(3) Log.maybeIncrementLogStartOffset, which calls 
(4) LeaderEpochCache.clearAndFlushEarliest.
Now two factors contribute to the long holding of the partitionMapLock,
1. function (4) above entails calling sync() to make sure data gets persisted 
to the disk, which may potentially have a long latency
2. All the 4 functions above can potentially be called for each partition in 
the fetch response, multiplying the sync() latency by a factor of n.

The end result is that the request handler thread got blocked for a long time 
trying to acquire the partitionMapLock of some fetcher inside 
AbstractFetcherManager.shutdownIdleFetcherThreads since checking each fetcher's 
partitionCount requires getting the partitionMapLock.

In our testing environment, we reproduced the problem and confirmed the 
explanation above with a request handler thread getting blocked for 10 seconds 
trying to acquire the partitionMapLock of one particular fetcher thread, while 
there are many log entries showing "Incrementing log start offset of 
partition..."

Proposed change:
We propose to change the interaction between the request handler threads and 
the fetcher threads to an ASYNC model by using an event queue. All requests to 
add or remove partitions, or shutdown idle fetcher threads are modeled as items 
in the event queue. And only the fetcher threads can take items out of the 
event queue and actually process them.
In the new ASYNC model, in order to be able to process an infinite sequence of 
FetchRequests, a fetcher thread initially has one FetchRequest, and after it's 
done processing one FetchRequest, it enqueues one more into its own event queue.
Also since the current AbstractFetcherThread logic is inherited by both the 
replica-fetcher-threads and the consumer-fetcher-threads for the old consumer, 
and the latter has been deprecated, we plan to implement the ASYNC model with a 
clean-slate approach, and only support the replica-fetcher-threads, in order to 
make the code cleaner.
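A rough sketch of the proposed ASYNC model is shown below (illustrative names only, not the actual broker code): request handler threads merely enqueue add/remove events, the fetcher thread is the sole consumer of its own event queue, and it keeps itself busy by re-enqueuing a fetch event after finishing each one, so no partitionMapLock is shared between the two sides.

{code:scala}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

sealed trait FetcherEvent
case class AddPartitions(partitions: Set[String]) extends FetcherEvent
case class RemovePartitions(partitions: Set[String]) extends FetcherEvent
case object ProcessFetchRequest extends FetcherEvent

// The fetcher thread is the only consumer of its own event queue, so the
// partitionStates map is never touched by request handler threads.
// Request handlers just enqueue events and return immediately.
class AsyncFetcherThread extends Thread {
  val eventQueue = new LinkedBlockingQueue[FetcherEvent]()
  private val partitionStates = mutable.Map.empty[String, Long] // partition -> fetch offset

  override def run(): Unit = {
    eventQueue.put(ProcessFetchRequest) // seed the infinite fetch loop
    while (!isInterrupted) {
      eventQueue.take() match {
        case AddPartitions(ps)    => ps.foreach(p => partitionStates.getOrElseUpdate(p, 0L))
        case RemovePartitions(ps) => ps.foreach(partitionStates.remove)
        case ProcessFetchRequest  =>
          // build a fetch request from partitionStates and process the response here,
          // then re-enqueue so fetching resumes after any pending add/remove events
          eventQueue.put(ProcessFetchRequest)
      }
    }
  }
}
{code}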




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6753) Speed up event processing on the controller

2018-04-05 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6753:
-

 Summary: Speed up event processing on the controller 
 Key: KAFKA-6753
 URL: https://issues.apache.org/jira/browse/KAFKA-6753
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang
 Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png

The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing each event.

The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the 
number of invocations to update metrics. Instead of updating the metrics after 
processing any event, we only periodically check if the metrics need to be 
updated, i.e. once every second (see the sketch below). 
* If after the previous invocation to update metrics, there are other types of 
events that changed the controller’s state, then one second later the metrics 
will be updated. 
* If after the previous invocation, there have been no other types of events, 
then the call to update metrics can be bypassed.
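A simplified sketch of the approach (hypothetical class, not the actual controller code): the event processing thread only flips a dirty flag, and a once-per-second task recomputes the expensive metrics only when something has changed since the last update.

{code:scala}
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// The controller event thread only flips a flag; a once-per-second task pays the
// cost of iterating all partitions only when something actually changed.
class PeriodicMetricsUpdater(computeMetrics: () => Unit) {
  private val dirty = new AtomicBoolean(false)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Called after processing any event that changes the controller's state.
  def markDirty(): Unit = dirty.set(true)

  def start(): Unit = {
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = if (dirty.getAndSet(false)) computeMetrics()
    }, 1, 1, TimeUnit.SECONDS)
  }

  def shutdown(): Unit = scheduler.shutdown()
}
{code}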





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info

2018-04-04 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6652.
---
Resolution: Won't Fix

> The controller should log failed attempts to transition a replica to 
> OfflineReplica state if there is no leadership info
> 
>
> Key: KAFKA-6652
> URL: https://issues.apache.org/jira/browse/KAFKA-6652
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Lucas Wang
>    Assignee: Lucas Wang
>Priority: Minor
>
> In certain conditions, the controller's attempt to transition a replica to 
> OfflineReplica state could fail because there is no leadership info, e.g. the 
> condition described in 
> [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
> happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-29 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6612.
---
Resolution: Fixed

> Added logic to prevent increasing partition counts during topic deletion
> 
>
> Key: KAFKA-6612
> URL: https://issues.apache.org/jira/browse/KAFKA-6612
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Lucas Wang
>    Assignee: Lucas Wang
>Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic to be never deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased,
> the new partition and its replica assignment will be created on ZooKeeper as data 
> of the path /brokers/topics/.
> Upon detecting the change, the controller sees the topic is being deleted, 
> and therefore ignores the partition change. Therefore the zk path 
> /brokers/topics//partitions/ will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in the 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However the transition to OfflineReplica state will NOT succeed since there 
> is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6652:
-

 Summary: The controller should log failed attempts to transition a 
replica to OfflineReplica state
 Key: KAFKA-6652
 URL: https://issues.apache.org/jira/browse/KAFKA-6652
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


In certain conditions, the controller's attempt to transition a replica to 
OfflineReplica state could fail, e.g. the condition described in 
[KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6650:
-

 Summary: The controller should be able to handle a partially 
deleted topic
 Key: KAFKA-6650
 URL: https://issues.apache.org/jira/browse/KAFKA-6650
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang
Assignee: Lucas Wang


A previous controller could have deleted some partitions of a topic from ZK, 
but not all partitions, and then died.
In that case, the new controller should be able to handle the partially deleted 
topic, and finish the deletion.

In the current code base, if there is no leadership info for a replica's 
partition, the transition to OfflineReplica state for the replica will fail. 
Afterwards the transition to ReplicaDeletionStarted will fail as well since the 
only valid previous state for ReplicaDeletionStarted is OfflineReplica. 
Furthermore, it means the topic deletion will never finish.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller

2018-03-08 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6630:
-

 Summary: Speed up the processing of StopReplicaResponse events on 
the controller
 Key: KAFKA-6630
 URL: https://issues.apache.org/jira/browse/KAFKA-6630
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Wang
Assignee: Lucas Wang


Problem Statement:
We find that in a large cluster with many partition replicas, it takes a long time 
to successfully delete a topic. 

Root cause:
Further analysis shows that for a topic with N replicas, the controller 
receives all the N StopReplicaResponses from brokers within a short time, 
however sequentially handling all the N 
TopicDeletionStopReplicaResponseReceived events one by one takes a long time.

Specifically the functions triggered while handling every single 
TopicDeletionStopReplicaResponseReceived event include:
TopicDeletionStopReplicaResponseReceived.process calls 
TopicDeletionManager.completeReplicaDeletion, which calls 
TopicDeletionManager.resumeDeletions, which calls several inefficient functions.

The inefficient functions called inside TopicDeletionManager.resumeDeletions 
include
ReplicaStateMachine.areAllReplicasForTopicDeleted
ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
ReplicaStateMachine.replicasInState

Each of the 3 inefficient functions above will iterate through all the replicas 
in the cluster and select the replicas belonging to the given topic. In a large 
cluster with many replicas, these functions can be quite slow. 

Total deletion time for a topic becomes long in the single-threaded controller 
processing model:
Since the controller needs to sequentially process the queued 
TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
one event is t, the total time to process all events for all replicas of a 
topic is N * t.
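As an illustration of one way to avoid the repeated full scans (a hypothetical data structure, not the actual ReplicaStateMachine), indexing replica states by topic lets each TopicDeletionStopReplicaResponseReceived event touch only the replicas of the topic being deleted:

{code:scala}
import scala.collection.mutable

object ReplicaState extends Enumeration {
  val OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful = Value
}

// Replica states grouped by topic: each StopReplicaResponse event only inspects the
// replicas of its topic, instead of scanning every replica in the cluster.
class ReplicaStateIndex {
  private val statesByTopic = mutable.Map.empty[String, mutable.Map[Int, ReplicaState.Value]]

  def put(topic: String, replicaId: Int, state: ReplicaState.Value): Unit =
    statesByTopic.getOrElseUpdate(topic, mutable.Map.empty).update(replicaId, state)

  def areAllReplicasForTopicDeleted(topic: String): Boolean =
    statesByTopic.get(topic).exists(_.values.forall(_ == ReplicaState.ReplicaDeletionSuccessful))

  def replicasInState(topic: String, state: ReplicaState.Value): Set[Int] =
    statesByTopic.get(topic)
      .map(_.collect { case (id, s) if s == state => id }.toSet)
      .getOrElse(Set.empty)
}
{code}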




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-05 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6612:
-

 Summary: Added logic to prevent increasing partition counts during 
topic deletion
 Key: KAFKA-6612
 URL: https://issues.apache.org/jira/browse/KAFKA-6612
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Problem: trying to increase the partition count of a topic while the topic 
deletion is in progress can cause the topic to be never deleted.

In the current code base, if a topic deletion is still in progress and the 
partition count is increased,
the new partition and its replica assignment will be created on ZooKeeper as data of 
the path /brokers/topics/.
Upon detecting the change, the controller sees the topic is being deleted, and 
therefore ignores the partition change. Therefore the zk path 
/brokers/topics//partitions/ will NOT be created.

If a controller switch happens next, the added partition will be detected by 
the new controller and stored in the 
controllerContext.partitionReplicaAssignment. The new controller then tries to 
delete the topic by first transitioning its replicas to OfflineReplica. However 
the transition to OfflineReplica state will NOT succeed since there is no 
leader for the partition. Since the only state change path for a replica to be 
successfully deleted is OfflineReplica -> ReplicaDeletionStarted -> 
ReplicaDeletionSuccessful, not being able to enter the OfflineReplica state 
means the replica can never be successfully deleted.
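A minimal sketch of the guard this ticket adds (hypothetical API, not the actual admin/ZK client code): reject the partition-count increase while the topic is queued for deletion, so the half-created state described above can never be written to ZooKeeper.

{code:scala}
// Reject the partition-count increase up front while the topic is queued for deletion.
class PartitionExpansionGuard(topicsQueuedForDeletion: () => Set[String]) {
  def validateAddPartitions(topic: String, newPartitionCount: Int, currentCount: Int): Unit = {
    if (topicsQueuedForDeletion().contains(topic))
      throw new IllegalStateException(
        s"Cannot add partitions to topic '$topic' while it is being deleted")
    if (newPartitionCount <= currentCount)
      throw new IllegalArgumentException(
        s"New partition count $newPartitionCount must be greater than the current count $currentCount")
  }
}
{code}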



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-01-24 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6481:
-

 Summary: Improving performance of the function 
ControllerChannelManager.addUpdateMetadataRequestForBrokers
 Key: KAFKA-6481
 URL: https://issues.apache.org/jira/browse/KAFKA-6481
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 needs to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, a call stack like the following 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    inside a for-loop for each partition 
ReplicaStateMachine.doHandleStateChanges
ReplicaStateMachine.handleStateChanges
KafkaController.onReplicasBecomeOffline
KafkaController.onBrokerFailure


How to reproduce the problem:
1. Create a cluster with 2 brokers having ids 1 and 2
2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.
4. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
6. Verify that the following log message appears over 200 times in the 
controller.log file, one for each iteration of the a0 partitions
 "Leader not yet assigned for partition [a0,..]. Skip sending 
UpdateMetadataRequest."
 
 What happened was 
 1. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
 2. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) entries of the logs above.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
Also I've verified that topic deletion for topic a1 still works fine.
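A sketch of the intended fix (simplified, assumed signatures rather than the real ControllerChannelManager code): only the partitions passed into addUpdateMetadataRequestForBrokers are checked against the partitions-to-be-deleted set, so the full set is never iterated per affected partition.

{code:scala}
case class TopicAndPartition(topic: String, partition: Int)

// Only the partitions passed in are checked against the deletion set; the full
// partitionsToBeDeleted collection is never iterated per affected partition.
class UpdateMetadataBatch(partitionsToBeDeleted: collection.Set[TopicAndPartition]) {
  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                         partitions: collection.Set[TopicAndPartition]): Unit = {
    partitions.foreach { tp =>
      val beingDeleted = partitionsToBeDeleted.contains(tp)
      updateMetadataRequestPartitionInfo(tp, beingDeleted)
    }
  }

  private def updateMetadataRequestPartitionInfo(tp: TopicAndPartition, beingDeleted: Boolean): Unit = {
    // placeholder: record the partition's leader/ISR info for the brokers in this batch
  }
}
{code}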



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)