I'm running into some issues while trying to set up Heka to ship JSON
log files to Elasticsearch.  The general problem is that not all of
the data in the log file is making it through Heka to Elasticsearch.
I'm running the latest release (0.9.1).

1.  For some reason only the first line is sent to ES.  I checked the
logstreamer cache (see below), and it shows that the whole file has
been read.  My guess is that the problem is in my custom decoder, but
I have wrapped the JSON decode and encode calls in pcall, so it should
catch any errors.

2.  What's the best way to enrich incoming log lines?  I want to
inject a new field into every log line that is processed.  Is there a
better way than what I'm currently doing (see the Lua script below)?

3.  The single log line that did make it to ES has the bulk index
metadata appended to the end of the JSON:

{"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}

I know the output is using the bulk API, but this doesn't seem correct (more below).
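For reference, my understanding of the bulk API is that each action
line should come *before* its document, with each on its own
newline-terminated line, roughly:

```
{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
{"bar":"foo","log":"awesome1"}
```

Instead, the metadata ends up concatenated after the document on a
single line, and that whole string gets stored as the `_source`.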

Sorry for the long post, but I have been debugging for the past couple
of days with no luck.
Below is the code where I'm seeing this behavior, along with various debugging output.

Log file:

// https://gist.github.com/aaronfeng/607e965513a38bff8c9d
{"log": "awesome1"}
{"log": "awesome2"}
{"log": "awesome3"}
{"log": "awesome4"}
{"log": "awesome5"}
{"log": "awesome6"}
{"log": "awesome7"}
{"log": "awesome8"}
{"log": "awesome9"}
{"log": "awesome10"}

hekad.toml

// https://gist.github.com/aaronfeng/29629f2e7f33aae83725
[foo_decoder]
type = "SandboxDecoder"
filename = "/etc/heka/foo_decoder.lua"

[Logs]
type = "LogstreamerInput"
log_directory = "/var/log"
file_match = 'foo/(?P<Logger>[0-9a-z]+)-json\.log'
priority = ["^Logger"]
differentiator = ["Logger"]
decoder = "foo_decoder"

[es_payload]
type = "SandboxEncoder"
filename = "lua_encoders/es_payload.lua"
    [es_payload.config]
    es_index_from_timestamp = true
    index = "123-%{Logger}-%{%Y.%m.%d}"
    type_name = "%{Hostname}"

[ElasticSearchOutput]
message_matcher = "TRUE"
encoder = "es_payload"
server = "http://localhost:9200"
queue_full_action = "drop"
flush_interval = 5000
flush_count = 10

Custom Lua decoder:

// https://gist.github.com/aaronfeng/5ae1db42c419f4c5ad4a
local cjson = require("cjson")

function process_message()
    local payload_str = read_message("Payload")

    -- Decode the raw JSON line; bail out on malformed input.
    local ok, payload = pcall(cjson.decode, payload_str)

    if not ok then
      return -1
    end

    -- Enrich: inject the logger name as a new field.
    payload["bar"] = read_message("Logger")

    local ok, payload_with_bar = pcall(cjson.encode, payload)

    if not ok then
      return -1
    end

    write_message("Payload", payload_with_bar)
    return 0
end
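Regarding question 2, one alternative I'm wondering about (assuming
write_message accepts "Fields[...]" paths, per the sandbox message
API) is skipping the decode/encode round trip entirely and attaching
the enrichment as a message field:

```lua
-- Hypothetical alternative for question 2: attach the enrichment as a
-- message field instead of rewriting the payload.  Assumes write_message
-- supports "Fields[...]" paths (per the sandbox message API docs).
function process_message()
    write_message("Fields[bar]", read_message("Logger"))
    return 0
end
```

Though with this approach the field lives on the Heka message rather
than inside the JSON payload itself, so the encoder would have to pick
it up from there.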

$ curl 'localhost:9200/123-foo-2015.03.31/_search?pretty'
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "222-foo-2015.03.31",
      "_type" : "7d98073d388e",
      "_id" : "AUxwiBVWybZUhE047CW2",
      "_score" : 1.0,
      
"_source":{"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
    } ]
  }
}

$ cat /var/cache/hekad/logstreamer/foo
{"seek":201,"file_name":"/var/log/foo/foo-json.log","last_hash":"0148f620988f958494ef181d1ce78897bd850e6d"}
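As a sanity check on the seek value: the sample file is 9 lines of 20
bytes plus 1 line of 21 bytes (counting trailing newlines), which is
exactly 201 bytes, matching the seek offset, so logstreamer really did
consume the whole file:

```shell
# Reproduce the byte count of the sample log file (assuming each line
# ends with a newline): 9 lines x 20 bytes + 1 line x 21 bytes = 201.
printf '{"log": "awesome%s"}\n' 1 2 3 4 5 6 7 8 9 10 | wc -c
```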
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
