I haven't gotten a chance to fully digest this thread, but here's the cheat sheet:
* It's tempting to make channel ops blocking and event-driven upon event arrival. This creates a nasty error handling / timed event competition[1] case that OG choked on constantly. Until there's an obvious solution (I have some ideas) I'd prefer to leave it as is.

* Even if we did this, you'd potentially destroy performance (make it worse) because you'd fire immediately upon event arrival, which can reduce batch sizes; the RPC overhead of pushing a steady stream of tiny batches is significantly higher than that of sleeping while events accumulate and polling again in a second (or 500ms - whatever it is). We're aiming for global throughput, not minimal per-event latency at the micro-level (although I'm happy to revisit that; people will *not* be happy with the performance, though - I did a bunch of early testing here). A rough sketch of this poll-and-batch loop follows the quoted thread below.

* Standardization is good.

* JDBCChannel can block (due to transaction semantics; those can block potentially indefinitely).

[1] The "timed event competition" problem covers situations such as rolling file handles while blocking on events. In OG, the HDFS sink would block on event arrival but it desperately wanted to rotate the file handle every N seconds. If there was a period of N+1 seconds with no events, the code descended into thread interruption voodoo that never worked right. Now, one can deal with this by using intricate bit flipping (i.e. keeping state about the number of events received since the last poll and skipping rotation if nothing has happened), but this tends to kick the can down the road on errors. For instance, if a DN fails and the connection to HDFS becomes invalid, the fact that you didn't rotate means you didn't catch the error when it happened (or close to it); instead, you catch it N+M seconds later, when you finally receive an event and immediately hit an exception on write, blah blah blah. It's all very yucky.

I promise to give this more attention. This is just because I skimmed and saw "why not make it blocking" and I panicked. :)

On Mon, Feb 27, 2012 at 6:51 PM, Arvind Prabhakar <[email protected]> wrote:

> On Mon, Feb 27, 2012 at 6:33 PM, Brock Noland <[email protected]> wrote:
>
> > Hi,
> >
> > Thank you everyone for your comments! My comments are inline.
> >
> > On Tue, Feb 28, 2012 at 1:13 AM, Arvind Prabhakar <[email protected]> wrote:
> > > My bad for not chiming in on this thread soon enough. When we laid out the initial architecture, the following assumptions were made, and I still think that most of them are valid:
> > >
> > > 1. Sources doing put() on a channel should relay back any exceptions they receive from the channel. They should not die or become invalid due to this. If they do, it is more of a bug in the source implementation.
> > >
> > > 2. Channels must respect capacity. This is vital for operators to ensure that they can size a system without overwhelming it. Both the memory and JDBC channels support size specification at this time.
> >
> > Yes, with a recent commit it looks like MemoryChannel was changed to work like JDBCChannel and throws an exception if full.
> >
> > > 3. Channels should never block. This is to ensure that there is no scope for threads deadlocking within the agent due to bugs or an invalid state of the system. The chosen alternative to blocking was the notion of the sink runner, which will honor a backoff strategy when necessary.
> > > Consequently, the implementation of a sink should send the correct signal to the runner in case it is not able to take events from the channel or deliver events to the downstream destination.
> >
> > MemoryChannel.take blocks:
> >
> > if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
>
> This is perhaps needed to fine tune the memory channel throughput in a highly concurrent system. For example, if multiple threads are reading/writing to the channel, it is possible that the channel may not get a handle on the necessary locks in every request - hence a basic timeout mechanism would help in ensuring reliability of the channel. I do think, though, that the default of 3 seconds is a bit longish and should instead be more like 100ms or something.
>
> > JDBCChannel does not, meaning that JDBCChannel + HDFSEventSink consumes an entire CPU when no events are flowing through the system. It sounds like HDFSEventSink should be returning BACKOFF when the channel returns null?
>
> Correct. The idea is that any logic common to flow handling/throttling should be implemented by the runner and not by the individual sink or channel. That will likely not be as sophisticated as what one can do within a specialized implementation, but it certainly serves the purpose and keeps things uniform and simple.
>
> > My main concern with this strategy is that SinkRunner is going to sleep for some number of seconds regardless of events being pushed to the channel. Users that have very spiky flows have two options:
> >
> > 1) Turn the sleep down considerably, which burns unneeded CPU
> > 2) Size the channel large enough to handle the largest spike during the specified sleep time
> >
> > Shouldn't the sink runner be event driven and start processing events as soon as they arrive on the channel?
>
> This does make a lot of sense. However, one of the immediate trade-offs of the Flume architecture was to specifically sacrifice blocking behavior in favor of implementing simple semantics. It is very possible that as Flume gets adopted in the field we will realize that this is more of a pressing need than we expected it to be; then it will surely get prioritized.
>
> Thanks,
> Arvind
>
> > Cheers!
> > Brock
> >
> > > At some point in time, when we have the basic implementation of Flume working in production to validate all of these semantics, we can start a discussion on how best these semantics can change to accommodate any new findings that we discover in the field.
> > >
> > > Thanks,
> > > Arvind Prabhakar
> > >
> > > On Mon, Feb 27, 2012 at 11:30 AM, Prasad Mujumdar <[email protected]> wrote:
> > >> IMO the blocking vs wait time should be an attribute of the flow and not of an individual component. Perhaps each source/sink/channel should make it configurable (with a consistent default) so that it can be tweaked per the use case. The common attributes like timeout and capacity can be standard configurations that each component should support wherever possible.
> > >>
> > >> @Brock, I will try to include the relevant conclusions of this discussion in the dev guide.
> > >>
> > >> thanks
> > >> Prasad
> > >>
> > >> On Mon, Feb 27, 2012 at 7:35 AM, Peter Newcomb <[email protected]> wrote:
> > >>> Juhani, FWIW I agree with most of what you described, based on my reading and use of the codebase.
> > >>> Brock, I agree that these things are not yet adequately documented--especially in terms of Javadocs for the main interfaces: Source, Channel, and Sink. Also, there is enough variation among the various implementations of these interfaces to lead to ambiguous interpretation.
> > >>>
> > >>> One thing I wanted to comment on specifically is Juhani's statement about channel capacity:
> > >>>
> > >>> > Channels:
> > >>> > - Only memory channels have a capacity, but when that is exceeded
> > >>> > ChannelException seems a clearcut reaction
> > >>>
> > >>> Before your recent refactoring of MemoryChannel, put() would block indefinitely if the queue was at capacity--are you suggesting that this was incorrect behavior that should not be allowed? Or just that any such blocking should have a finite duration (similar to the take() keep-alive), and throw ChannelException upon timeout?
> > >>>
> > >>> Also, other channels may well have implicit capacities, for instance available space in a database or filesystem partition, though I agree that ChannelException would be appropriate in those cases.
> > >>>
> > >>> -peter
> >
> > --
> > Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

--
Eric Sammer
twitter: esammer
data: www.cloudera.com
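
For readers skimming the thread, here is roughly what the division of labor discussed above looks like: the sink drains a batch and reports BACKOFF when the channel comes up empty, and the runner owns the sleep/backoff policy, so an idle JDBCChannel + HDFSEventSink pairing no longer spins a CPU. This is a minimal sketch using simplified stand-in interfaces - SimpleChannel, SimpleSink, Status, the batch size, and the backoff numbers are all illustrative assumptions, not the actual Flume API.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Minimal stand-ins for illustration only; not the real Flume interfaces.
    public class SinkRunnerSketch {

      enum Status { READY, BACKOFF }

      interface SimpleChannel {
        String take();            // returns null when the channel is empty
      }

      interface SimpleSink {
        Status process(SimpleChannel channel);
      }

      // A sink that drains a small batch and signals BACKOFF when it finds
      // nothing, instead of spinning on an empty channel.
      static class BatchingSink implements SimpleSink {
        private static final int BATCH_SIZE = 100;

        @Override
        public Status process(SimpleChannel channel) {
          int taken = 0;
          for (int i = 0; i < BATCH_SIZE; i++) {
            String event = channel.take();
            if (event == null) {
              break;                     // channel is (momentarily) empty
            }
            taken++;
            // ... deliver the event downstream (HDFS, RPC, etc.) ...
          }
          return taken == 0 ? Status.BACKOFF : Status.READY;
        }
      }

      // The runner owns the backoff policy, so individual sinks stay simple.
      static void run(SimpleSink sink, SimpleChannel channel) throws InterruptedException {
        long backoffMs = 0;
        while (!Thread.currentThread().isInterrupted()) {
          Status status = sink.process(channel);
          if (status == Status.BACKOFF) {
            // Exponential backoff, capped; avoids burning a CPU when idle.
            backoffMs = Math.min(backoffMs == 0 ? 50 : backoffMs * 2, 5000);
            Thread.sleep(backoffMs);
          } else {
            backoffMs = 0;               // events are flowing; poll again at once
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("event-1");
        queue.offer("event-2");
        SimpleChannel channel = queue::poll;   // take() == poll(): null when empty

        Thread runner = new Thread(() -> {
          try {
            run(new BatchingSink(), channel);
          } catch (InterruptedException ignored) {
            // shutting down
          }
        });
        runner.start();
        Thread.sleep(1000);                    // let it drain and then back off
        runner.interrupt();                    // stop the runner
        runner.join();
      }
    }

Keeping the backoff in the runner rather than in each sink is what keeps the individual sink implementations uniform and simple, which is the trade-off described in the quoted thread.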

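Similarly, the capacity/keep-alive question Peter raises - block for a bounded period when the channel is full or empty, then throw ChannelException or return null rather than blocking indefinitely - can be illustrated with a small self-contained sketch. This is not the actual MemoryChannel code; the class, field names, and keep-alive value below are made up for the example, and it merely mirrors the queueStored.tryAcquire(keepAlive, ...) idea quoted above.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    // Simplified illustration of a capacity-bounded channel whose put()/take()
    // block for at most a keep-alive period and then give up, rather than
    // blocking indefinitely or spinning.
    public class BoundedChannelSketch {

      static class ChannelException extends RuntimeException {
        ChannelException(String message) { super(message); }
      }

      private final Queue<String> queue = new ConcurrentLinkedQueue<>();
      private final Semaphore freeSlots;     // permits = remaining capacity
      private final Semaphore storedEvents = new Semaphore(0);
      private final long keepAliveMs;

      BoundedChannelSketch(int capacity, long keepAliveMs) {
        this.freeSlots = new Semaphore(capacity);
        this.keepAliveMs = keepAliveMs;
      }

      // Fails after keepAliveMs instead of blocking forever when full.
      public void put(String event) {
        try {
          if (!freeSlots.tryAcquire(keepAliveMs, TimeUnit.MILLISECONDS)) {
            throw new ChannelException("Channel full; gave up after " + keepAliveMs + " ms");
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new ChannelException("Interrupted while waiting for capacity");
        }
        queue.offer(event);
        storedEvents.release();
      }

      // Returns null if nothing arrives within the keep-alive, so the caller
      // (e.g. a sink) can signal BACKOFF instead of busy-waiting.
      public String take() {
        try {
          if (!storedEvents.tryAcquire(keepAliveMs, TimeUnit.MILLISECONDS)) {
            return null;
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
        String event = queue.poll();
        freeSlots.release();
        return event;
      }

      public static void main(String[] args) {
        BoundedChannelSketch channel = new BoundedChannelSketch(1, 100);
        channel.put("event-1");
        try {
          channel.put("event-2");          // capacity exceeded -> ChannelException
        } catch (ChannelException e) {
          System.out.println("put failed as expected: " + e.getMessage());
        }
        System.out.println("took: " + channel.take());  // "event-1"
        System.out.println("took: " + channel.take());  // null after the keep-alive
      }
    }

The 100 ms keep-alive here is chosen only to keep the demo quick; whether the right default is 3 seconds, 100 ms, or something configurable per flow is exactly the tuning question debated in the thread.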