On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
Yeah, the more I think about it the more I'm sure I was overengineering.
Your point was a good one re: complexity. I'm not sure why we'd be
framing the messages anyway, though. Outer/inner framing actually has no
purpose in this context since the records are JSON entries and the
format delimits the individual records. If we were doing binary
buffering, I'd say that more work was needed to ensure the lengths were
read properly, but for this specific plugin it wouldn't be a problem
unless I totally misunderstand framing.

No, you're understanding correctly, I'm just thinking in the general case. Our
plan is to make the buffering functionality in BufferedOutput available to all
outputs, so any batching support that's added needs to be able to support
generic data streams. We have to make sure that if someone decides to buffer a
batch of framed, protobuf encoded Heka messages that the buffer's batch framing
and the individual message framing don't interfere with each other.
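
To illustrate the nesting concern, here is a minimal Go sketch of an outer
batch frame wrapping individually framed messages. It assumes a simple 4-byte
big-endian length prefix for both layers, which is not Heka's actual stream
framing; the function names (writeFrame, readFrame, packBatch, unpackBatch)
are made up for this example.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes a 4-byte big-endian length followed by the payload.
// This length prefix is an assumption for the sketch, not Heka's framing.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := w.Write(hdr[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one length-prefixed frame and returns its payload.
func readFrame(r io.Reader) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// packBatch frames each message (inner framing) and concatenates the
// frames into a single batch payload.
func packBatch(msgs [][]byte) ([]byte, error) {
	var inner bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&inner, m); err != nil {
			return nil, err
		}
	}
	return inner.Bytes(), nil
}

// unpackBatch splits a batch payload back into its inner messages.
func unpackBatch(batch []byte) ([][]byte, error) {
	r := bytes.NewReader(batch)
	var msgs [][]byte
	for r.Len() > 0 {
		m, err := readFrame(r)
		if err != nil {
			return nil, err
		}
		msgs = append(msgs, m)
	}
	return msgs, nil
}

func main() {
	batch, err := packBatch([][]byte{[]byte("first"), []byte("second")})
	if err != nil {
		panic(err)
	}
	var stream bytes.Buffer
	if err := writeFrame(&stream, batch); err != nil { // outer framing
		panic(err)
	}
	outer, err := readFrame(&stream)
	if err != nil {
		panic(err)
	}
	msgs, err := unpackBatch(outer)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(msgs), string(msgs[0]), string(msgs[1]))
}
```

Because each layer is consumed by length rather than by scanning for a
delimiter, the reader never has to guess whether a header belongs to the
batch or to a message inside it.
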
I implemented it
(https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch)
and it works fine against a test ES instance without framing. I tried
killing the instance and it errored as you might expect, then
immediately started sending data when the instance came back up
(including persisting the data through a Heka restart), so I consider it
a success pending a bit more testing and cleaning up the unused
variables. This also got rid of the receiver/committer goroutines from
`Init()` and changed the behavior from "block if the buffer is full and
we haven't sent the buffer yet" to "write the buffer to disk and send it
when we have time" which I consider a better/safer behavior than leaving
packs in the queue until there's space to process them.

Notes from a quick peek:

* You don't need an `if` at the end of QueueRecord, just
`return b.QueueBytes(msgBytes)` should be fine.
* If you're storing the config on the ElasticSearchOutput struct, you probably
don't need to copy all of the attributes from the config struct to the output
struct. Is there a reason to set `o.flushInterval = o.conf.FlushInterval` when
you can just refer to `o.conf.FlushInterval` anywhere?
* You're never checking the result of the QueueBytes call, and instead you're
checking if the result of the or.Encode call is QueueIsFull, which doesn't
really make much sense.
* The queueFull seems a bit off; it should be dealing with byte slices, not a
single pack. Also, a single success or failure might represent an entire batch
of messages, not just a single one, w.r.t. your processMessageCount and
dropMessageCount.
Also, the Processed Message Count seems to stay empty in the dashboard
for the ES output - what do I need to implement to have it properly show
how many messages it has sent to ES? I can't find anything in
https://hekad.readthedocs.org/en/latest/developing/plugin.html about
what interfaces need to be implemented or where I need to register to
have that happen. Just adding a ReportMessage method to satisfy
pipeline.ReportingPlugin didn't seem to do it.

Yes, this is a bit awkward. Right now you have to implement it on a
case-by-case basis, when really Heka should be doing at least some of this
accounting for you. We have an issue open for it:
https://github.com/mozilla-services/heka/issues/918

Using ReportMessage is the way to make it work, however. You can see how it's 
supposed to work in the SandboxFilter implementation: 
https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120

Hope this helps,

-r


On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:

    This is true, but this is true no matter how the buffering is
    engineered. There might be messages sitting on the router's channel,
    or the output matcher's channel, or the output's channel, all of
    which would be lost if Heka crashes. I've imagined the buffering to
    be protecting against downtime and/or slowness of the downstream
    destination, not as a delivery guarantee mechanism. I wish I could
    tell you Heka guarantees message delivery, but it doesn't. I'd love
    to add support, but it would need to be considered within the
    context of the whole pipeline, not just a single point.

    That being said, harm reduction is reasonable, and I'm not dead set
    against being able to buffer partial batches. But I still shy away
    from the idea of having a separate SendRecords function for bulk
    sending, if possible. Maybe a buffer that was set up for batching
    could accumulate individual records to a short term buffer, adding
    them to the end of the send queue when a full batch is acquired? I'm
    open to further discussion.

    Also, our message header contains a message length, so with care it
    should be possible to nest the framing, it's just that confusing the
    inner framing for the outer framing is a pitfall to beware.

    -r


    On 01/11/2015 11:03 PM, Tiru Srikantha wrote:

        The simple approach means you can lose messages if the Heka
        process dies before the ticker expires or the maxcount is hit,
        as well as the nested framing issues you raised. I'm probably
        mentally over-engineering this, though, and if it's good enough
        for you, then that's what you get. I'll send a PR with an
        implementation based on what you recommended - it should be
        fairly simple given those limitations. Thanks for the guidance.

        On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:



             On 01/11/2015 12:34 AM, Tiru Srikantha wrote:

                 Yeah, I'm looking at this as well. I don't like losing
                 the bulk because that means it gets slow when your load
                 gets high due to HTTP overhead. If I didn't care about
                 bulk it'd just be a quick rewrite. The other problem I
                 ran into around bulk operations on a buffered queue is
                 that there are 4 points when you want to flush to the
                 output:

                 1. Queued messages count equals count to send.
                 2. Queued message size with the new message added will
                    exceed the max bulk message size, even if the count
                    is less than max, due to large messages.
                 3. A timer expires to force a flush to the output
                    target even if the count or max size hasn't been hit
                    yet, to get the data into the output target in a
                    timely manner.
                 4. Plugin is shutting down.

                 I'm actually re-writing a lot of the
                 plugins/buffered_output.go file in my local fork because
                 of this and teasing out the "read the next record from
                 the file pile" operation from the "send the next record"
                 operation, so what I'm aiming for is something like a
                 BulkBufferedOutput interface with a
                 SendRecords(records [][]byte) method that must be
                 implemented in lieu of BufferedOutput's
                 SendRecord(record []byte) and some code that's shared
                 between them to buffer new messages and read buffered
                 messages. SendRecords would not advance the cursor until
                 the bulk operation succeeded, as you might expect.


             I recommend reading my other message in this thread
             (http://is.gd/kzuhxs) for an alternate approach. I think you
             can achieve what you want with less effort, and better
             separation of concerns, by doing the batching *before* you
             pass data in to the BufferedOutput. Ultimately each buffer
             "record" is just a slice of bytes, the buffer doesn't need
             to know or care if that slice contains a single serialized
             message or an accumulated batch of them.
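
One way to batch before the buffer, sketched in Go under stated assumptions:
records are newline-delimited (as with JSON bulk entries), and emit stands in
for whatever hands a finished batch to the buffer as a single record. The
batcher type, its fields, and run are hypothetical names, not part of Heka.
The sketch covers the four flush triggers listed earlier in the thread.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// batcher accumulates records and hands complete batches to emit.
// All names here are hypothetical; emit stands in for queueing the
// batch into the buffer as one opaque byte slice.
type batcher struct {
	maxCount int
	maxBytes int
	count    int
	buf      bytes.Buffer
	emit     func(batch []byte) error
}

// Add appends one record, flushing on the size and count triggers.
func (b *batcher) Add(rec []byte) error {
	// Trigger 2: adding this record would exceed the max batch size.
	if b.count > 0 && b.buf.Len()+len(rec)+1 > b.maxBytes {
		if err := b.Flush(); err != nil {
			return err
		}
	}
	b.buf.Write(rec)
	b.buf.WriteByte('\n')
	b.count++
	// Trigger 1: the record count has reached the batch size.
	if b.count >= b.maxCount {
		return b.Flush()
	}
	return nil
}

// Flush emits the pending batch, if any, and resets only on success.
func (b *batcher) Flush() error {
	if b.count == 0 {
		return nil
	}
	batch := make([]byte, b.buf.Len())
	copy(batch, b.buf.Bytes()) // copy so emit may keep the slice
	if err := b.emit(batch); err != nil {
		return err
	}
	b.buf.Reset()
	b.count = 0
	return nil
}

// run feeds records to the batcher until in is closed.
func run(b *batcher, in <-chan []byte, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case rec, ok := <-in:
			if !ok {
				return b.Flush() // Trigger 4: shutting down.
			}
			if err := b.Add(rec); err != nil {
				return err
			}
		case <-ticker.C:
			if err := b.Flush(); err != nil { // Trigger 3: timer expired.
				return err
			}
		}
	}
}

func main() {
	b := &batcher{maxCount: 2, maxBytes: 1 << 20, emit: func(batch []byte) error {
		fmt.Printf("queued a batch of %d bytes\n", len(batch))
		return nil
	}}
	in := make(chan []byte)
	go func() {
		for _, r := range []string{`{"a":1}`, `{"b":2}`, `{"c":3}`} {
			in <- []byte(r)
		}
		close(in)
	}()
	if err := run(b, in, time.Second); err != nil {
		fmt.Println("error:", err)
	}
}
```

Records that have been added but not yet flushed live only in memory, so this
keeps the trade-off discussed in the thread: a crash before the next flush
loses the partial batch.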

                 I'll submit a PR once I finish.


             I always look forward to PRs, but I'll warn you that one
             that takes the approach described above will likely be
             rejected. I'm not very keen on introducing a separate
             buffering interface specifically for batches, when a simpler
             change can solve the same problems.

             -r





_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
