For this case it sounds like maybe you should be doing the work in a decoder 
instead of a filter. Decoders can generate more than one output message per 
input message, and decoding happens before the router is hit, so any 
back-pressure that is generated only impacts the relevant input, not the entire 
pipeline. You might have to bump the pool size, though, esp if you use a 
SandboxDecoder, b/c a SandboxDecoder will consume all of the packs it needs 
before flushing any of them to the router. A decoder written in Go can inject 
each message as it's generated.
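
To make the "inject each message as it's generated" idea concrete, here is a minimal sketch using only the standard library. The message type, the decodeEach function, and the emit callback are hypothetical stand-ins for illustration, not Heka's decoder API; the point is only that each output is handed off before the next one is built.

```go
package main

import (
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for a decoded message; it is not a Heka type.
type message struct {
	Key   string
	Value string
}

// decodeEach splits one raw input such as "a=1 b=2" into messages and hands
// each one to emit as soon as it is built, so only one message is held at a
// time. It returns the number of messages emitted.
func decodeEach(raw string, emit func(message)) int {
	n := 0
	for _, field := range strings.Fields(raw) {
		parts := strings.SplitN(field, "=", 2)
		if len(parts) != 2 {
			continue // skip fields that are not key=value pairs
		}
		emit(message{Key: parts[0], Value: parts[1]})
		n++
	}
	return n
}

func main() {
	var out []message
	n := decodeEach("a=1 b=2 junk c=3", func(m message) { out = append(out, m) })
	fmt.Println(n, out)
}
```

A decoder that instead collected every message into a slice before handing any of them off would need as many packs as there are outputs per input, which is why the pool size matters more in that case.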

If you already have a decoder in play, then you can try using a MultiDecoder, 
or if you really need to squeeze performance out of it you can write a single 
decoder that integrates the functionality of what you're already using and the 
additional work you need to do into one operation.

Does this seem reasonable?

-r


On 07/31/2014 10:22 AM, Nimi Wariboko Jr wrote:
We ran into this issue again in another filter. This time we have an
HttpInput plugin that flushes out to a PluginFilter that takes each
incoming message, does some regex matching, and creates 8-16 new
PipelinePacks. Under a load of about 1,000 HTTP requests/s, Heka ends up
in a deadlock in the same location. What's really undesirable here is
that once the lock happens, Heka stays frozen even if the load goes down.

I'd like to avoid creating a goroutine for every request here, so what
we ended up doing is just routing directly to the next plugin inside the
filter. The only downside here is that we lose the ability to change the
routes without recompiling (for example if we wanted to output to
dashboard as well, or another plugin).

I only bring this up to ask whether it's worth giving each Filter
its own dedicated message router. Filters seem to be in a unique position:
because they can be routed messages as well as route their own messages,
they can starve other plugins of access to the message router. What do
you think?


On Wed, Jul 30, 2014 at 11:29 AM, Rob Miller wrote:

    Yep, this behavior sounds like what I would expect. Your flush
    operation takes a long time, so if you've got a lot of data flowing
    through the pipeline things are going to back up if the filter
    blocks while the flush is happening. Your solution of performing the
    flush in a separate goroutine so you can keep processing the
    incoming messages is a fine one.

    -r



    On Sat 26 Jul 2014 03:15:11 PM PDT, Nimi Wariboko Jr. wrote:

        Hi Rob,

        Thanks a lot for the detailed explanation. We attempted at first to
        increase the pack pool, but ran into the issue again after a longer
        period of time. When we switched to the goroutine, however, we saw
        that the default pool size was more than enough.

        However, in our original use case I thought we were using everything
        correctly, but I may have been wrong. I’ve uploaded a repo with a
        pattern similar to the one we were using:

        https://github.com/nemothekid/heka-greedy-filter-deadlock

        I also want to make sure to mention that the aggregate step was
        “asking for one then injecting” the pipeline packs, and that the
        aggregation step wasn’t really taking a long time. It was simply the
        case that during the aggregation step, enough time passed for the
        plugin’s input chan to fill up.

        The example above freezes in the same location as our program when
        used with the example toml.

        Again, thanks!

        Nimi

        On Saturday, July 26, 2014 at 12:24 PM, Rob Miller wrote:

            Hi!

            What you've hit here isn't a bug, but just one of the facts of
            life in working w/ Heka and how it's currently designed. There's
            a static pool of packs, and if those packs are exhausted then
            you're going to deadlock b/c everybody will be waiting for new
            packs and none are being freed. Similarly, if the traffic flowing
            through the router backs up, then things will come to a stop b/c,
            well, just about everything flows through the router.

            The good news is that we have yet to find a case where this is
            actually a show-stopper. Once you know that these are the
            pitfalls, you can write your plugin code and structure your
            pipeline in a way that keeps data flowing.

            The first thing to point out is that the size of the PipelinePack
            pool is configurable, see
            http://hekad.readthedocs.org/en/v0.6.0/config/index.html#global-configuration-options.

            So if you really have a case where one plugin is going to need to
            consume a large number of packs at a single time, then you can
            crank the number of available packs way up. This will increase
            the resident memory size of the running hekad process, but it
            will give you the headroom you need to not cause everything else
            to block due to running out of packs.

            Really, though, while there are cases where you need a lot of
            packs, those are pretty rare. Usually you can structure things so
            that you free packs up as you use them. That is, instead of
            having a filter ask for dozens of packs, populate them, and then
            start injecting them into the router, you can ask for one,
            populate it, inject it, and then ask for the next one. I don't
            know enough about your use case to know if such adjustments would
            help, though. To that end I *would* be interested in you coding
            up an example filter that causes the issue; I might be able to
            suggest changes that would alleviate the back-pressure. Then you
            can let us know if you have other constraints that prevent you
            from making those changes in your production code.
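
The difference between "ask for dozens, then inject" and "ask for one, inject, ask for the next" can be sketched with a fixed-size pool modeled as a buffered channel. This is only an illustration under stated assumptions: the pack type, newPool, acquireAll, and injectOneByOne are hypothetical names, not Heka APIs, and the batch case uses a non-blocking check where a real blocking receive would simply hang.

```go
package main

import "fmt"

// pack is a hypothetical stand-in for a PipelinePack.
type pack struct{ payload int }

// newPool returns a fixed-size pool modeled as a buffered channel.
func newPool(size int) chan *pack {
	pool := make(chan *pack, size)
	for i := 0; i < size; i++ {
		pool <- &pack{}
	}
	return pool
}

// acquireAll tries to take n packs before injecting any of them. It returns
// false when the pool runs dry; a blocking receive would wait forever at that
// point. Any packs it took are put back before it returns.
func acquireAll(pool chan *pack, n int) bool {
	held := make([]*pack, 0, n)
	ok := true
	for i := 0; i < n; i++ {
		select {
		case p := <-pool:
			held = append(held, p)
		default:
			ok = false
		}
		if !ok {
			break
		}
	}
	for _, p := range held {
		pool <- p
	}
	return ok
}

// injectOneByOne takes one pack, fills it, hands it to consume, and only then
// asks for the next one. consume is expected to recycle the pack.
func injectOneByOne(pool chan *pack, n int, consume func(*pack)) {
	for i := 0; i < n; i++ {
		p := <-pool
		p.payload = i
		consume(p)
	}
}

func main() {
	pool := newPool(4)
	fmt.Println("batch of 10 fits in a pool of 4:", acquireAll(pool, 10))

	sum := 0
	injectOneByOne(pool, 10, func(p *pack) {
		sum += p.payload
		pool <- p // recycle the pack once it has been processed
	})
	fmt.Println("sent 10 one by one, payload sum:", sum, "packs back in pool:", len(pool))
}
```

The one-at-a-time loop only works if whatever consumes the pack eventually recycles it, which is exactly the part that stalls when the router is blocked.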

            Another thing that I find curious is that apparently your
            aggregation step is taking a long time, so long that everything
            backs up while it's happening. This seems like an unusual
            situation. Usually aggregation happens as the data flows, and the
            periodic flushes don't actually have to do much other than inject
            the data that's been accumulating. Again, I don't know enough
            about your particular use case to say whether this is easily
            changed, but ideally the aggregation step wouldn't be such a
            heavy "stop-the-world" kind of activity.

            Assuming that can't be improved, your current solution of doing
            the heavy work in another goroutine so the receiving goroutine
            doesn't have to block is a fine one. In fact, I've used that
            often enough that I actually call it the "batch-and-back"
            pattern. I usually pre-allocate two buffers and pass them back
            and forth on channels between the goroutines. The receiving
            goroutine populates the first buffer and, when some batch
            threshold (size and/or time elapsed) is reached, it passes it to
            the committing goroutine. The committing goroutine grabs this,
            does its work, and then drops the (now "empty") buffer on the
            return channel for reuse by the receiving goroutine. You can see
            this in action in the FileOutput:
            https://github.com/mozilla-services/heka/blob/dev/plugins/file/file_output.go.
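
A minimal sketch of the "batch-and-back" pattern as described above, using only the standard library. It is not the FileOutput code: the batchAndBack function and its parameters are hypothetical, integers stand in for messages, and only a size threshold is implemented (a time threshold would need a timer case in a select).

```go
package main

import (
	"fmt"
	"sync"
)

// batchAndBack reads items from in, collects them into one of two
// pre-allocated buffers, and hands each full buffer to a committing
// goroutine. commit is called once per batch and must not retain the slice,
// because the buffer is reused after it returns.
func batchAndBack(in <-chan int, batchSize int, commit func([]int)) {
	full := make(chan []int)     // receiver -> committer
	empty := make(chan []int, 2) // committer -> receiver
	empty <- make([]int, 0, batchSize)
	empty <- make([]int, 0, batchSize)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for batch := range full {
			commit(batch)
			empty <- batch[:0] // hand the now-empty buffer back
		}
	}()

	buf := <-empty
	for item := range in {
		buf = append(buf, item)
		if len(buf) >= batchSize {
			full <- buf
			buf = <-empty // waits only if both buffers are busy
		}
	}
	if len(buf) > 0 {
		full <- buf // flush the final partial batch
	}
	close(full)
	wg.Wait()
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 7; i++ {
			in <- i
		}
		close(in)
	}()

	var sizes []int
	batchAndBack(in, 3, func(b []int) { sizes = append(sizes, len(b)) })
	fmt.Println("batch sizes:", sizes)
}
```

With two buffers the receiver can keep filling one while the other is being committed, so it stalls only when a commit takes longer than it takes to fill a whole batch.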

            Also, yes it is possible to bypass the router entirely by
            delivering packs directly to a specific filter or output,
            assuming you know the name of the registered plugin. The
            functions to support this are available on the PluginHelper
            interface, passed in to each filter's Run method:
            https://github.com/mozilla-services/heka/blob/dev/pipeline/config.go#L46

            If you do go this route (no pun intended), I recommend you have
            the output name specified as part of your filter's config and not
            hard-coded.

            Finally, when you're working in Go, anything is possible. It's
            probably not necessary, but there's nothing stopping you from
            instantiating your own set of packs entirely outside of the pools
            that Heka is already creating. When you call NewPipelinePack, you
            pass in the channel that the pack will be returned to when
            Recycle is called, so you could even inject them into the router
            or pass them on to some other plugin and ultimately they'll be
            returned to you when they've been processed.
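
The recycle-channel idea can be illustrated without Heka at all. In the sketch below, the pack type, newPack, Recycle, and newPrivatePool are hypothetical stand-ins modeled only on the description above (a pack remembers the channel it returns to); they are not the real NewPipelinePack or PipelinePack, whose details may differ.

```go
package main

import "fmt"

// pack is a hypothetical stand-in for a PipelinePack: it remembers the
// channel it must be returned to.
type pack struct {
	payload     string
	recycleChan chan *pack
}

// newPack creates a pack bound to the given recycle channel.
func newPack(recycleChan chan *pack) *pack {
	return &pack{recycleChan: recycleChan}
}

// Recycle clears the pack and sends it back to its owner's channel.
func (p *pack) Recycle() {
	p.payload = ""
	p.recycleChan <- p
}

// newPrivatePool creates size packs that all return to the same channel.
func newPrivatePool(size int) chan *pack {
	pool := make(chan *pack, size)
	for i := 0; i < size; i++ {
		pool <- newPack(pool)
	}
	return pool
}

func main() {
	pool := newPrivatePool(2)
	downstream := make(chan *pack, 2) // stands in for another plugin's input

	for _, s := range []string{"a", "b"} {
		p := <-pool
		p.payload = s
		downstream <- p
	}
	fmt.Println("available while in flight:", len(pool))

	close(downstream)
	for p := range downstream {
		p.Recycle() // the consumer is done, so the pack goes back to its owner
	}
	fmt.Println("available after recycle:", len(pool))
}
```

Because each pack carries its own return address, the consumer does not need to know which pool a pack came from.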

            Hope this helps,

            -r



            On Sat 26 Jul 2014 01:51:24 AM PDT, Nimi Wariboko Jr wrote:

                Hi,

                We've come across an issue while stress testing one of our
                setups. We have a 3-step setup: one (tcp) input, an
                (aggregate) filter, and a (cassandra) output. The input is a
                stream of key-values, which are aggregated by key and
                periodically flushed to the cassandra output.

                What we began to see under a stress test was that, in the
                periodic aggregate step, the aggregate filter begins to flush
                its memory by requesting and injecting pipeline packs (up to
                ~13k pipeline packs, many times larger than the pool size of
                100). Surprisingly, the pipeline pack pool is eventually
                exhausted and Heka freezes.

                After digging around, it seems the issue is:
                1.) A large amount of pipeline packs are sent to the
                    aggregate filter.
                2.) The aggregate filter begins its aggregate step, and stops
                    accepting packs from its InChan.
                3.) The aggregate filter also begins to request PipelinePacks
                    and inject them into the stream.
                4.) Because the aggregate filter is no longer accepting
                    requests, the MessageRouter is stuck blocking for the
                    aggregate filter to accept a message.
                5.) Because the MessageRouter isn't routing messages, the
                    injected packs aren't going to the Cassandra output, and
                    aren't ultimately being freed.
                6.) Because the packs aren't being freed, the pool is
                    eventually exhausted.
                7.) Heka freezes even though everybody calls Recycle or
                    Inject.
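
The sequence in the steps above can be walked through with a small single-threaded model. Everything here is an assumption made for illustration: the pack type, take, and flushWhileNotReading are hypothetical names, the channel sizes are arbitrary, and non-blocking checks stand in for the blocking operations so the program reports where it would get stuck instead of actually hanging.

```go
package main

import "fmt"

// pack is a hypothetical stand-in for a PipelinePack.
type pack struct{}

// take is a non-blocking receive from the pool.
func take(pool chan *pack) (*pack, bool) {
	select {
	case p := <-pool:
		return p, true
	default:
		return nil, false
	}
}

// flushWhileNotReading models a filter that injects toInject packs while it
// is not reading its own input channel. It returns how many packs were
// injected and whether the filter got stuck before finishing.
func flushWhileNotReading(poolSize, inChanSize, routerSize, toInject int) (injected int, stuck bool) {
	pool := make(chan *pack, poolSize)
	for i := 0; i < poolSize; i++ {
		pool <- &pack{}
	}
	filterIn := make(chan *pack, inChanSize)
	routerIn := make(chan *pack, routerSize)

	// Steps 1-2: incoming traffic fills the filter's input channel while the
	// filter is busy flushing and no longer reading from it.
	for len(filterIn) < cap(filterIn) {
		p, ok := take(pool)
		if !ok {
			return 0, true
		}
		filterIn <- p
	}
	// Step 4: the router holds one more pack that it cannot deliver, so it
	// stops draining its own queue.
	if _, ok := take(pool); !ok {
		return 0, true
	}
	// Steps 3, 5, 6: the filter keeps requesting packs and injecting them,
	// but nothing downstream frees them.
	for injected < toInject {
		p, ok := take(pool)
		if !ok {
			return injected, true // pool exhausted
		}
		select {
		case routerIn <- p:
			injected++
		default:
			return injected, true // router queue full and not draining
		}
	}
	return injected, false
}

func main() {
	n, stuck := flushWhileNotReading(100, 30, 30, 13000)
	fmt.Println("injected:", n, "stuck:", stuck)
	n, stuck = flushWhileNotReading(100, 30, 30, 10)
	fmt.Println("injected:", n, "stuck:", stuck)
}
```

In this model a small flush completes, while a flush larger than the remaining queue and pool capacity stalls, which matches the observation that the freeze only shows up under load.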

                If this doesn't make sense, I can code up an example
                filter/input plugin that should cause the deadlock.

                I'm not sure if this is a legitimate bug, or just something
                we shouldn't do. Currently we have sidestepped the issue by
                copying the data and doing the flush in a separate goroutine
                (freeing up the filter to continue to accept packs). Another
                solution for us would be to completely sidestep the router
                and just pass the pipeline pack to the output directly. The
                docs claim this is possible, but I'm not entirely clear on
                how to achieve this (how do you get the reference to the
                target plugin InChan?).

                All in all, it seems that Heka can freeze if the message
                router tries to deliver a pipeline pack to a filter that is
                in a busy loop requesting & injecting pipeline packs.

                Thanks,
                Nimi Wariboko Jr.


                _______________________________________________
                Heka mailing list
                [email protected]
                https://mail.mozilla.org/listinfo/heka




_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
