Pretty sure my problems will be solved by this PR: https://github.com/mozilla-services/heka/pull/1457
Aaron On Tue, Mar 31, 2015 at 3:44 PM, Aaron Feng <[email protected]> wrote: > Just realized I had a couple typos in my index name. > > Replace "222-foo-2015.03.31" with "123-foo-2015.03.31". > > Aaron > > On Tue, Mar 31, 2015 at 12:23 PM, Aaron Feng <[email protected]> wrote: >> I'm running into some issues when I'm trying to set up Heka to send >> json log files to Elastic Search. The general issue is that not all >> the data in the log file is being sent to heka. I'm running the latest >> release (0.9.1). >> >> 1. For some reason only the first line is sent to ES. I checked the >> logstreamer cache (look below), it shows that it has read the whole >> file. My guess is that the problem is my custom decoder? I have >> wrapped the json decode and encode with pcall so it should catch all >> the errors. >> >> 2. What's the best way to enrich incoming log lines? I want to >> inject a new field into every log line that is being processed. Is >> there a better way than what I'm currently doing (see lua script >> below)? >> >> 3. The single and only log line that has been sent to ES has the index >> appended to the end of json. >> >> {"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}} >> >> I know it's using the bulk api, but this doesn't seem correct (more below). >> >> Sorry for the long post, but I have been debugging for the past couple >> days with no luck. >> Below is code that I'm seeing this behavior with and various debugging >> output. 
>> >> Log file: >> >> // https://gist.github.com/aaronfeng/607e965513a38bff8c9d >> {"log": "awesome1"} >> {"log": "awesome2"} >> {"log": "awesome3"} >> {"log": "awesome4"} >> {"log": "awesome5"} >> {"log": "awesome6"} >> {"log": "awesome7"} >> {"log": "awesome8"} >> {"log": "awesome9"} >> {"log": "awesome10"} >> >> hekad.toml >> >> // https://gist.github.com/aaronfeng/29629f2e7f33aae83725 >> [foo_decoder] >> type = "SandboxDecoder" >> filename = "/etc/heka/foo_decoder.lua" >> >> [Logs] >> type = "LogstreamerInput" >> log_directory = "/var/log" >> file_match = 'foo/(?P<Logger>[0-9a-z]+)-json\.log' >> priority = ["^Logger"] >> differentiator = ["Logger"] >> decoder = "foo_decoder" >> >> [es_payload] >> type = "SandboxEncoder" >> filename = "lua_encoders/es_payload.lua" >> [es_payload.config] >> es_index_from_timestamp = true >> index = "123-%{Logger}-%{%Y.%m.%d}" >> type_name = "%{Hostname}" >> >> [ElasticSearchOutput] >> message_matcher = "TRUE" >> encoder = "es_payload" >> server = "http://localhost:9200" >> queue_full_action = "drop" >> flush_interval = 5000 >> flush_count = 10 >> >> Custom lua decoder: >> >> // https://gist.github.com/aaronfeng/5ae1db42c419f4c5ad4a >> local cjson = require("cjson") >> >> function process_message() >> local payload_str = read_message("Payload") >> >> local ok, payload = pcall(cjson.decode, payload_str) >> >> if not ok then >> return -1 >> end >> >> payload["bar"] = read_message("Logger") >> >> local ok, payload_with_bar = pcall(cjson.encode, payload) >> >> if not ok then >> return -1 >> end >> >> write_message("Payload", payload_with_bar) >> return 0 >> end >> >> $ curl 'localhost:9200/123-foo-2015.03.31/_search?pretty' >> { >> "took" : 3, >> "timed_out" : false, >> "_shards" : { >> "total" : 5, >> "successful" : 5, >> "failed" : 0 >> }, >> "hits" : { >> "total" : 1, >> "max_score" : 1.0, >> "hits" : [ { >> "_index" : "222-foo-2015.03.31", >> "_type" : "7d98073d388e", >> "_id" : "AUxwiBVWybZUhE047CW2", >> "_score" : 1.0, >> >> 
"_source":{"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}} >> } ] >> } >> } >> >> cat /var/cache/hekad/logstreamer/foo >> {"seek":201,"file_name":"/var/log/foo/foo-json.log","last_hash":"0148f620988f958494ef181d1ce78897bd850e6d"}% _______________________________________________ Heka mailing list [email protected] https://mail.mozilla.org/listinfo/heka

