Oh, one other thought... ultimately the goal is for buffering to be a config option. In this case, if the user doesn't want buffering then Run could call SendRecord directly. Or, possibly better, spin up another goroutine that pulls the batch off of a channel and calls SendRecord, so processing new messages isn't blocked on the HTTP request.
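
A minimal sketch of that second option, assuming a hypothetical `sender` interface that exposes only the SendRecord method named above. The channel, the fake output, and the helper names are illustrative and not taken from Heka:

```go
package main

import (
	"fmt"
	"sync"
)

// sender is a hypothetical stand-in for an output plugin; only the
// SendRecord name comes from the thread.
type sender interface {
	SendRecord(record []byte) error
}

// fakeOutput records what it was asked to send instead of making an
// HTTP request.
type fakeOutput struct {
	mu   sync.Mutex
	sent []string
}

func (f *fakeOutput) SendRecord(record []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.sent = append(f.sent, string(record))
	return nil
}

// startSender launches a goroutine that drains batches and calls
// SendRecord for each one. The returned channel is closed once the
// batches channel has been closed and fully drained.
func startSender(s sender, batches <-chan []byte) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for batch := range batches {
			if err := s.SendRecord(batch); err != nil {
				fmt.Println("send failed:", err)
			}
		}
	}()
	return done
}

// sendAll plays the role of Run: it hands batches to the sender
// goroutine and only blocks when the channel's buffer is full.
func sendAll(s sender, input []string) {
	batches := make(chan []byte, 4)
	done := startSender(s, batches)
	for _, b := range input {
		batches <- []byte(b)
	}
	close(batches)
	<-done // wait for the in-flight sends before returning
}

func main() {
	out := &fakeOutput{}
	sendAll(out, []string{"batch-1", "batch-2"})
	fmt.Println(out.sent)
}
```

The buffered channel is what decouples message processing from the request latency; once it fills up, the producer blocks again, so its capacity is a tuning choice rather than a guarantee.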

-r


On 01/12/2015 02:34 PM, Tiru Srikantha wrote:
Thanks for the code review. I wrote that at 11pm and was going to go
over it again later today to clean it up, but you caught a couple things
I wouldn't have. queueFull got copied directly from TCPOutput with the
names changed; I was already going to go back and rework that for the
byte slice.

About framing:
http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing
doesn't show a "trailer" character and
https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315
suggests to me that it will attempt to blindly grab `message_length -
header_size` bytes once it finds a header. It's definitely something to
make sure of for a more thorough implementation of buffering, I agree.

On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <[email protected]> wrote:

    On 01/12/2015 11:52 AM, Tiru Srikantha wrote:

        Yeah, the more I think about it the more I'm sure I was
        overengineering. Your point was a good one re: complexity. I'm
        not sure why we'd be framing the messages anyway, though.
        Outer/inner framing actually has no purpose in this context
        since the records are JSON entries and the format delimits the
        individual records. If we were doing binary buffering, I'd say
        that more work was needed to ensure the lengths were read
        properly, but for this specific plugin it wouldn't be a problem
        unless I totally misunderstand framing.

    No, you're understanding correctly, I'm just thinking in the general
    case. Our plan is to make the buffering functionality in
    BufferedOutput available to all outputs, so any batching support
    that's added needs to be able to support generic data streams. We
    have to make sure that if someone decides to buffer a batch of
    framed, protobuf encoded Heka messages that the buffer's batch
    framing and the individual message framing don't interfere with each
    other.

        I implemented it
        (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch)
        and it works fine against a test ES instance without framing. I
        tried killing the instance and it errored as you might expect,
        then immediately started sending data when the instance came
        back up (including persisting the data through a Heka restart),
        so I consider it a success pending a bit more testing and
        cleaning up the unused variables. This also got rid of the
        receiver/committer goroutines from `Init()` and changed the
        behavior from "block if the buffer is full and we haven't sent
        the buffer yet" to "write the buffer to disk and send it when we
        have time" which I consider a better/safer behavior than leaving
        packs in the queue until there's space to process them.

    Notes from a quick peek:

    * You don't need an `if` at the end of QueueRecord, just `return
    b.QueueBytes(msgBytes)` should be fine.
    * If you're storing the config on the ElasticSearchOutput struct,
    you probably don't need to copy all of the attributes from the
    config struct to the output struct. Is there a reason to set
    `o.flushInterval = o.conf.FlushInterval` when you can just refer to
    `o.conf.FlushInterval` anywhere?
    * You're never checking the results of the QueueBytes call, and
    instead you're checking if the result of the or.Encode call is
    QueueIsFull, which doesn't really make much sense.
    * The queueFull seems a bit off, it should be dealing w/ byte
    slices, not a single pack. Also, a single success or failure might
    represent an entire batch of messages, not just a single one, w.r.t.
    your processMessageCount and dropMessageCount.

        Also, the Processed Message Count seems to stay empty in the
        dashboard for the ES output - what do I need to implement to
        have it properly show how many messages it has sent to ES? I
        can't find anything in
        https://hekad.readthedocs.org/en/latest/developing/plugin.html
        about what interfaces need to be implemented or where I need to
        register to have that happen. Just adding a ReportMessage method
        to satisfy pipeline.ReportingPlugin didn't seem to do it.

    Yes, this is a bit awkward. Right now you have to implement it on a
    case by case basis, when really Heka should be doing at least some
    of this accounting for you. We have an issue open for it:
    https://github.com/mozilla-services/heka/issues/918

    Using ReportMessage is the way to make it work, however. You can see
    how it's supposed to work in the SandboxFilter implementation:
    https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120

    Hope this helps,

    -r


        On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller
        <[email protected]> wrote:

             This is true, but this is true no matter how the buffering is
             engineered. There might be messages sitting on the router's
             channel, or the output matcher's channel, or the output's
             channel, all of which would be lost if Heka crashes. I've
             imagined the buffering to be protecting against downtime
             and/or slowness of the downstream destination, not as a
             delivery guarantee mechanism. I wish I could tell you Heka
             guarantees message delivery, but it doesn't. I'd love to add
             support, but it would need to be considered within the
             context of the whole pipeline, not just a single point.

             That being said, harm reduction is reasonable, and I'm not
             dead set against being able to buffer partial batches. But I
             still shy away from the idea of having a separate SendRecords
             function for bulk sending, if possible. Maybe a buffer that
             was set up for batching could accumulate individual records
             to a short term buffer, adding them to the end of the send
             queue when a full batch is acquired? I'm open to further
             discussion.

             Also, our message header contains a message length, so with
             care it should be possible to nest the framing, it's just
             that confusing the inner framing for the outer framing is a
             pitfall to beware.

             -r


             On 01/11/2015 11:03 PM, Tiru Srikantha wrote:

                 The simple approach means you can lose messages if the Heka
                 process dies before the ticker expires or the maxcount is
                 hit, as well as the nested framing issues you raised. I'm
                 probably mentally over-engineering this, though, and if
                 it's good enough for you, then that's what you get. I'll
                 send a PR with an implementation based on what you
                 recommended - it should be fairly simple given those
                 limitations. Thanks for the guidance.

                 On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller
                 <[email protected]> wrote:



                      On 01/11/2015 12:34 AM, Tiru Srikantha wrote:

                          Yeah, I'm looking at this as well. I don't like
                          losing the bulk because that means it gets slow
                          when your load gets high due to HTTP overhead. If
                          I didn't care about bulk it'd just be a quick
                          rewrite. The other problem I ran into around bulk
                          operations on a buffered queue is that there are 4
                          points when you want to flush to the output:

                          1. Queued messages count equals count to send.
                          2. Queued message size with the new message added
                             will exceed the max bulk message size, even if
                             the count is less than max, due to large
                             messages.
                          3. A timer expires to force a flush to the output
                             target even if the count or max size hasn't
                             been hit yet, to get the data into the output
                             target in a timely manner.
                          4. Plugin is shutting down.
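
The four flush points above can be sketched as a small accumulator that sits in front of whatever queues the bytes. This is only an illustration under stated assumptions: `batcher`, `flushFn`, `maxCount`, and `maxBytes` are hypothetical names, not Heka APIs, and the flush callback stands in for handing one byte slice to the buffer:

```go
package main

import (
	"fmt"
	"time"
)

// batcher accumulates records and hands complete batches to flushFn.
// All names here are hypothetical; none come from Heka.
type batcher struct {
	maxCount int
	maxBytes int
	flushFn  func(batch []byte, count int)
	buf      []byte
	count    int
}

// add appends one record. It flushes first if the record would push
// the pending batch past maxBytes (point 2), and flushes afterwards
// if the record count has reached maxCount (point 1).
func (b *batcher) add(record []byte) {
	if b.count > 0 && len(b.buf)+len(record) > b.maxBytes {
		b.flush()
	}
	b.buf = append(b.buf, record...)
	b.count++
	if b.count >= b.maxCount {
		b.flush()
	}
}

// flush emits a copy of the pending batch, if any, and resets state.
func (b *batcher) flush() {
	if b.count == 0 {
		return
	}
	batch := make([]byte, len(b.buf))
	copy(batch, b.buf)
	b.flushFn(batch, b.count)
	b.buf = b.buf[:0]
	b.count = 0
}

// run drains records until in is closed, flushing on every tick
// (point 3) and once more on shutdown (point 4).
func (b *batcher) run(in <-chan []byte, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case rec, ok := <-in:
			if !ok {
				b.flush()
				return
			}
			b.add(rec)
		case <-ticker.C:
			b.flush()
		}
	}
}

func main() {
	b := &batcher{maxCount: 3, maxBytes: 64, flushFn: func(batch []byte, n int) {
		fmt.Printf("flushed %d records, %d bytes\n", n, len(batch))
	}}
	in := make(chan []byte)
	done := make(chan struct{})
	go func() {
		b.run(in, time.Second)
		close(done)
	}()
	for i := 1; i <= 4; i++ {
		in <- []byte(fmt.Sprintf("{\"n\":%d}\n", i))
	}
	close(in) // shutdown triggers the final flush
	<-done
}
```

A single record larger than `maxBytes` is still accepted into an empty batch and goes out on the next trigger; whether to reject or split such records is a separate decision this sketch does not make.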

                          I'm actually re-writing a lot of the
                          plugins/buffered_output.go file in my local fork
                          because of this and teasing out the "read the next
                          record from the file pile" operation from the
                          "send the next record" operation, so what I'm
                          aiming for is something like a BulkBufferedOutput
                          interface with a SendRecords(records [][]byte)
                          method that must be implemented in lieu of
                          BufferedOutput's SendRecord(record []byte) and
                          some code that's shared between them to buffer new
                          messages and read buffered messages. SendRecords
                          would not advance the cursor until the bulk
                          operation succeeded, as you might expect.


                      I recommend reading my other message in this thread
                      (http://is.gd/kzuhxs) for an alternate approach. I
                      think you can achieve what you want with less effort,
                      and better separation of concerns, by doing the
                      batching *before* you pass data in to the
                      BufferedOutput. Ultimately each buffer "record" is
                      just a slice of bytes, the buffer doesn't need to know
                      or care if that slice contains a single serialized
                      message or an accumulated batch of them.

                          I'll submit a PR once I finish.


                      I always look forward to PRs, but I'll warn you that
                      one that takes the approach described above will
                      likely be rejected. I'm not very keen on introducing a
                      separate buffering interface specifically for batches,
                      when a simpler change can solve the same problems.

                      -r






_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
