Hi Jun,

Yes, you're right - right now the next select() call will return immediately with (at least) the same set of keys as before, since they were not previously handled (no memory). My assumption is that this happens under considerable load - something has to be occupying all this memory. Also, this happens in the context of SocketServer.Processor.run():

    while (isRunning) {
        configureNewConnections()
        processNewResponses()
        poll()    <------ HERE
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
    }

Even within poll(), things like finishConnection(), prepare() and write()s can still make progress under low-memory conditions, and given the load there is probably also progress to be made in processCompletedReceives(), processCompletedSends() and processDisconnected(). If there is progress to be made elsewhere, the next call to poll() will likely not happen immediately, so the loop won't be that tight. In order for this to devolve into true busy-waiting you would need a situation where no progress can be made on any in-progress request and there are no responses to send out, no?

If my assumption does not hold then you are correct, and the selector.poll(300) currently hardcoded in SocketServer.Processor.poll() would need to be replaced with something more complicated. My biggest point of concern, though, is that the resulting code would be complicated and would couple Selector to the memory pool very tightly. Under my current patch Selector needs the memory pool only to pass to channels when they are built, which would make it relatively easy to use different memory pools later for things like reserving memory for cross-broker replication or high-SLA connections. A tighter coupling would make any such future modification hard.
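To make that a bit more concrete, here is a rough sketch of the shape of the thing (illustrative only - the class and method names below are simplified for the discussion and are not necessarily what is on the branch):

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicLong;

    public class PoolSketch {

        // non-blocking pool: either hands back a buffer immediately or returns null,
        // never blocks the network thread
        interface MemoryPool {
            ByteBuffer tryAllocate(int sizeBytes);
            void release(ByteBuffer buffer);
        }

        static class SimpleMemoryPool implements MemoryPool {
            private final AtomicLong available;

            SimpleMemoryPool(long capacityBytes) {
                this.available = new AtomicLong(capacityBytes);
            }

            @Override
            public ByteBuffer tryAllocate(int sizeBytes) {
                while (true) {
                    long left = available.get();
                    if (left < sizeBytes)
                        return null; // not enough memory right now - caller just moves on
                    if (available.compareAndSet(left, left - sizeBytes))
                        return ByteBuffer.allocate(sizeBytes);
                }
            }

            @Override
            public void release(ByteBuffer buffer) {
                // assumes the buffer was previously handed out by this pool
                available.addAndGet(buffer.capacity());
            }
        }

        // read-path sketch: once the 4-byte size prefix is known, ask the pool for the
        // payload buffer; a null result simply means "no receive produced this round"
        static ByteBuffer maybeStartReceive(MemoryPool pool, int requestSize) {
            return pool.tryAllocate(requestSize);
        }

        public static void main(String[] args) {
            MemoryPool pool = new SimpleMemoryPool(1024);
            ByteBuffer first = maybeStartReceive(pool, 800);  // fits -> non-null
            ByteBuffer second = maybeStartReceive(pool, 800); // doesn't fit -> null
            System.out.println("first=" + (first != null) + ", second=" + (second != null));
            if (first != null)
                pool.release(first); // capacity becomes available again for a later poll()
        }
    }

The only contract between the reading code and the pool is "give me a buffer or give me null", so the Selector never needs to know how much memory is left or be notified when memory frees up - which is also what would keep the door open for plugging in different pools for different classes of connections later.
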
On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <j...@confluent.io> wrote: > Hi, Radai, > > Thanks for the reply. I still have a followup question on #2. > > My understanding is that in your proposal, selector will now first read the > size of the Receive. If there is not enough memory, it has to turn off the > READ interest bit for the corresponding KafkaChannel. Otherwise, subsequent > selector.poll() call will always return immediately, adding unnecessary > overhead. If you do that, the Selector will need to know when to turn on > the READ interest bit again. It may not be enough to do this check until > the next poll call since the timeout used by poll() could be arbitrarily > large. So, it seems that some kind of coordination between the Selector and > the bufferpool is needed? > > Jun > > On Thu, Sep 8, 2016 at 7:02 PM, radai <radai.rosenbl...@gmail.com> wrote: > > > Hi Jun, > > > > 1. yes, it is my own personal opinion that people use queued.max.requests > > as an indirect way to bound memory consumption. once a more direct memory > > bound mechanism exists (and works) i dont think queued.max.requests woul > > dbe required. having said that I was not planning on making any changes > > w.r.t queued.max.requests support (so I was aiming to get to a situation > > where both configs are supported) to allow gathering enough > data/feedback. > > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a > > NetworkReceive. multiple such read() calls may be required until a > Receive > > is produced already in the current code base. my pool implementation is > > non-blocking so if there's no memory available the read() call will > return > > null. poll() would then move on to try and service other selection keys. > > the pool will be checked for available memory again the next time the > > SocketServer.run() loop gets to poll(). and so right now I dont > communicate > > memory becoming available to the selector - it will just go on to try and > > make progress elsewhere and come back again. i never block it or send it > to > > sleep.
I think for efficiency what could maybe be done is if there's not > > enough memory to service a readable selection key we may want to skip all > > other read-ready selection keys for that iteration of > pollSelectionKeys(). > > that would require rather invasive changes around > > Selector.pollSelectionKeys() that I'd rather avoid. also different > > KafkaChannels may be backed by different memory pool (under some sort of > > future QoS scheme?), which would complicate such an optimization further. > > > > 3. i added the pool interface and implementation under > kafka.common.memory, > > and the API is "thin" enough to be generally useful (currently its > > non-blocking only, but a get(long maxWait) is definitely doable). having > > said that, I'm not really familiar enough with the code to say.... > > > > > > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <j...@confluent.io> wrote: > > > > > Hi, Radi, > > > > > > Thanks for the update. At the high level, this looks promising. A few > > > comments below. > > > > > > 1. If we can bound the requests by bytes, it seems that we don't need > > > queued.max.requests > > > any more? Could we just deprecate the config and make the queue size > > > unbounded? > > > 2. How do we communicate back to the selector when some memory is freed > > up? > > > We probably need to wake up the selector. For efficiency, perhaps we > only > > > need to wake up the selector if the bufferpool is full? > > > 3. We talked about bounding the consumer's memory before. To fully > > support > > > that, we will need to bound the memory used by different fetch > responses > > in > > > the consumer. Do you think the changes that you propose here can be > > > leveraged to bound the memory in the consumer as well? > > > > > > Jun > > > > > > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai <radai.rosenbl...@gmail.com> > > > wrote: > > > > > > > My apologies for the delay in response. > > > > > > > > I agree with the concerns about OOM reading from the actual sockets > and > > > > blocking the network threads - messing with the request queue itself > > > would > > > > not do. > > > > > > > > I propose instead a memory pool approach - the broker would have a > non > > > > blocking memory pool. upon reading the first 4 bytes out of a socket > an > > > > attempt would be made to acquire enough memory and if that attempt > > fails > > > > the processing thread will move on to try and make progress with > other > > > > tasks. > > > > > > > > I think Its simpler than mute/unmute because using mute/unmute would > > > > require differentiating between sockets muted due to a request in > > > progress > > > > (normal current operation) and sockets muted due to lack of memory. > > > sockets > > > > of the 1st kind would be unmuted at the end of request processing (as > > it > > > > happens right now) but the 2nd kind would require some sort of > "unmute > > > > watchdog" which is (i claim) more complicated than a memory pool. > also > > a > > > > memory pool is a more generic solution. > > > > > > > > I've updated the KIP page ( > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests) > > > > to reflect the new proposed implementation, and i've also put up an > > > inital > > > > implementation proposal on github - > > > > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool > . 
> > > the > > > > proposed code is not complete and tested yet (so probably buggy) but > > does > > > > include the main points of modification. > > > > > > > > the specific implementation of the pool on that branch also has a > built > > > in > > > > safety net where memory that is acquired but not released (which is a > > > bug) > > > > is discovered when the garbage collector frees it and the capacity is > > > > reclaimed. > > > > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <j...@confluent.io> wrote: > > > > > > > > > Radi, > > > > > > > > > > Yes, I got the benefit of bounding the request queue by bytes. My > > > concern > > > > > is the following if we don't change the behavior of processor > > blocking > > > on > > > > > queue full. > > > > > > > > > > If the broker truly doesn't have enough memory for buffering > > > outstanding > > > > > requests from all connections, we have to either hit OOM or block > the > > > > > processor. Both will be bad. I am not sure if one is clearly better > > > than > > > > > the other. In this case, the solution is probably to expand the > > cluster > > > > to > > > > > reduce the per broker request load. > > > > > > > > > > If the broker actually has enough memory, we want to be able to > > > configure > > > > > the request queue in such a way that it never blocks. You can tell > > > people > > > > > to just set the request queue to be unbounded, which may scare > them. > > If > > > > we > > > > > do want to put a bound, it seems it's easier to configure the queue > > > size > > > > > based on # requests. Basically, we can tell people to set the queue > > > size > > > > > based on number of connections. If the queue is based on bytes, > it's > > > not > > > > > clear how people should set it w/o causing the processor to block. > > > > > > > > > > Finally, Rajini has a good point. The ByteBuffer in the request > > object > > > is > > > > > allocated as soon as we see the first 4 bytes from the socket. So, > I > > am > > > > not > > > > > sure if just bounding the request queue itself is enough to bound > the > > > > > memory related to requests. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai <radai.rosenbl...@gmail.com> > > > > wrote: > > > > > > > > > > > I agree that filling up the request queue can cause clients to > time > > > out > > > > > > (and presumably retry?). However, for the workloads where we > expect > > > > this > > > > > > configuration to be useful the alternative is currently an OOM > > crash. > > > > > > In my opinion an initial implementation of this feature could be > > > > > > constrained to a simple drop-in replacement of ArrayBlockingQueue > > > > > > (conditional, opt-in) and further study of behavior patterns > under > > > load > > > > > can > > > > > > drive future changes to the API later when those behaviors are > > better > > > > > > understood (like back-pressure, nop filler responses to avoid > > client > > > > > > timeouts or whatever). > > > > > > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh Gharat < > > > > > > gharatmayures...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Nice write up Radai. > > > > > > > I think what Jun said is a valid concern. > > > > > > > If I am not wrong as per the proposal, we are depending on the > > > entire > > > > > > > pipeline to flow smoothly from accepting requests to handling > it, > > > > > calling > > > > > > > KafkaApis and handing back the responses. 
> > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Mayuresh > > > > > > > > > > > > > > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy < > jjkosh...@gmail.com > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > . > > > > > > > > >> > > > > > > > > >> > > > > > > > > > Hi Becket, > > > > > > > > > > > > > > > > > > I don't think progress can be made in the processor's run > > loop > > > if > > > > > the > > > > > > > > > queue fills up. i.e., I think Jun's point is that if the > > queue > > > is > > > > > > full > > > > > > > > > (either due to the proposed max.bytes or today due to > > > > max.requests > > > > > > > > hitting > > > > > > > > > the limit) then processCompletedReceives will block and no > > > > further > > > > > > > > progress > > > > > > > > > can be made. > > > > > > > > > > > > > > > > > > > > > > > > > I'm sorry - this isn't right. There will be progress as long > as > > > the > > > > > API > > > > > > > > handlers are able to pick requests off the request queue and > > add > > > > the > > > > > > > > responses to the response queues (which are effectively > > > unbounded). > > > > > > > > However, the point is valid that blocking in the request > > > channel's > > > > > put > > > > > > > has > > > > > > > > the effect of exacerbating the pressure on the socket server. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun Rao < > j...@confluent.io> > > > > > wrote: > > > > > > > > >> > > > > > > > > >> > Radai, > > > > > > > > >> > > > > > > > > > >> > Thanks for the proposal. A couple of comments on this. > > > > > > > > >> > > > > > > > > > >> > 1. Since we store request objects in the request queue, > > how > > > do > > > > > we > > > > > > > get > > > > > > > > an > > > > > > > > >> > accurate size estimate for those requests? > > > > > > > > >> > > > > > > > > > >> > 2. Currently, it's bad if the processor blocks on > adding a > > > > > request > > > > > > > to > > > > > > > > >> the > > > > > > > > >> > request queue. Once blocked, the processor can't process > > the > > > > > > sending > > > > > > > > of > > > > > > > > >> > responses of other socket keys either. This will cause > all > > > > > clients > > > > > > > in > > > > > > > > >> this > > > > > > > > >> > processor with an outstanding request to eventually > > timeout. > > > > > > > > Typically, > > > > > > > > >> > this will trigger client-side retries, which will add > more > > > > load > > > > > on > > > > > > > the > > > > > > > > >> > broker and cause potentially more congestion in the > > request > > > > > queue. > > > > > > > > With > > > > > > > > >> > queued.max.requests, to prevent blocking on the request > > > queue, > > > > > our > > > > > > > > >> > recommendation is to configure queued.max.requests to be > > the > > > > > same > > > > > > as > > > > > > > > the > > > > > > > > >> > number of socket connections on the broker. Since the > > broker > > > > > never > > > > > > > > >> > processes more than 1 request per connection at a time, > > the > > > > > > request > > > > > > > > >> queue > > > > > > > > >> > will never be blocked. With queued.max.bytes, it's going > > to > > > be > > > > > > > harder > > > > > > > > to > > > > > > > > >> > configure the value properly to prevent blocking. 
> > > > > > > > >> > > > > > > > > > >> > So, while adding queued.max.bytes is potentially useful > > for > > > > > memory > > > > > > > > >> > management, for it to be truly useful, we probably need > to > > > > > address > > > > > > > the > > > > > > > > >> > processor blocking issue for it to be really useful in > > > > practice. > > > > > > One > > > > > > > > >> > possibility is to put back-pressure to the client when > the > > > > > request > > > > > > > > >> queue is > > > > > > > > >> > blocked. For example, if the processor notices that the > > > > request > > > > > > > queue > > > > > > > > is > > > > > > > > >> > full, it can turn off the interest bit for read for all > > > socket > > > > > > keys. > > > > > > > > >> This > > > > > > > > >> > will allow the processor to continue handling responses. > > > When > > > > > the > > > > > > > > >> request > > > > > > > > >> > queue has space again, it can indicate the new state to > > the > > > > > > process > > > > > > > > and > > > > > > > > >> > wake up the selector. Not sure how this will work with > > > > multiple > > > > > > > > >> processors > > > > > > > > >> > though since the request queue is shared across all > > > > processors. > > > > > > > > >> > > > > > > > > > >> > Thanks, > > > > > > > > >> > > > > > > > > > >> > Jun > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM, radai < > > > > > > radai.rosenbl...@gmail.com> > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > > >> > > Hello, > > > > > > > > >> > > > > > > > > > > >> > > I'd like to initiate a discussion about > > > > > > > > >> > > https://cwiki.apache.org/ > confluence/display/KAFKA/KIP- > > > > > > > > >> > > 72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes > > > > > > > > >> > > > > > > > > > > >> > > The goal of the KIP is to allow configuring a bound on > > the > > > > > > > capacity > > > > > > > > >> (as > > > > > > > > >> > in > > > > > > > > >> > > bytes of memory used) of the incoming request queue, > in > > > > > addition > > > > > > > to > > > > > > > > >> the > > > > > > > > >> > > current bound on the number of messages. > > > > > > > > >> > > > > > > > > > > >> > > This comes after several incidents at Linkedin where a > > > > sudden > > > > > > > > "spike" > > > > > > > > >> of > > > > > > > > >> > > large message batches caused an out of memory > exception. > > > > > > > > >> > > > > > > > > > > >> > > Thank you, > > > > > > > > >> > > > > > > > > > > >> > > Radai > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > -Regards, > > > > > > > Mayuresh R. Gharat > > > > > > > (862) 250-7125 > > > > > > > > > > > > > > > > > > > > > > > > > > > >