Hi, Lucas,

2. Good point about not knowing the request type in the memory pool. Looking
at the implementation, it seems that queued.max.request.bytes is orthogonal
to queued.max.requests, so this seems fine.

3. The implementation that you suggested sounds good. It would be useful not
to unnecessarily delay the processing of a request by up to 300 ms. I was
thinking that we could have RequestChannel manage a Lock and a couple of
Conditions, and have sendRequest()/receiveRequest() coordinate on the lock
and the conditions (similar to how ArrayBlockingQueue is implemented). This
way, any new request can wake up the blocked request handler threads
immediately.
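
A minimal sketch of the coordination I have in mind is below. The class and
field names are illustrative rather than the actual RequestChannel internals,
and the sketch omits the size bounding that the real queues would still need:

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class TwoQueueChannelSketch {
  private val lock = new ReentrantLock()
  private val nonEmpty = lock.newCondition()
  private val controllerQueue = new java.util.ArrayDeque[Any]()
  private val dataQueue = new java.util.ArrayDeque[Any]()

  // Producer side: enqueue and signal a waiting handler thread right away,
  // so a controller request never has to wait for a poll timeout to expire.
  def sendRequest(request: Any, fromController: Boolean): Unit = {
    lock.lock()
    try {
      if (fromController) controllerQueue.add(request)
      else dataQueue.add(request)
      nonEmpty.signal()
    } finally lock.unlock()
  }

  // Consumer side: always drain controller requests first; otherwise wait on
  // the condition until either queue gets a request or the timeout expires.
  def receiveRequest(timeout: Long): Any = {
    lock.lock()
    try {
      var remainingNs = TimeUnit.MILLISECONDS.toNanos(timeout)
      while (controllerQueue.isEmpty && dataQueue.isEmpty && remainingNs > 0)
        remainingNs = nonEmpty.awaitNanos(remainingNs)
      if (!controllerQueue.isEmpty) controllerQueue.poll()
      else dataQueue.poll() // null if both queues are still empty after the timeout
    } finally lock.unlock()
  }
}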

Thanks,

Jun


On Fri, Jun 29, 2018 at 4:53 PM, Lucas Wang <lucasatu...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for your comments.
> 1. I just replied in the discussion thread about the positive change this
> KIP can still bring if implemented on the latest trunk, which includes the
> async ZK operations for KAFKA-5642. The evaluation was done using an
> integration test. In production, we have not upgraded to Kafka 1.1 yet, and
> the code we are currently running does not include the async ZK operations,
> so I don't have any real usage results.
>
> 2. Thanks for bringing this up. I hadn't considered this setting, and the
> existing proposal in this KIP would make data requests and controller
> requests share a memory pool of the size specified by the config
> queued.max.request.bytes. The downside is that under memory pressure,
> controller requests may be blocked from being read from a socket and do
> not get prioritized at the socket layer.
>
> If we have a separate bytes limit for the controller requests, I imagine
> there would be a separate memory pool dedicated to controller requests.
> It would also require the processors to tell connections from the
> controller apart from connections from other brokers or clients, which
> would probably require a dedicated port for the controller. IMO, this
> change is mainly driven by memory pressure, which is kind of an orthogonal
> issue, and we can address it with a separate KIP if desired. Please let me
> know what you think.
>
> 3. I plan to change the implementation of the method
> receiveRequest(timeout: Long) in the RequestChannel class as follows:
>
> // Always check the controller request queue first; only fall back to a
> // blocking poll on the data request queue when no controller request is
> // waiting.
> val controllerRequest = controllerRequestQueue.poll()
> if (controllerRequest != null) {
>   controllerRequest
> } else {
>   dataRequestQueue.poll(timeout, TimeUnit.MILLISECONDS)
> }
>
> With this implementation, there is no need to explicitly choose a request
> handler thread to wake up depending on the type of request enqueued; if a
> controller request arrives while some request handler threads are blocked
> on an empty data request queue, they will simply time out and call the
> receiveRequest method again.
>
> In terms of performance, this means that in the worst case, a controller
> request that just missed the receiveRequest call can be delayed for as long
> as the timeout parameter, which is hard coded to 300 milliseconds. If there
> is just one request handler thread, the average delay is 150 milliseconds,
> assuming a controller request is equally likely to arrive at any point in
> time. With N request handler threads, the average delay is 150/N
> milliseconds, which does not seem to be a problem.
>
> We have considered waking up request handler threads based on which queue
> they are blocked on, but that design was turned down because of its
> complexity. The design can be found here
> <https://cwiki.apache.org/confluence/display/KAFKA/Old+controller+request+queue+design>.
>
> If you mean a general purpose priority queue such as
> java.util.PriorityQueue, we have also considered it and turned down that
> design because:
> - The readily available class java.util.PriorityQueue is unbounded, so we
> would need to implement a bounded version.
> - We would still like to have FIFO semantics on both the controller request
> queue and the data request queue, which conceptually does not fit very well
> with a general purpose priority queue, e.g. we would probably need to use
> the enqueue time to enforce FIFO semantics (see the sketch below).
> - A typical operation on a priority queue is O(log n), whereas the sample
> implementation above gives O(1) performance regardless of the size of
> either queue.
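>
> To illustrate the second point, here is a rough, hypothetical sketch of
> what the rejected priority-queue approach might look like; the names are
> made up, and every entry has to carry an explicit enqueue sequence number
> just to preserve FIFO order within each class of requests:
>
> import java.util.concurrent.PriorityBlockingQueue
> import java.util.concurrent.atomic.AtomicLong
>
> // The wrapper exists purely to encode the ordering rules: controller
> // requests first, then FIFO by enqueue sequence within each class.
> case class PrioritizedRequest(request: Any, isController: Boolean, seq: Long)
>     extends Comparable[PrioritizedRequest] {
>   override def compareTo(other: PrioritizedRequest): Int = {
>     if (isController != other.isController) {
>       if (isController) -1 else 1
>     } else java.lang.Long.compare(seq, other.seq)
>   }
> }
>
> object PriorityQueueSketch {
>   private val seq = new AtomicLong()
>   // PriorityBlockingQueue is unbounded, so a real implementation would
>   // still need its own bound, and every offer/poll costs O(log n).
>   private val queue = new PriorityBlockingQueue[PrioritizedRequest]()
>
>   def send(request: Any, isController: Boolean): Unit =
>     queue.put(PrioritizedRequest(request, isController, seq.getAndIncrement()))
> }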
>
> 4. For the two APIs sendRequest and receiveRequest, since we are only
> changing their implementation, not the API itself, the two metrics will
> support the two queues and the meaning of "Idle" still holds:
>
> NetworkProcessorAvgIdlePercent
>   Before this KIP: idle = blocked on select; not idle includes being
>   blocked on requestQueue
>   After this KIP: idle = blocked on select; not idle includes being blocked
>   on either controller request queue or data request queue
>
> RequestHandlerAvgIdlePercent
>   Before this KIP: idle = blocked on reading from requestQueue
>   After this KIP: idle = taking a request from the controller request
>   queue, or blocked on reading from the data request queue
>
> Regards,
> Lucas
>
> On Fri, Jun 29, 2018 at 11:22 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Lucas,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 1. As Eno mentioned in the discussion thread, I am wondering how much of
> > this is still an issue after KAFKA-5642. With that fix, the requests from
> > the controller to the brokers are batched in all the common cases. Have
> you
> > deployed Kafka 1.1? What's the request queue time and the request queue
> > size that you have observed in production?
> >
> > 2. For the request queue, currently we can also bound it by size
> > through queued.max.request.bytes. Should we consider the same for the
> > control queue?
> >
> > 3. Implementation-wise, currently the request handler threads just block
> > on the request queue when the queue is empty. With two queues, it seems
> > that we need to wake up a request handler thread blocked on one queue
> > when the other queue gets a request? Have we considered just making the
> > request queue a priority queue?
> >
> > 4. Related to 3, currently we have 2 metrics,
> > NetworkProcessorAvgIdlePercent and RequestHandlerAvgIdlePercent, that
> > measure the utilization of the network and the request handler thread
> > pools. They are computed by measuring the amount of time waiting on the
> > request queue. Will these 2 metrics be extended to support 2 request
> > queues?
> >
> > Jun
> >
> >
> > On Mon, Jun 18, 2018 at 1:04 PM, Lucas Wang <lucasatu...@gmail.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I've addressed a couple of comments in the discussion thread for
> KIP-291,
> > > and
> > > got no objections after making the changes. Therefore I would like to
> > start
> > > the voting thread.
> > >
> > > KIP:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Have+separate+queues+for+control+requests+and+data+requests
> > >
> > > Thanks for your time!
> > > Lucas
> > >
> >
>
