Hi!
What you've hit here isn't a bug, but one of the facts of life of
working with Heka as it's currently designed. There's a static pool
of packs, and if those packs are exhausted then you're going to
deadlock, because everybody will be waiting for new packs and none are
being freed. Similarly, if the traffic flowing through the router backs
up, things will grind to a halt because, well, just about everything
flows through the router.
The good news is that we have yet to find a case where this is actually
a show-stopper. Once you know that these are the pitfalls, you can
write your plugin code and structure your pipeline in a way that keeps
data flowing.
The first thing to point out is that the size of the PipelinePack pool
is configurable; see
http://hekad.readthedocs.org/en/v0.6.0/config/index.html#global-configuration-options.
So if you really have a case where one plugin needs to consume a large
number of packs at a single time, you can crank the number of
available packs way up. This will increase the resident memory size of
the running hekad process, but it will give you the headroom you need
so that everything else doesn't block because the pool has run dry.
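For reference, that looks like the following in hekad's TOML config (if I'm remembering the option name right; the linked docs page is authoritative for your version):

```toml
[hekad]
# Default is 100; raise it if a plugin genuinely needs many packs at once.
poolsize = 10000
```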
Really, though, while there are cases where you need a lot of packs,
those are pretty rare. Usually you can structure things so that you
free packs up as you use them. That is, instead of having a filter ask
for dozens of packs, populate them all, and then start injecting them
into the router, it can ask for one, populate it, inject it, and then
ask for the next one. I don't know enough about your use case to know
whether such adjustments would help, though. To that end, I *would* be
interested in seeing an example filter that causes the issue; I might
be able to suggest changes that would alleviate the back-pressure, and
you can let us know if you have other constraints that prevent you
from making those changes in your production code.
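To make the "one pack at a time" shape concrete, here's a toy sketch. The pool, pack, and inject names are stand-ins for Heka's machinery, not its actual API:

```go
package main

import "fmt"

type pack struct{ payload string }

// pumpRecords pushes n records through a pool of poolSize packs,
// grabbing, populating, and injecting one pack at a time so the pool
// is never exhausted. It returns the number of records injected.
func pumpRecords(n, poolSize int) int {
	pool := make(chan *pack, poolSize)
	for i := 0; i < poolSize; i++ {
		pool <- &pack{}
	}
	injected := 0
	inject := func(p *pack) {
		injected++
		p.payload = ""
		pool <- p // downstream eventually recycles the pack
	}
	for i := 0; i < n; i++ {
		p := <-pool // blocks only if the pool is momentarily empty
		p.payload = fmt.Sprintf("record %d", i)
		inject(p)
	}
	return injected
}

func main() {
	// 100 records flow through a pool of only 4 packs without deadlock.
	fmt.Println("injected:", pumpRecords(100, 4))
}
```

Grabbing all the packs up front, by contrast, would deadlock as soon as n exceeded the pool size.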
Another thing that I find curious is that apparently your aggregation
step is taking a long time, so long that everything backs up while it's
happening. This seems like an unusual situation. Usually aggregation
happens as the data flows, and the periodic flushes don't actually have
to do much other than inject the data that's been accumulating. Again,
I don't know enough about your particular use case to say whether this
is easily changed, but ideally the aggregation step wouldn't be such a
heavy "stop-the-world" kind of activity.
Assuming that can't be improved, your current solution of doing the
heavy work in another goroutine so the receiving goroutine doesn't have
to block is a fine one. In fact, I've used that often enough that I
actually call it the "batch-and-back" pattern. I usually pre-allocate
two buffers and pass them back and forth on channels between the
goroutines. The receiving goroutine populates the first buffer and,
when some batch threshold (size and/or time elapsed) is reached, it
passes it to the committing goroutine. The committing goroutine grabs
this, does its work, and then drops the (now "empty") buffer on the
return channel for reuse by the receiving goroutine. You can see this
in action in the FileOutput:
https://github.com/mozilla-services/heka/blob/dev/plugins/file/file_output.go.
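Stripped of the FileOutput specifics, the batch-and-back pattern can be sketched like this; the buffer and channel names are illustrative, not taken from the actual FileOutput code:

```go
package main

import "fmt"

// batchAndBack ping-pongs two pre-allocated buffers between a
// receiving goroutine (the caller) and a committing goroutine, so the
// receiver never blocks on a slow commit.
func batchAndBack(records <-chan int, batchSize int, commit func([]int)) {
	full := make(chan []int)     // receiver -> committer
	empty := make(chan []int, 2) // committer -> receiver
	empty <- make([]int, 0, batchSize)
	empty <- make([]int, 0, batchSize)

	done := make(chan struct{})
	go func() { // committing goroutine: does the heavy work
		for batch := range full {
			commit(batch)
			empty <- batch[:0] // drop the now-"empty" buffer back for reuse
		}
		close(done)
	}()

	buf := <-empty // receiving side: populate until the threshold is hit
	for r := range records {
		buf = append(buf, r)
		if len(buf) >= batchSize {
			full <- buf
			buf = <-empty
		}
	}
	if len(buf) > 0 { // flush the final partial batch
		full <- buf
	}
	close(full)
	<-done
}

func main() {
	records := make(chan int)
	go func() {
		for i := 1; i <= 10; i++ {
			records <- i
		}
		close(records)
	}()
	total := 0
	batchAndBack(records, 4, func(batch []int) {
		for _, v := range batch {
			total += v
		}
	})
	fmt.Println("committed total:", total)
}
```

A size threshold is shown here; in practice you'd usually add a time-based flush as well, e.g. with a `select` on a ticker.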
Also, yes, it is possible to bypass the router entirely by delivering
packs directly to a specific filter or output, assuming you know the
name of the registered plugin. The functions to support this are
available on the PluginHelper interface, passed in to each filter's Run
method:
https://github.com/mozilla-services/heka/blob/dev/pipeline/config.go#L46
If you do go this route (no pun intended), I recommend specifying the
output name as part of your filter's config rather than hard-coding
it.
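The shape of that lookup-by-name delivery can be sketched with a toy registry; in Heka the real lookup lives on PluginHelper (see the config.go link above), so the registry type and method names here are stand-ins:

```go
package main

import "fmt"

// registry maps registered plugin names to their delivery channels.
type registry map[string]chan string

// Output returns the delivery channel for a registered output name.
func (r registry) Output(name string) (chan string, bool) {
	ch, ok := r[name]
	return ch, ok
}

func main() {
	reg := registry{"CassandraOutput": make(chan string, 8)}

	// In practice, read the name from the filter's config, not a literal.
	outputName := "CassandraOutput"
	out, ok := reg.Output(outputName)
	if !ok {
		panic("no output registered as: " + outputName)
	}
	out <- "aggregated batch" // delivered directly, skipping the router
	fmt.Println("delivered:", <-out)
}
```

Failing loudly when the configured name isn't registered is worth keeping in the real version, too.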
Finally, when you're working in Go, anything is possible. It's probably
not necessary, but there's nothing stopping you from instantiating your
own set of packs entirely outside of the pools that Heka is already
creating. When you call NewPipelinePack, you pass in the channel that
the pack will be returned to when Recycle is called, so you could even
inject them into the router or pass them on to some other plugin and
ultimately they'll be returned to you when they've been processed.
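The key idea is that each pack carries the channel it should be recycled onto. A simplified stand-in (not Heka's actual PipelinePack type) looks like:

```go
package main

import "fmt"

// pipelinePack is a toy version of a pack that remembers which pool
// it was born from, mirroring how NewPipelinePack takes a recycle
// channel.
type pipelinePack struct {
	payload     string
	recycleChan chan *pipelinePack
}

func newPipelinePack(rc chan *pipelinePack) *pipelinePack {
	return &pipelinePack{recycleChan: rc}
}

// Recycle clears the pack and returns it to the pool it came from.
func (p *pipelinePack) Recycle() {
	p.payload = ""
	p.recycleChan <- p
}

func main() {
	// A private pool, entirely separate from any shared one.
	myPool := make(chan *pipelinePack, 1)
	myPool <- newPipelinePack(myPool)

	p := <-myPool
	p.payload = "hello"
	p.Recycle() // a downstream plugin finishing with the pack
	back := <-myPool
	fmt.Println("same pack returned:", back == p)
}
```

Whoever ends up calling Recycle, the pack finds its way back to your pool rather than the shared one.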
Hope this helps,
-r
On Sat 26 Jul 2014 01:51:24 AM PDT, Nimi Wariboko Jr wrote:
Hi,
We've come across an issue while stress testing one of our setups. We
have a three-step setup: one (tcp) input, an (aggregate) filter, and a
(cassandra) output. The input receives a stream of key-values, which
are then aggregated by key and periodically flushed to the cassandra
output.
What we began to see under the stress test was that in the periodic
aggregation step, the aggregate filter begins to flush its memory by
requesting and injecting pipeline packs (up to ~13k pipeline packs,
many times larger than the pool size of 100). Surprisingly, the
pipeline pack pool is eventually exhausted and heka freezes.
After digging around, it seems the issue is:
1.) A large amount of pipeline packs are sent to the aggregate filter.
2.) The aggregate filter begins its aggregate step, and stops
accepting packs from its InChan.
3.) The aggregate filter also begins to request PipelinePacks and
inject them into the stream.
4.) Because the aggregate filter is no longer accepting packs, the
MessageRouter is stuck blocking, waiting for the aggregate filter to
accept a message.
5.) Because the MessageRouter isn't routing messages, the injected
packs aren't going to the Cassandra output, and aren't ultimately
being freed.
6.) Because the packs aren't being freed, the pool is eventually
exhausted.
7.) Heka freezes even though everybody calls Recycle or Inject.
If this doesn't make sense, I can code up an example filter/input
plugin that should cause the deadlock.
I'm not sure if this is a legitimate bug, or just something we
shouldn't do. Currently we have sidestepped the issue by copying the
data and doing the flush in a separate goroutine (freeing up the
filter to continue accepting packs). Another solution for us would be
to bypass the router completely and pass the pipeline pack to the
output directly. The docs claim this is possible, but I'm not entirely
clear on how to achieve it (how do you get a reference to the target
plugin's InChan?).
All in all, it seems that heka can freeze if the message router tries
to deliver a pipelinepack to a filter that is in a busy loop
requesting & injecting pipeline packs.
Thanks,
Nimi Wariboko Jr.
[email protected] <mailto:[email protected]>
_______________________________________________
Heka mailing list
[email protected] <mailto:[email protected]>
https://mail.mozilla.org/listinfo/heka