Hi,
The reason you're not seeing any payload data is that the messages are protocol
buffer encoded when they hit the second Heka instance, so they need to be
decoded from protobuf before you can get at the contained data. You didn't
specify an encoder for your TcpOutput, and if you read the TcpOutput docs
carefully you'll note that it will default to using the ProtobufEncoder with
Heka's stream framing.
On the receiving end, the TcpInput defaults to using HekaFramingSplitter with
ProtobufDecoder. This means if you don't specify any other decoder, the
messages will be decoded properly and you'll see messages with the payload you
expect flowing through the pipeline. So far so good.
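In other words, your current config behaves as though you'd spelled the defaults out explicitly, roughly like this (a sketch; the encoder, use_framing, splitter, and decoder lines just make the documented defaults visible — hekad registers the ProtobufEncoder/ProtobufDecoder instances automatically):

```toml
[aggregator_output]
type = "TcpOutput"
address = "10.10.10.1:5565"
message_matcher = "TRUE"
encoder = "ProtobufEncoder"   # implied default
use_framing = true            # implied when using the ProtobufEncoder

[TcpInput]
address = ":5565"
splitter = "HekaFramingSplitter"  # implied default
decoder = "ProtobufDecoder"       # implied default
```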
As soon as you add the CombinedLogDecoder on the remote end, however, things
go pear-shaped, because you've now overridden the default ProtobufDecoder. But
the raw data is still coming through in protobuf format, and the
CombinedLogDecoder has no idea what to do with it.
So what to do? You have a few options. The first is to use a MultiDecoder with
cascade_strategy = "all", and explicitly add a ProtobufDecoder to the chain before
the CombinedLogDecoder. The next, which is likely more efficient (although I haven't
benchmarked it, so you might want to test to be sure), would be to do the protobuf decoding
in the sandbox. If you call `local msgBytes = read_message("raw")` in the
sandbox Heka will return the raw protobuf bytes. Then you can call `msg =
decode_message(msgBytes)` to get a Lua table containing the contents of the decoded
message. `msg.Payload` will then contain the payload value you're looking for. You could
insert that code before the rest of the parsing code and you should be able to do what
you need to do.
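Concretely, the sandbox approach might look something like this at the top of the decoder's process_message() (a sketch using the standard sandbox API; `grammar` is the LPeg grammar already defined in apache_access.lua):

```lua
function process_message()
    -- "raw" gives us the full protobuf-encoded message bytes.
    local msgBytes = read_message("raw")
    -- decode_message() turns those bytes into a Lua table with the
    -- standard message schema fields (Timestamp, Payload, Fields, etc.).
    local msg = decode_message(msgBytes)
    -- msg.Payload now holds the original Apache access log line.
    local fields = grammar:match(msg.Payload)
    if not fields then return -1 end
    -- ... continue with the existing parsing/injection code ...
end
```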
For completeness' sake I feel compelled to mention that you could decide to
skip the protobuf encoding altogether. If you used a PayloadEncoder on your
TcpOutput, you'd end up sending the data over the wire in raw text format. Then
on the remote side you'd use a TokenSplitter to split on newlines and the
original CombinedLogDecoder would work, since the data you care about would be
sitting in the message payload. This might save a few cycles since you don't
have to protobuf decode each message, but it would also mean that your TcpInput
would only be useful for exactly these message types. I'd probably go with one
of the first two options, myself.
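For reference, the first and third options might be configured something like the following (a sketch; "ApacheChain" is just a hypothetical name for the MultiDecoder instance, and the option 3 lines are commented out to avoid duplicate sections):

```toml
# Option 1 (sketch): run the ProtobufDecoder first, then the
# CombinedLogDecoder, via a MultiDecoder chain.
[ProtobufDecoder]

[ApacheChain]
type = "MultiDecoder"
subs = ["ProtobufDecoder", "CombinedLogDecoder"]
cascade_strategy = "all"

[TcpInput]
address = ":5565"
decoder = "ApacheChain"

# Option 3 (sketch): skip protobuf entirely and ship raw text.
# On the sending side:
#   [aggregator_output]
#   type = "TcpOutput"
#   address = "10.10.10.1:5565"
#   message_matcher = "TRUE"
#   encoder = "PayloadEncoder"
# On the receiving side, split on newlines instead of Heka framing:
#   [TcpInput]
#   address = ":5565"
#   splitter = "TokenSplitter"   # splits on "\n" by default
#   decoder = "CombinedLogDecoder"
```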
Hope this helps,
-r
On 04/13/2015 02:17 PM, Giordano, J C. wrote:
Heka community:
I’m new to Heka and am having some difficulties setting up delivery of
Apache log messages from a local running Heka agent to a remote Heka
instance via TCP Output/Input plugins. The issue is directly coupled
to using the Apache Access Log Decoder configured at the remote Heka
instance. When this is configured on my TCPInput there is no message
Payload available to the [lua] decoder. I have been able to create a
working configuration that does not use the Apache Access Log Decoder
but would like to request assistance on how to troubleshoot this issue
further.
What I have done thus far is to modify the function process_message() in
the file: lua_decoders/apache_access.lua I’ve added a field called
tcplog containing the Payload to verify there is no message available to
parse. The modification I’ve made is described following the
configurations below of my working configuration and non-working
configuration for comparison.
I must add that I am able to use the Apache Access Log Decoder with the
LogstreamerInput to process local files. So, this issue is specifically
related to the TCPInput/Apache Access Log Decoder combination.
My installation is Heka 0.9.1 on Ubuntu 14.04
# hekad -version
0.9.1
# lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 14.04.2 LTS
Release:        14.04
Codename:       trusty
I didn’t see any outstanding bugs related to my issues. Any advice
would be greatly appreciated.
Thanks,
Chris
Local running Heka agent used to ship Apache logs to remote Heka instance:
****************************************************************************************
[test_com]
type = "LogstreamerInput"
log_directory = "/export/test/apache2/test_com"
file_match = '/(?P<Year>\d+)/(?P<Month>\d+)_(?P<Day>\d+)_access\.log'
priority = ["Year", "Month", "Day"]
[aggregator_output]
type = "TcpOutput"
address = "10.10.10.1:5565"
message_matcher = "TRUE"
Remote Heka instance - Working config
****************************************************************************************
[TcpInput]
address = ":5565"
[Influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"
[Influxdb.config]
series = "%{logger}"
skip_fields = "Pid EnvVersion"
[FileOutput]
message_matcher = "TRUE"
path = "/home/giordano/heka/output.log"
perm = "775"
flush_count = 100
flush_operator = "OR"
encoder = "Influxdb"
Remote Heka instance - Non-working config
****************************************************************************************
[TcpInput]
address = ":5565"
decoder = "CombinedLogDecoder"
[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"
[CombinedLogDecoder.config]
type = "combinedutrack"
user_agent_transform = false
payload_keep = true
# combinedutrack log format
log_format = "%v %h %l %u %t \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\""
[Influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"
[Influxdb.config]
series = "%{logger}"
skip_fields = "Pid EnvVersion"
[FileOutput]
message_matcher = "TRUE"
path = "/home/giordano/heka/output.log"
perm = "775"
flush_count = 100
flush_operator = "OR"
encoder = "Influxdb"
Additions to the apache access log decoder process_message() function
****************************************************************************************
function process_message ()
local log = read_message("Payload")
local fields = grammar:match(log)
-- if not fields then return -1 end
if not fields then fields = {} end
fields.tcplog = log
msg.Timestamp = fields.time
fields.time = nil
…
Sample output from non working configuration
****************************************************************************************
[{"points":[[1427985263000,"combinedutrack","","","",7,""]],"name":"%{logger}","columns":["time","Type","Payload","Hostname","Logger","Severity","tcplog"]}]
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka