Thanks for the code review. I wrote that at 11pm and was going to go over
it again later today to clean it up, but you caught a couple of things I
wouldn't have. queueFull got copied directly from TCPOutput with the names
changed; I was already going to go back and rework that for the byte slice.

About framing:
http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing
doesn't show a "trailer" character and
https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315
suggests to me that it will attempt to blindly grab `message_length -
header_size` bytes once it finds a header. It's definitely something to
make sure of for a more thorough implementation of buffering, I agree.
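
To make the framing point concrete, here is a minimal sketch of length-driven framing. It is a simplified stand-in and not Heka's wire format: the 0x1E marker, the fixed 4-byte length field, and the `frame`/`unframe` helpers are all assumptions made for illustration. It shows that once a header is parsed the payload is taken purely by length, so a batch of inner frames can sit inside an outer frame as long as the outer layer is decoded first.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// recordSep marks the start of a frame. The value is chosen for illustration.
const recordSep = 0x1E

// frame wraps payload as: recordSep | 4-byte big-endian length | payload.
// This is a simplified stand-in, not Heka's actual framing.
func frame(payload []byte) []byte {
	out := make([]byte, 5, 5+len(payload))
	out[0] = recordSep
	binary.BigEndian.PutUint32(out[1:5], uint32(len(payload)))
	return append(out, payload...)
}

// unframe reads one frame from the front of buf and returns its payload and
// the bytes that follow it. After the header is parsed the payload is taken
// purely by length; no trailer byte is searched for.
func unframe(buf []byte) (payload, rest []byte, err error) {
	if len(buf) < 5 || buf[0] != recordSep {
		return nil, buf, errors.New("no frame header at start of buffer")
	}
	n := int(binary.BigEndian.Uint32(buf[1:5]))
	if len(buf)-5 < n {
		return nil, buf, errors.New("incomplete frame")
	}
	return buf[5 : 5+n], buf[5+n:], nil
}

func main() {
	// Two inner frames; the first payload contains the separator byte itself.
	inner := append(frame([]byte("a\x1eb")), frame([]byte("second"))...)
	// The whole batch is framed again as an outer layer.
	outer := frame(inner)

	batch, _, err := unframe(outer)
	if err != nil {
		panic(err)
	}
	// Decode the inner frames only after the outer layer has been removed.
	for len(batch) > 0 {
		var msg []byte
		msg, batch, err = unframe(batch)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%q\n", msg)
	}
}
```

The pitfall is a parser that scans the raw stream for the marker byte instead of following the lengths: it could lock onto an inner header, or onto a marker byte inside a payload, and treat it as the start of an outer frame.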

On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <[email protected]> wrote:

> On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
>
>> Yeah, the more I think about it the more I'm sure I was overengineering.
>> Your point was a good one re: complexity. I'm not sure why we'd be
>> framing the messages anyway, though. Outer/inner framing actually has no
>> purpose in this context since the records are JSON entries and the
>> format delimits the individual records. If we were doing binary
>> buffering, I'd say that more work was needed to ensure the lengths were
>> read properly, but for this specific plugin it wouldn't be a problem
>> unless I totally misunderstand framing.
>>
> No, you're understanding correctly, I'm just thinking in the general case.
> Our plan is to make the buffering functionality in BufferedOutput available
> to all outputs, so any batching support that's added needs to be able to
> support generic data streams. We have to make sure that if someone decides
> to buffer a batch of framed, protobuf encoded Heka messages that the
> buffer's batch framing and the individual message framing don't interfere
> with each other.
>
>> I implemented it
>> (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch)
>> and it works fine against a test ES instance without framing. I tried
>> killing the instance and it errored as you might expect, then
>> immediately started sending data when the instance came back up
>> (including persisting the data through a Heka restart), so I consider it
>> a success pending a bit more testing and cleaning up the unused
>> variables. This also got rid of the receiver/committer goroutines from
>> `Init()` and changed the behavior from "block if the buffer is full and
>> we haven't sent the buffer yet" to "write the buffer to disk and send it
>> when we have time" which I consider a better/safer behavior than leaving
>> packs in the queue until there's space to process them.
>>
> Notes from a quick peek:
>
> * You don't need an `if` at the end of QueueRecord, just `return
> b.QueueBytes(msgBytes)` should be fine.
> * If you're storing the config on the ElasticSearchOutput struct, you
> probably don't need to copy all of the attributes from the config struct to
> the output struct. Is there a reason to set `o.flushInterval =
> o.conf.FlushInterval` when you can just refer to `o.conf.FlushInterval`
> anywhere?
> * You're never checking the results of the QueueBytes call, and instead
> you're checking if the result of the or.Encode call is QueueIsFull, which
> doesn't really make much sense.
> * The queueFull seems a bit off, it should be dealing w/ byte slices, not
> a single pack. Also, a single success or failure might represent an entire
> batch of messages, not just a single one, w.r.t. your processMessageCount
> and dropMessageCount.
>
>> Also, the Processed Message Count seems to stay empty in the dashboard
>> for the ES output - what do I need to implement to have it properly show
>> how many messages it has sent to ES? I can't find anything in
>> https://hekad.readthedocs.org/en/latest/developing/plugin.html about
>> what interfaces need to be implemented or where I need to register to
>> have that happen. Just adding a ReportMessage method to satisfy
>> pipeline.ReportingPlugin didn't seem to do it.
>>
> Yes, this is a bit awkward. Right now you have to implement it on a case
> by case basis, when really Heka should be doing at least some of this
> accounting for you. We have an issue open for it:
> https://github.com/mozilla-services/heka/issues/918
>
> Using ReportMessage is the way to make it work, however. You can see how
> it's supposed to work in the SandboxFilter implementation:
> https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120
>
> Hope this helps,
>
> -r
>
>
>> On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:
>>
>>     This is true, but this is true no matter how the buffering is
>>     engineered. There might be messages sitting on the router's channel,
>>     or the output matcher's channel, or the output's channel, all of
>>     which would be lost if Heka crashes. I've imagined the buffering to
>>     be protecting against downtime and/or slowness of the downstream
>>     destination, not as a delivery guarantee mechanism. I wish I could
>>     tell you Heka guarantees message delivery, but it doesn't. I'd love
>>     to add support, but it would need to be considered within the
>>     context of the whole pipeline, not just a single point.
>>
>>     That being said, harm reduction is reasonable, and I'm not dead set
>>     against being able to buffer partial batches. But I still shy away
>>     from the idea of having a separate SendRecords function for bulk
>>     sending, if possible. Maybe a buffer that was set up for batching
>>     could accumulate individual records to a short term buffer, adding
>>     them to the end of the send queue when a full batch is acquired? I'm
>>     open to further discussion.
>>
>>     Also, our message header contains a message length, so with care it
>>     should be possible to nest the framing, it's just that confusing the
>>     inner framing for the outer framing is a pitfall to beware.
>>
>>     -r
>>
>>
>>     On 01/11/2015 11:03 PM, Tiru Srikantha wrote:
>>
>>         The simple approach means you can lose messages if the Heka
>>         process dies before the ticker expires or the maxcount is hit,
>>         as well as the nested framing issues you raised. I'm probably
>>         mentally over-engineering this, though, and if it's good enough
>>         for you, then that's what you get. I'll send a PR with an
>>         implementation based on what you recommended - it should be
>>         fairly simple given those limitations. Thanks for the guidance.
>>
>>         On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:
>>
>>
>>
>>              On 01/11/2015 12:34 AM, Tiru Srikantha wrote:
>>
>>                  Yeah, I'm looking at this as well. I don't like losing
>>                  the bulk because that means it gets slow when your load
>>                  gets high due to HTTP overhead. If I didn't care about
>>                  bulk it'd just be a quick rewrite. The other problem I
>>                  ran into around bulk operations on a buffered queue is
>>                  that there are 4 points when you want to flush to the
>>                  output:
>>
>>                  1. Queued messages count equals count to send.
>>                  2. Queued message size with the new message added will
>>                  exceed the max bulk message size, even if the count is
>>                  less than max, due to large messages.
>>                  3. A timer expires to force a flush to the output target
>>                  even if the count or max size hasn't been hit yet, to
>>                  get the data into the output target in a timely manner.
>>                  4. Plugin is shutting down.
>>
>>                  I'm actually re-writing a lot of the
>>                  plugins/buffered_output.go file in my local fork because
>>                  of this and teasing out the "read the next record from
>>                  the file pile" operation from the "send the next record"
>>                  operation, so what I'm aiming for is something like a
>>                  BulkBufferedOutput interface with a
>>                  SendRecords(records [][]byte) method that must be
>>                  implemented in lieu of BufferedOutput's
>>                  SendRecord(record []byte) and some code that's shared
>>                  between them to buffer new messages and read buffered
>>                  messages. SendRecords would not advance the cursor until
>>                  the bulk operation succeeded, as you might expect.
>>
>>
>>              I recommend reading my other message in this thread
>>              (http://is.gd/kzuhxs) for an alternate approach. I think you
>>              can achieve what you want with less effort, and better
>>              separation of concerns, by doing the batching *before* you
>>              pass data in to the BufferedOutput. Ultimately each buffer
>>              "record" is just a slice of bytes, the buffer doesn't need
>>              to know or care if that slice contains a single serialized
>>              message or an accumulated batch of them.
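
A rough sketch of batching in front of the buffer, covering the four flush points listed earlier in the thread. Everything here is hypothetical: the `batcher` type, its fields, and the `queue` callback (which stands in for whatever hands a byte slice to the buffered output) are assumptions, and the records are simply joined with newlines. The buffer only ever sees one []byte per batch.

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// batcher accumulates newline-delimited records and hands each batch to
// queue as a single byte slice. All names are hypothetical.
type batcher struct {
	buf      bytes.Buffer
	count    int
	maxCount int
	maxBytes int
	queue    func(batch []byte) error // stand-in for the buffered output
}

// flush queues the pending batch, if any. The pending data is cleared only
// after queue succeeds, so a failed flush can be retried.
func (b *batcher) flush() error {
	if b.count == 0 {
		return nil
	}
	batch := append([]byte(nil), b.buf.Bytes()...) // copy; buf is reused
	if err := b.queue(batch); err != nil {
		return err
	}
	b.buf.Reset()
	b.count = 0
	return nil
}

// add appends one record, flushing before or after it as needed.
func (b *batcher) add(rec []byte) error {
	// Flush point 2: the new record would push the batch past maxBytes.
	if b.count > 0 && b.buf.Len()+len(rec)+1 > b.maxBytes {
		if err := b.flush(); err != nil {
			return err
		}
	}
	b.buf.Write(rec)
	b.buf.WriteByte('\n')
	b.count++
	// Flush point 1: the batch has reached maxCount records.
	if b.count >= b.maxCount {
		return b.flush()
	}
	return nil
}

// run consumes records until in is closed, flushing on every tick.
func (b *batcher) run(in <-chan []byte, tick <-chan time.Time) error {
	for {
		select {
		case rec, ok := <-in:
			if !ok {
				return b.flush() // Flush point 4: shutting down.
			}
			if err := b.add(rec); err != nil {
				return err
			}
		case <-tick:
			// Flush point 3: timer expired with a partial batch pending.
			if err := b.flush(); err != nil {
				return err
			}
		}
	}
}

func main() {
	var batches []string
	b := &batcher{maxCount: 2, maxBytes: 64, queue: func(p []byte) error {
		batches = append(batches, string(p))
		return nil
	}}
	in := make(chan []byte, 3)
	for _, r := range []string{`{"n":1}`, `{"n":2}`, `{"n":3}`} {
		in <- []byte(r)
	}
	close(in)
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	if err := b.run(in, ticker.C); err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", batches)
}
```

Note that a partial batch still lives only in memory until one of the four flush points fires, which is the message-loss window discussed elsewhere in this thread.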
>>
>>                  I'll submit a PR once I finish.
>>
>>
>>              I always look forward to PRs, but I'll warn you that one
>>              that uses the approach described above will likely be
>>              rejected. I'm not very keen on introducing a separate
>>              buffering interface specifically for batches, when a simpler
>>              change can solve the same problems.
>>
>>              -r
>>
>>
>>
>>
>>
>
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
