Hi Rob and Nimi,

I have been following this thread for the past few days, and I have a
couple of questions about the GreedyFilter deadlock.

1. Which channel fills up and blocks when the deadlock occurs? I thought
   it was router.InChan and injectRecycleChan, because Inject called by
   an input plugin delivers the pack into forunner.InChan while, at the
   same time, the filter's Inject delivers packs into forunner.InChan
   too. forunner.InChan may fill up because GreedyFilter has no chance
   to take packs out of forunner.InChan while it is flushing. But a
   filter's Inject() starts a goroutine to do the actual inject
   operation, so GreedyFilter.flush() will return and eventually resume
   taking packs from forunner.InChan. So I don't see where the deadlock
   happens.

2. When the deadlock happens, how can the pool still be exhausted even
   though "everybody calls Recycle or Inject"?

Thank you all.

On Fri, Aug 1, 2014 at 1:40 AM, Rob Miller wrote:

> For this case it sounds like maybe you should be doing the work in a
> decoder instead of a filter. Decoders can generate more than one output
> message per input message, and decoding happens before the router is
> hit, so any back-pressure that is generated only impacts the relevant
> input, not the entire pipeline. You might have to bump the pool size,
> though, esp if you use a SandboxDecoder, b/c a SandboxDecoder will
> consume all of the packs it needs before flushing any of them to the
> router. A decoder written in Go can inject each message as it's
> generated.
>
> If you already have a decoder in play, then you can try using a
> MultiDecoder, or if you really need to squeeze performance out of it
> you can write a single decoder that integrates the functionality of
> what you're already using and the additional work you need to do into
> one operation.
>
> Does this seem reasonable?
>
> -r
>
> On 07/31/2014 10:22 AM, Nimi Wariboko Jr wrote:
>
>> We ran into this issue again in another filter. This time we have an
>> HttpInput plugin that flushes out to a PluginFilter that takes each
>> incoming message, does some regex matching and creates 8-16 new
>> PipelinePacks.
>> Under load of about 1,000 HTTP requests/s, Heka ends up in a deadlock
>> in the same location. What's really undesirable here is that once the
>> lock happens, even if load goes down Heka is still frozen.
>>
>> I'd like to avoid creating a goroutine for every request here, so what
>> we ended up doing is just routing directly to the next plugin inside
>> the filter. The only downside here is that we lose the ability to
>> change the routes without recompiling (for example if we wanted to
>> output to dashboard as well, or another plugin).
>>
>> I only bring this up to ask whether it's worth giving each Filter its
>> own dedicated message router. Filters seem to be in a unique position:
>> because they can be routed messages as well as route their own
>> messages, they can starve out other plugins from using the message
>> router. What do you think?
>>
>> On Wed, Jul 30, 2014 at 11:29 AM, Rob Miller wrote:
>>
>>     Yep, this behavior sounds like what I would expect. Your flush
>>     operation takes a long time, so if you've got a lot of data
>>     flowing through the pipeline things are going to back up if the
>>     filter blocks while the flush is happening. Your solution of
>>     performing the flush in a separate goroutine so you can keep
>>     processing the incoming messages is a fine one.
>>
>>     -r
>>
>>     On Sat 26 Jul 2014 03:15:11 PM PDT, Nimi Wariboko Jr. wrote:
>>
>>         Hi Rob,
>>
>>         Thanks a lot for the detailed explanation. We attempted at
>>         first to increase the pack pool, but ran into the issue again
>>         after a longer period of time. When we switched to the
>>         goroutine however, we saw that the default pool size was more
>>         than enough.
>>
>>         However in our original use case I thought we were using
>>         everything correctly, but I may have been wrong. I’ve uploaded
>>         a repo with a pattern similar to the one we were using:
>>
>>         https://github.com/nemothekid/heka-greedy-filter-deadlock
>>
>>         I also want to make sure to mention that the aggregate step
>>         was “asking for one then injecting” the pipeline packs, and
>>         that the aggregation step wasn’t really taking a long time. It
>>         was simply the case that during the aggregation step, enough
>>         time passed for the plugin’s input chan to fill up.
>>
>>         The example above freezes in the same location as our program
>>         when used with the example toml.
>>
>>         Again, thanks!
>>
>>         Nimi
>>
>>         On Saturday, July 26, 2014 at 12:24 PM, Rob Miller wrote:
>>
>>             Hi!
>>
>>             What you've hit here isn't a bug, but just one of the
>>             facts of life in working w/ Heka and how it's currently
>>             designed. There's a static pool of packs, and if those
>>             packs are exhausted then you're going to deadlock b/c
>>             everybody will be waiting for new packs and none are
>>             being freed. Similarly, if the traffic flowing through
>>             the router backs up, then things will come to a stop b/c,
>>             well, just about everything flows through the router.
>>
>>             The good news is that we have yet to find a case where
>>             this is actually a show-stopper. Once you know that these
>>             are the pitfalls, you can write your plugin code and
>>             structure your pipeline in a way that keeps data flowing.
>>
>>             The first thing to point out is that the size of the
>>             PipelinePack pool is configurable, see
>>             http://hekad.readthedocs.org/en/v0.6.0/config/index.html#global-configuration-options
>>
>>             So if you really have a case where one plugin is going to
>>             need to consume a large number of packs at a single time,
>>             then you can crank the number of available packs way up.
>>             This will increase the resident memory size of the
>>             running hekad process, but it will give you the headroom
>>             you need to not cause everything else to block due to
>>             running out of packs.
>>
>>             Really, though, while there are cases where you need a
>>             lot of packs, those are pretty rare. Usually you can
>>             structure things so that you free packs up as you use
>>             them. That is, instead of having a filter ask for dozens
>>             of packs, populate them, and then start injecting them
>>             into the router, you can ask for one, populate it, inject
>>             it, and then ask for the next one. I don't know enough
>>             about your use case to know if such adjustments would
>>             help, though. To that end I *would* be interested in you
>>             coding up an example filter that causes the issue, I
>>             might be able to suggest changes that would alleviate the
>>             back-pressure. Then you can let us know if you have other
>>             constraints that prevent you from making those changes in
>>             your production code.
>>
>>             Another thing that I find curious is that apparently your
>>             aggregation step is taking a long time, so long that
>>             everything backs up while it's happening. This seems like
>>             an unusual situation. Usually aggregation happens as the
>>             data flows, and the periodic flushes don't actually have
>>             to do much other than inject the data that's been
>>             accumulating. Again, I don't know enough about your
>>             particular use case to say whether this is easily
>>             changed, but ideally the aggregation step wouldn't be
>>             such a heavy "stop-the-world" kind of activity.
>>
>>             Assuming that can't be improved, your current solution of
>>             doing the heavy work in another goroutine so the
>>             receiving goroutine doesn't have to block is a fine one.
>>             In fact, I've used that often enough that I actually call
>>             it the "batch-and-back" pattern. I usually pre-allocate
>>             two buffers and pass them back and forth on channels btn
>>             the goroutines. The receiving goroutine populates the
>>             first buffer and, when some batch threshold (size and/or
>>             time elapsed) is reached, it passes it to the committing
>>             goroutine. The committing goroutine grabs this, does its
>>             work, and then drops the (now "empty") buffer on the
>>             return channel for reuse by the receiving goroutine. You
>>             can see this in action in the FileOutput:
>>             https://github.com/mozilla-services/heka/blob/dev/plugins/file/file_output.go
>>
>>             Also, yes it is possible to bypass the router entirely by
>>             delivering packs directly to a specific filter or output,
>>             assuming you know the name of the registered plugin. The
>>             functions to support this are available on the
>>             PluginHelper interface, passed in to each filter's Run
>>             method:
>>             https://github.com/mozilla-services/heka/blob/dev/pipeline/config.go#L46
>>
>>             If you do go this route (no pun intended), I recommend
>>             you have the output name specified as part of your
>>             filter's config and not hard-coded.
>>
>>             Finally, when you're working in Go, anything is possible.
>>             It's probably not necessary, but there's nothing stopping
>>             you from instantiating your own set of packs entirely
>>             outside of the pools that Heka is already creating. When
>>             you call NewPipelinePack, you pass in the channel that
>>             the pack will be returned to when Recycle is called, so
>>             you could even inject them into the router or pass them
>>             on to some other plugin and ultimately they'll be
>>             returned to you when they've been processed.
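For readers who want to see the "batch-and-back" pattern described above in
isolation, here is a minimal Go sketch. It does not use any Heka types: the
function name runBatchAndBack, the channel names, and the string records are
all made up for illustration, and only the size threshold is modeled (a real
plugin would probably also flush on a timer). The two pre-allocated buffers
circulate between the receiving side and the committing goroutine.

```go
package main

import "fmt"

// runBatchAndBack is a hypothetical helper, not part of Heka. It feeds
// records through a receiver that fills a buffer and a committing goroutine
// that "flushes" it. Exactly two buffers exist and they are reused.
func runBatchAndBack(records []string, batchSize int) [][]string {
	full := make(chan []string)     // receiver -> committer
	empty := make(chan []string, 2) // committer -> receiver
	done := make(chan [][]string)

	// Pre-allocate the two buffers that get passed back and forth.
	empty <- make([]string, 0, batchSize)
	empty <- make([]string, 0, batchSize)

	// Committing goroutine: does the slow work, then returns the buffer.
	go func() {
		var committed [][]string
		for batch := range full {
			// Stand-in for the slow flush (disk write, network call, ...).
			committed = append(committed, append([]string(nil), batch...))
			empty <- batch[:0] // hand the now-empty buffer back
		}
		done <- committed
	}()

	// Receiving side: keeps accepting input while the committer works.
	buf := <-empty
	for _, r := range records {
		buf = append(buf, r)
		if len(buf) >= batchSize {
			full <- buf
			buf = <-empty
		}
	}
	if len(buf) > 0 {
		full <- buf // final partial batch
	}
	close(full)
	return <-done
}

func main() {
	batches := runBatchAndBack([]string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(len(batches), batches)
}
```

Because only two buffers exist, the receiver can get at most one batch ahead
of the committer, so memory stays bounded while the receiving side keeps
accepting input during a flush.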
>>
>>             Hope this helps,
>>
>>             -r
>>
>>             On Sat 26 Jul 2014 01:51:24 AM PDT, Nimi Wariboko Jr wrote:
>>
>>                 Hi,
>>
>>                 We've come across an issue stress testing one of our
>>                 setups. We have a 3 step setup, one (tcp) input,
>>                 (aggregate) filter, and (cassandra) output. The way
>>                 it works is on the input is a stream of key-values,
>>                 which are then aggregated on key, and periodically
>>                 flushed to the cassandra output.
>>
>>                 What we began to see under a stress test was in the
>>                 periodic aggregate step, the aggregate filter will
>>                 begin to flush its memory by requesting and injecting
>>                 pipeline packs (up to ~13k pipelinepacks, multiple
>>                 times larger than the 100 pool size). Surprisingly,
>>                 eventually the pipeline pack pool is exhausted and
>>                 Heka will freeze.
>>
>>                 After digging around, it seems the issue is:
>>                 1.) A large amount of pipeline packs are sent to the
>>                     aggregate filter.
>>                 2.) The aggregate filter begins its aggregate step,
>>                     and stops accepting packs from its InChan.
>>                 3.) The aggregate filter also begins to request
>>                     PipelinePacks and inject them into the stream.
>>                 4.) Because the aggregate filter is no longer
>>                     accepting requests, the MessageRouter is stuck
>>                     blocking for the aggregate filter to accept a
>>                     message.
>>                 5.) Because the MessageRouter isn't routing messages,
>>                     the injected packs aren't going to the Cassandra
>>                     output, and aren't ultimately being freed.
>>                 6.) Because the packs aren't being freed, the pool is
>>                     eventually exhausted.
>>                 7.) Heka freezes even though everybody calls Recycle
>>                     or Inject.
>>
>>                 If this doesn't make sense, I can code up an example
>>                 filter/input plugin that should cause the deadlock.
>>
>>                 I'm not sure if this is a legitimate bug, or just
>>                 something we shouldn't do. Currently we have
>>                 sidestepped the issue by copying the data and doing
>>                 the flush in a separate goroutine (freeing up the
>>                 filter to continue to accept packs).
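To make steps 1.) through 7.) above concrete, here is a toy Go model of them.
It is only a sketch under assumptions, not Heka code: the pack struct, the
channel names (pool, routerIn, filterIn, outputIn), and the sizes are all
hypothetical, chosen so that the stall is reproducible. The function reports
whether a flush finishes before a timeout, first with the filter blocked
during the flush and then with the flush moved to its own goroutine.

```go
package main

import (
	"fmt"
	"time"
)

// pack is a stand-in for a pipeline pack; toFilter is its routing target.
type pack struct{ toFilter bool }

// simulate is a toy model (not Heka code). It returns true if a flush of
// flushCount packs completes before the timeout.
func simulate(flushInBackground bool) bool {
	const poolSize, filterCap, flushCount = 8, 2, 20

	pool := make(chan *pack, poolSize) // fixed pool; recycling = sending back
	for i := 0; i < poolSize; i++ {
		pool <- &pack{}
	}
	routerIn := make(chan *pack, 4)
	filterIn := make(chan *pack, filterCap)
	outputIn := make(chan *pack, 4)

	// Router: one goroutine, so it stalls on whichever plugin is not reading.
	go func() {
		for p := range routerIn {
			if p.toFilter {
				filterIn <- p
			} else {
				outputIn <- p
			}
		}
	}()
	// Output: handles a pack, then recycles it.
	go func() {
		for p := range outputIn {
			pool <- p
		}
	}()

	// Step 1: more input arrives than the filter's channel can hold.
	for i := 0; i < filterCap+1; i++ {
		p := <-pool
		p.toFilter = true
		routerIn <- p
	}

	// Step 3: the flush requests packs one at a time and injects them.
	flush := func() {
		for i := 0; i < flushCount; i++ {
			p := <-pool
			p.toFilter = false
			routerIn <- p
		}
	}
	drain := func() { // the filter consuming (and recycling) its input
		for p := range filterIn {
			pool <- p
		}
	}

	done := make(chan struct{})
	if flushInBackground {
		go func() { flush(); close(done) }()
		go drain() // the filter keeps reading while the flush runs
	} else {
		go func() { // step 2: nothing is read from filterIn until flush ends
			flush()
			close(done)
			drain()
		}()
	}

	select {
	case <-done:
		return true
	case <-time.After(500 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println("flush blocks the filter: completed =", simulate(false))
	fmt.Println("flush in its own goroutine: completed =", simulate(true))
}
```

In this model the blocking variant can never finish: the router is stuck
handing a pack to the filter, so the flushed packs queue up behind it, none
reach the output, and none are recycled. Every pack is "in flight" in some
channel even though each component would recycle or inject it if it could
make progress.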
>>                 Another solution for us would be to completely
>>                 sidestep the router and just pass the pipeline pack
>>                 to the output directly. The docs claim this is
>>                 possible, but I'm not entirely clear on how to
>>                 achieve this (how do you get the reference to the
>>                 target plugin InChan?).
>>
>>                 All in all, it seems that Heka can freeze if the
>>                 message router tries to deliver a pipelinepack to a
>>                 filter that is in a busy loop requesting & injecting
>>                 pipeline packs.
>>
>>                 Thanks,
>>                 Nimi Wariboko Jr.
>>
>>                 _______________________________________________
>>                 Heka mailing list
>>                 https://mail.mozilla.org/listinfo/heka

-- 
Best Regards.
Tom Sawyer.
@Alibaba-inc.

