Thanks for the code review. I wrote that at 11pm and was going to go over it again later today to clean it up, but you caught a couple of things I wouldn't have. queueFull was copied directly from TCPOutput with the names changed; I was already planning to go back and rework it for the byte slice.
About framing: http://hekad.readthedocs.org/en/v0.8.2/message/index.html#stream-framing doesn't show a "trailer" character, and https://github.com/mozilla-services/heka/blob/dev/pipeline/stream_parser.go#L315 suggests to me that it will blindly grab `message_length - header_size` bytes once it finds a header. It's definitely something to verify for a more thorough implementation of buffering, I agree.
On Mon, Jan 12, 2015 at 2:08 PM, Rob Miller <rmiller@mozilla.com> wrote:
On 01/12/2015 11:52 AM, Tiru Srikantha wrote:
Yeah, the more I think about it the more I'm sure I was overengineering. Your point was a good one re: complexity. I'm not sure why we'd be framing the messages anyway, though. Outer/inner framing actually has no purpose in this context, since the records are JSON entries and the format delimits the individual records. If we were doing binary buffering, I'd say that more work was needed to ensure the lengths were read properly, but for this specific plugin it wouldn't be a problem, unless I totally misunderstand framing.
No, you're understanding correctly, I'm just thinking in the general
case. Our plan is to make the buffering functionality in
BufferedOutput available to all outputs, so any batching support
that's added needs to be able to support generic data streams. We
have to make sure that if someone decides to buffer a batch of
framed, protobuf encoded Heka messages that the buffer's batch
framing and the individual message framing don't interfere with each
other.
I implemented it (https://github.com/highlyunavailable/heka/compare/feature/bufferedelasticsearch?diff=split&name=feature%2Fbufferedelasticsearch) and it works fine against a test ES instance without framing. I tried killing the instance and it errored as you might expect, then immediately started sending data when the instance came back up (including persisting the data through a Heka restart), so I consider it a success pending a bit more testing and cleanup of the unused variables. This also got rid of the receiver/committer goroutines from `Init()` and changed the behavior from "block if the buffer is full and we haven't sent the buffer yet" to "write the buffer to disk and send it when we have time", which I consider better/safer behavior than leaving packs in the queue until there's space to process them.
Notes from a quick peek:

* You don't need an `if` at the end of QueueRecord; just `return b.QueueBytes(msgBytes)` should be fine.
* If you're storing the config on the ElasticSearchOutput struct, you probably don't need to copy all of the attributes from the config struct to the output struct. Is there a reason to set `o.flushInterval = o.conf.FlushInterval` when you can just refer to `o.conf.FlushInterval` anywhere?
* You never check the result of the QueueBytes call; instead you check whether the result of the or.Encode call is QueueIsFull, which doesn't really make sense.
* The queueFull handling seems a bit off: it should be dealing with byte slices, not a single pack. Also, a single success or failure might represent an entire batch of messages, not just a single one, w.r.t. your processMessageCount and dropMessageCount.
Also, the Processed Message Count seems to stay empty in the dashboard for the ES output. What do I need to implement to have it properly show how many messages it has sent to ES? I can't find anything in https://hekad.readthedocs.org/en/latest/developing/plugin.html about which interfaces need to be implemented or where I need to register to have that happen. Just adding a ReportMessage method to satisfy pipeline.ReportingPlugin didn't seem to do it.
Yes, this is a bit awkward. Right now you have to implement it on a case by case basis, when really Heka should be doing at least some of this accounting for you. We have an issue open for it: https://github.com/mozilla-services/heka/issues/918

Using ReportMessage is the way to make it work, however. You can see how it's supposed to work in the SandboxFilter implementation: https://github.com/mozilla-services/heka/blob/dev/sandbox/plugins/sandbox_filter.go#L120
Hope this helps,
-r
On Mon, Jan 12, 2015 at 11:13 AM, Rob Miller <rmiller@mozilla.com> wrote:
This is true, but it's true no matter how the buffering is engineered. There might be messages sitting on the router's channel, or the output matcher's channel, or the output's channel, all of which would be lost if Heka crashes. I've imagined the buffering as protecting against downtime and/or slowness of the downstream destination, not as a delivery guarantee mechanism. I wish I could tell you Heka guarantees message delivery, but it doesn't. I'd love to add support, but it would need to be considered within the context of the whole pipeline, not just a single point.
That being said, harm reduction is reasonable, and I'm not dead set against being able to buffer partial batches. But I still shy away from the idea of having a separate SendRecords function for bulk sending, if possible. Maybe a buffer that was set up for batching could accumulate individual records in a short-term buffer, adding them to the end of the send queue when a full batch is acquired? I'm open to further discussion.
Also, our message header contains a message length, so with care it should be possible to nest the framing; it's just that confusing the inner framing for the outer framing is a pitfall to beware of.
-r
On 01/11/2015 11:03 PM, Tiru Srikantha wrote:
The simple approach means you can lose messages if the Heka process dies before the ticker expires or the max count is hit, as well as the nested framing issues you raised. I'm probably mentally over-engineering this, though, and if it's good enough for you, then that's what you get. I'll send a PR with an implementation based on what you recommended; it should be fairly simple given those limitations. Thanks for the guidance.
On Sun, Jan 11, 2015 at 6:52 PM, Rob Miller <rmiller@mozilla.com> wrote:
On 01/11/2015 12:34 AM, Tiru Srikantha wrote:
Yeah, I'm looking at this as well. I don't like losing the bulk support, because that means it gets slow when your load gets high due to HTTP overhead. If I didn't care about bulk it'd just be a quick rewrite. The other problem I ran into around bulk operations on a buffered queue is that there are 4 points at which you want to flush to the output:

1. The queued message count equals the count to send.
2. The queued message size with the new message added would exceed the max bulk message size, even if the count is less than the max, due to large messages.
3. A timer expires to force a flush to the output target even if the count or max size hasn't been hit yet, to get the data into the output target in a timely manner.
4. The plugin is shutting down.
I'm actually re-writing a lot of the plugins/buffered_output.go file in my local fork because of this, teasing the "read the next record from the file pile" operation apart from the "send the next record" operation. What I'm aiming for is something like a BulkBufferedOutput interface with a SendRecords(records [][]byte) method that must be implemented in lieu of BufferedOutput's SendRecord(record []byte), plus some code that's shared between them to buffer new messages and read buffered messages. SendRecords would not advance the cursor until the bulk operation succeeded, as you might expect.
I recommend reading my other message in this thread (http://is.gd/kzuhxs) for an alternate approach. I think you can achieve what you want with less effort, and better separation of concerns, by doing the batching *before* you pass data in to the BufferedOutput. Ultimately each buffer "record" is just a slice of bytes; the buffer doesn't need to know or care whether that slice contains a single serialized message or an accumulated batch of them.
I'll submit a PR once I finish.
I always look forward to PRs, but I'll warn you that the approach described above will likely be rejected. I'm not very keen on introducing a separate buffering interface specifically for batches when a simpler change can solve the same problems.
-r