Thanks for the code review. I wrote that at 11pm and was going to go over it again later today to clean it up, but you caught a couple of things I wouldn't have. queueFull was copied directly from TCPOutput with the names changed; I was already planning to go back and rework it to deal with byte slices.
About framing: http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing doesn't show a "trailer" character, and https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315 suggests to me that it will attempt to blindly grab `message_length - header_size` bytes once it finds a header. It's definitely something to make sure of for a more thorough implementation of buffering, I agree.

On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <[email protected]> wrote:
> On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
>
>> Yeah, the more I think about it the more I'm sure I was overengineering.
>> Your point was a good one re: complexity. I'm not sure why we'd be
>> framing the messages anyway, though. Outer/inner framing actually has no
>> purpose in this context since the records are JSON entries and the
>> format delimits the individual records. If we were doing binary
>> buffering, I'd say that more work was needed to ensure the lengths were
>> read properly, but for this specific plugin it wouldn't be a problem
>> unless I totally misunderstand framing.
>
> No, you're understanding correctly, I'm just thinking in the general case.
> Our plan is to make the buffering functionality in BufferedOutput available
> to all outputs, so any batching support that's added needs to be able to
> support generic data streams. We have to make sure that if someone decides
> to buffer a batch of framed, protobuf encoded Heka messages that the
> buffer's batch framing and the individual message framing don't interfere
> with each other.
>
>> I implemented it
>> (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch)
>> and it works fine against a test ES instance without framing.
>> I tried killing the instance and it errored as you might expect, then
>> immediately started sending data when the instance came back up
>> (including persisting the data through a Heka restart), so I consider it
>> a success pending a bit more testing and cleaning up the unused
>> variables. This also got rid of the receiver/committer goroutines from
>> `Init()` and changed the behavior from "block if the buffer is full and
>> we haven't sent the buffer yet" to "write the buffer to disk and send it
>> when we have time", which I consider better/safer behavior than leaving
>> packs in the queue until there's space to process them.
>
> Notes from a quick peek:
>
> * You don't need an `if` at the end of QueueRecord; just `return
> b.QueueBytes(msgBytes)` should be fine.
> * If you're storing the config on the ElasticSearchOutput struct, you
> probably don't need to copy all of the attributes from the config struct
> to the output struct. Is there a reason to set `o.flushInterval =
> o.conf.FlushInterval` when you can just refer to `o.conf.FlushInterval`
> anywhere?
> * You're never checking the result of the QueueBytes call; instead
> you're checking whether the result of the or.Encode call is QueueIsFull,
> which doesn't really make much sense.
> * queueFull seems a bit off; it should be dealing with byte slices, not
> a single pack. Also, a single success or failure might represent an
> entire batch of messages, not just a single one, w.r.t. your
> processMessageCount and dropMessageCount.
>
>> Also, the Processed Message Count seems to stay empty in the dashboard
>> for the ES output - what do I need to implement to have it properly show
>> how many messages it has sent to ES? I can't find anything in
>> https://hekad.readthedocs.org/en/latest/developing/plugin.html about
>> what interfaces need to be implemented or where I need to register to
>> have that happen.
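The first and third review points above (return the queue call directly, and check the queue call's own result rather than an earlier Encode result) can be sketched like this. The `Buffer`, `QueueBytes`, and `QueueRecord` types here are hypothetical stand-ins for illustration, not Heka's real BufferedOutput API.

```go
package main

import "fmt"

// Buffer is a hypothetical stand-in for a buffered output's queue.
type Buffer struct {
	queue [][]byte
}

// QueueBytes appends one record (an opaque byte slice) to the queue.
func (b *Buffer) QueueBytes(p []byte) error {
	b.queue = append(b.queue, p)
	return nil
}

// QueueRecord returns QueueBytes' result directly, rather than wrapping
// it in a trailing `if err != nil { return err }; return nil` block.
func (b *Buffer) QueueRecord(msgBytes []byte) error {
	return b.QueueBytes(msgBytes)
}

func main() {
	b := &Buffer{}
	// Check the result of the queue call itself when deciding whether
	// the queue is full, not the result of a separate encode step.
	if err := b.QueueRecord([]byte(`{"msg":"hi"}`)); err != nil {
		fmt.Println("queue error:", err)
		return
	}
	fmt.Println("queued records:", len(b.queue)) // prints "queued records: 1"
}
```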
>> Just adding a ReportMessage method to satisfy
>> pipeline.ReportingPlugin didn't seem to do it.
>
> Yes, this is a bit awkward. Right now you have to implement it on a case
> by case basis, when really Heka should be doing at least some of this
> accounting for you. We have an issue open for it:
> https://github.com/mozilla-services/heka/issues/918
>
> Using ReportMessage is the way to make it work, however. You can see how
> it's supposed to work in the SandboxFilter implementation:
> https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120
>
> Hope this helps,
>
> -r
>
>> On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <[email protected]> wrote:
>>
>> This is true, but this is true no matter how the buffering is
>> engineered. There might be messages sitting on the router's channel,
>> or the output matcher's channel, or the output's channel, all of
>> which would be lost if Heka crashes. I've imagined the buffering to
>> be protecting against downtime and/or slowness of the downstream
>> destination, not as a delivery guarantee mechanism. I wish I could
>> tell you Heka guarantees message delivery, but it doesn't. I'd love
>> to add support, but it would need to be considered within the
>> context of the whole pipeline, not just a single point.
>>
>> That being said, harm reduction is reasonable, and I'm not dead set
>> against being able to buffer partial batches. But I still shy away
>> from the idea of having a separate SendRecords function for bulk
>> sending, if possible. Maybe a buffer that was set up for batching
>> could accumulate individual records in a short-term buffer, adding
>> them to the end of the send queue when a full batch is acquired? I'm
>> open to further discussion.
>>
>> Also, our message header contains a message length, so with care it
>> should be possible to nest the framing; it's just that confusing the
>> inner framing for the outer framing is a pitfall to beware of.
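The counter bookkeeping that the SandboxFilter link above demonstrates might look roughly like the following. Note the names here (`recordBatch`, a map-returning `ReportMessage`) are illustrative assumptions, not the actual `pipeline.ReportingPlugin` signature; the point is only that the plugin has to maintain its own counts, and that one send outcome can cover a whole batch.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ESOutput is a hypothetical output plugin holding its own report counters.
type ESOutput struct {
	processMessageCount int64
	dropMessageCount    int64
}

// recordBatch updates the counters per batch: as the review notes say, a
// single success or failure may represent an entire batch of messages.
func (o *ESOutput) recordBatch(n int64, ok bool) {
	if ok {
		atomic.AddInt64(&o.processMessageCount, n)
	} else {
		atomic.AddInt64(&o.dropMessageCount, n)
	}
}

// ReportMessage exposes the counters for dashboard-style reporting
// (illustrative shape only).
func (o *ESOutput) ReportMessage() map[string]int64 {
	return map[string]int64{
		"ProcessMessageCount": atomic.LoadInt64(&o.processMessageCount),
		"DropMessageCount":    atomic.LoadInt64(&o.dropMessageCount),
	}
}

func main() {
	o := &ESOutput{}
	o.recordBatch(250, true) // a successful bulk send of 250 messages
	o.recordBatch(10, false) // a failed batch of 10
	r := o.ReportMessage()
	fmt.Println(r["ProcessMessageCount"], r["DropMessageCount"]) // prints "250 10"
}
```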
>>
>> -r
>>
>> On 01/11/2015 11:03 PM, Tiru Srikantha wrote:
>>
>> The simple approach means you can lose messages if the Heka process
>> dies before the ticker expires or the maxcount is hit, as well as the
>> nested framing issues you raised. I'm probably mentally
>> over-engineering this, though, and if it's good enough for you, then
>> that's what you get. I'll send a PR with an implementation based on
>> what you recommended - it should be fairly simple given those
>> limitations. Thanks for the guidance.
>>
>> On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <[email protected]> wrote:
>>
>> On 01/11/2015 12:34 AM, Tiru Srikantha wrote:
>>
>> Yeah, I'm looking at this as well. I don't like losing the bulk
>> because that means it gets slow when your load gets high due to HTTP
>> overhead. If I didn't care about bulk it'd just be a quick rewrite.
>> The other problem I ran into around bulk operations on a buffered
>> queue is that there are 4 points when you want to flush to the output:
>>
>> 1. Queued message count equals the count to send.
>> 2. Queued message size with the new message added would exceed the
>> max bulk message size, even if the count is less than max, due to
>> large messages.
>> 3. A timer expires to force a flush to the output target even if the
>> count or max size hasn't been hit yet, to get the data into the
>> output target in a timely manner.
>> 4. The plugin is shutting down.
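The four flush triggers enumerated above can be collapsed into a single predicate along these lines. The `batchState` fields and limits are hypothetical names for illustration, not the plugin's real config.

```go
package main

import "fmt"

// batchState tracks the pending batch against hypothetical limits.
type batchState struct {
	count    int // queued message count
	bytes    int // queued message bytes
	maxCount int // max messages per bulk request
	maxBytes int // max bytes per bulk request
}

// shouldFlush returns true when any of the four flush conditions holds.
func (b *batchState) shouldFlush(nextMsgBytes int, tickerFired, shuttingDown bool) bool {
	switch {
	case b.count >= b.maxCount: // 1. count limit reached
		return true
	case b.bytes+nextMsgBytes > b.maxBytes: // 2. next message would exceed the size limit
		return true
	case tickerFired: // 3. the flush timer expired
		return true
	case shuttingDown: // 4. plugin shutdown
		return true
	}
	return false
}

func main() {
	b := &batchState{count: 3, bytes: 900, maxCount: 100, maxBytes: 1000}
	fmt.Println(b.shouldFlush(200, false, false)) // large message hits size limit: true
	fmt.Println(b.shouldFlush(50, false, false))  // no condition met: false
	fmt.Println(b.shouldFlush(50, true, false))   // ticker fired: true
}
```

Checking the size limit *before* appending the next message (condition 2) is what keeps a single oversized record from pushing a bulk request past the maximum.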
>>
>> I'm actually re-writing a lot of the plugins/buffered_output.go file
>> in my local fork because of this, teasing out the "read the next
>> record from the file pile" operation from the "send the next record"
>> operation. What I'm aiming for is something like a BulkBufferedOutput
>> interface with a SendRecords(records [][]byte) method that must be
>> implemented in lieu of BufferedOutput's SendRecord(record []byte),
>> plus some code shared between them to buffer new messages and read
>> buffered messages. SendRecords would not advance the cursor until the
>> bulk operation succeeded, as you might expect.
>>
>> I recommend reading my other message in this thread
>> (http://is.gd/kzuhxs) for an alternate approach. I think you can
>> achieve what you want with less effort, and better separation of
>> concerns, by doing the batching *before* you pass data in to the
>> BufferedOutput. Ultimately each buffer "record" is just a slice of
>> bytes; the buffer doesn't need to know or care whether that slice
>> contains a single serialized message or an accumulated batch of them.
>>
>> I'll submit a PR once I finish.
>>
>> I always look forward to PRs, but I'll warn you that one taking the
>> approach described above will likely be rejected. I'm not very keen
>> on introducing a separate buffering interface specifically for
>> batches when a simpler change can solve the same problems.
>>
>> -r
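The batch-before-the-buffer suggestion above can be sketched as follows: accumulate the encoded messages into one byte slice and hand that to the buffer as a single opaque record. `queueBatch` and `queueRecord` are hypothetical stand-ins for the BufferedOutput queue path, and the newline delimiting is an assumption borrowed from the ES bulk-JSON case in this thread.

```go
package main

import (
	"bytes"
	"fmt"
)

// queueBatch concatenates the encoded messages into one record before
// queuing, so the buffer never needs to know the record is a batch.
func queueBatch(msgs [][]byte, queueRecord func([]byte) error) error {
	var batch bytes.Buffer
	for _, m := range msgs {
		batch.Write(m)
		batch.WriteByte('\n') // newline-delimited JSON entries
	}
	return queueRecord(batch.Bytes())
}

func main() {
	var queued [][]byte
	msgs := [][]byte{[]byte(`{"a":1}`), []byte(`{"b":2}`)}
	queueBatch(msgs, func(rec []byte) error {
		queued = append(queued, rec)
		return nil
	})
	// The buffer received exactly one record containing the whole batch.
	fmt.Println(len(queued)) // prints "1"
	fmt.Printf("%s", queued[0])
}
```

Because the cursor advances per record, a failed bulk send leaves the entire batch in the buffer to be retried, which is the same property the proposed SendRecords interface was trying to guarantee.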
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka

