Hi,

The reason you're not seeing any payload data is that the messages are protocol 
buffer encoded when they hit the second Heka instance, so they need to be 
decoded from protobuf before you can get at the contained data. You didn't 
specify an encoder for your TcpOutput, and if you read the TcpOutput docs
carefully you'll note that it defaults to using the ProtobufEncoder with
Heka's stream framing.

On the receiving end, the TcpInput defaults to using HekaFramingSplitter with 
ProtobufDecoder. This means if you don't specify any other decoder, the 
messages will be decoded properly and you'll see messages with the payload you 
expect flowing through the pipeline. So far so good.
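Spelled out explicitly, those implicit defaults on both ends look roughly
like this (a sketch based on my reading of the TcpOutput/TcpInput docs, with
your names and addresses plugged in -- double-check the option names):

    # sending side: what you get when no encoder is specified
    [aggregator_output]
    type = "TcpOutput"
    address = "10.10.10.1:5565"
    message_matcher = "TRUE"
    encoder = "ProtobufEncoder"   # implicit default
    use_framing = true            # defaults to true with the ProtobufEncoder

    # receiving side: likewise the implicit defaults
    [TcpInput]
    address = ":5565"
    splitter = "HekaFramingSplitter"   # implicit default
    decoder = "ProtobufDecoder"        # implicit default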

As soon as you add the CombinedLogDecoder on the remote end, however, things
go pear-shaped, because you've now overridden the default ProtobufDecoder. But
the raw data is still coming through in protobuf format, and the 
CombinedLogDecoder has no idea what to do with it.

So what to do? You have a few options. The first is to use a MultiDecoder, with 
cascade_strategy 'all', and explicitly add a ProtobufDecoder in the chain before the 
CombinedLogDecoder. The next, which is likely more efficient (although I haven't
benchmarked it, so you might want to test to be sure), would be to do the protobuf decoding
in the sandbox. If you call `local msgBytes = read_message("raw")` in the 
sandbox Heka will return the raw protobuf bytes. Then you can call `msg = 
decode_message(msgBytes)` to get a Lua table containing the contents of the decoded 
message. `msg.Payload` will then contain the payload value you're looking for. You could 
insert that code before the rest of the parsing code and you should be able to do what 
you need to do.
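For concreteness, the MultiDecoder version might look something like this
(a sketch; the `combined_chain` name is mine, and you should verify the
option names against the MultiDecoder docs):

    [ProtobufDecoder]

    [combined_chain]
    type = "MultiDecoder"
    subs = ["ProtobufDecoder", "CombinedLogDecoder"]
    cascade_strategy = "all"
    log_sub_errors = true

    [TcpInput]
    address = ":5565"
    decoder = "combined_chain"

And the sandbox version would mean changing the top of your modified
process_message() roughly like so (untested; `read_message` and
`decode_message` are from the sandbox API, the rest is your existing code):

    function process_message()
        -- the data on the wire is protobuf, so decode it first
        local msg_in = decode_message(read_message("raw"))
        local log = msg_in.Payload
        local fields = grammar:match(log)
        ...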

For completeness' sake I feel compelled to mention that you could decide to 
skip the protobuf encoding altogether. If you used a PayloadEncoder on your 
TcpOutput, you'd end up sending the data over the wire in raw text format. Then 
on the remote side you'd use a TokenSplitter to split on newlines and the 
original CombinedLogDecoder would work, since the data you care about would be 
sitting in the message payload. This might save a few cycles since you don't 
have to protobuf decode each message, but it would also mean that your TcpInput 
would only be useful for exactly these message types. I'd probably go with one 
of the first two options, myself.
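For reference, a sketch of that last option (again, check the docs for the
exact option names and defaults):

    # sending side: ship raw text instead of protobuf
    [aggregator_output]
    type = "TcpOutput"
    address = "10.10.10.1:5565"
    message_matcher = "TRUE"
    encoder = "PayloadEncoder"   # append_newlines defaults to true

    # receiving side: split on newlines, decode as Apache access log
    [TcpInput]
    address = ":5565"
    splitter = "TokenSplitter"   # default delimiter is "\n"
    decoder = "CombinedLogDecoder"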

Hope this helps,

-r



On 04/13/2015 02:17 PM, Giordano, J C. wrote:
Heka community:

I’m new to Heka and am having some difficulties setting up delivery of
Apache log messages from a locally running Heka agent to a remote Heka
instance via the TcpOutput/TcpInput plugins. The issue is directly coupled
to using the Apache Access Log Decoder configured at the remote Heka
instance. When it is configured on my TcpInput there is no message
Payload available to the Lua decoder. I have been able to create a
working configuration that does not use the Apache Access Log Decoder,
but would like to request assistance on how to troubleshoot this issue
further.

What I have done thus far is to modify the process_message() function in
lua_decoders/apache_access.lua, adding a field called tcplog containing
the Payload to verify that there is no message available to parse. The
modification I’ve made is shown following the working and non-working
configurations below, for comparison.

I must add that I am able to use the Apache Access Log Decoder with the
LogstreamerInput to process local files. So, this issue is specifically
related to the TcpInput/Apache Access Log Decoder combination.

My installation is Heka 0.9.1 on Ubuntu 14.04

# hekad -version
0.9.1

# lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 14.04.2 LTS
Release:        14.04
Codename:       trusty

I didn’t see any outstanding bugs related to my issues.  Any advice
would be greatly appreciated.

Thanks,

Chris

Local running Heka agent used to ship Apache logs to remote Heka instance:

****************************************************************************************
[test_com]
type = "LogstreamerInput"
log_directory = "/export/test/apache2/test_com"
file_match = '/(?P<Year>\d+)/(?P<Month>\d+)_(?P<Day>\d+)_access\.log'
priority = ["Year", "Month", "Day"]

[aggregator_output]
type = "TcpOutput"
address = "10.10.10.1:5565"
message_matcher = "TRUE"

Remote Heka instance - Working config

****************************************************************************************
[TcpInput]
address = ":5565"

[Influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"

[Influxdb.config]
series = "%{logger}"
skip_fields = "Pid EnvVersion"

[FileOutput]
message_matcher = "TRUE"
path = "/home/giordano/heka/output.log"
perm = "775"
flush_count = 100
flush_operator = "OR"
encoder = "Influxdb"

Remote Heka instance - Non-working config

****************************************************************************************
[TcpInput]
address = ":5565"
decoder = "CombinedLogDecoder"

[CombinedLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/apache_access.lua"

[CombinedLogDecoder.config]
type = "combinedutrack"
user_agent_transform = false
payload_keep = true

# combinedutrack log format
log_format = "%v %h %l %u %t \"%r\" %s %b \"%{Referer}i\" \"%{User-Agent}i\" \"%{Cookie}i\""

[Influxdb]
type = "SandboxEncoder"
filename = "lua_encoders/schema_influx.lua"

[Influxdb.config]
series = "%{logger}"
skip_fields = "Pid EnvVersion"

[FileOutput]
message_matcher = "TRUE"
path = "/home/giordano/heka/output.log"
perm = "775"
flush_count = 100
flush_operator = "OR"
encoder = "Influxdb"

Additions to the apache access log decoder process_message() function

****************************************************************************************

function process_message ()
    local log = read_message("Payload")
    local fields = grammar:match(log)
    -- if not fields then return -1 end
    if not fields then fields = {} end
    fields.tcplog = log

    msg.Timestamp = fields.time
    fields.time = nil
    …

Sample output from non working configuration

****************************************************************************************

[{"points":[[1427985263000,"combinedutrack","","","",7,""]],"name":"%{logger}","columns":["time","Type","Payload","Hostname","Logger","Severity","tcplog"]}]






_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka

