Just realized I had a couple of typos in my index name. Replace "222-foo-2015.03.31" with "123-foo-2015.03.31".
Aaron

On Tue, Mar 31, 2015 at 12:23 PM, Aaron Feng <[email protected]> wrote:
> I'm running into some issues when I'm trying to set up Heka to send
> JSON log files to Elasticsearch. The general issue is that not all
> the data in the log file is being sent to Heka. I'm running the latest
> release (0.9.1).
>
> 1. For some reason only the first line is sent to ES. I checked the
> logstreamer cache (see below); it shows that Heka has read the whole
> file. My guess is that the problem is my custom decoder? I have
> wrapped the JSON decode and encode in pcall, so it should catch all
> the errors.
>
> 2. What's the best way to enrich incoming log lines? I want to
> inject a new field into every log line that is being processed. Is
> there a better way than what I'm currently doing (see the Lua script
> below)?
>
> 3. The single log line that has been sent to ES has the bulk index
> action appended to the end of the JSON:
>
> {"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
>
> I know it's using the bulk API, but this doesn't seem correct (more below).
>
> Sorry for the long post, but I have been debugging for the past couple of
> days with no luck. Below is the code I'm seeing this behavior with,
> along with various debugging output.
>
> Log file:
>
> // https://gist.github.com/aaronfeng/607e965513a38bff8c9d
> {"log": "awesome1"}
> {"log": "awesome2"}
> {"log": "awesome3"}
> {"log": "awesome4"}
> {"log": "awesome5"}
> {"log": "awesome6"}
> {"log": "awesome7"}
> {"log": "awesome8"}
> {"log": "awesome9"}
> {"log": "awesome10"}
>
> hekad.toml:
>
> // https://gist.github.com/aaronfeng/29629f2e7f33aae83725
> [foo_decoder]
> type = "SandboxDecoder"
> filename = "/etc/heka/foo_decoder.lua"
>
> [Logs]
> type = "LogstreamerInput"
> log_directory = "/var/log"
> file_match = 'foo/(?P<Logger>[0-9a-z]+)-json\.log'
> priority = ["^Logger"]
> differentiator = ["Logger"]
> decoder = "foo_decoder"
>
> [es_payload]
> type = "SandboxEncoder"
> filename = "lua_encoders/es_payload.lua"
> [es_payload.config]
> es_index_from_timestamp = true
> index = "123-%{Logger}-%{%Y.%m.%d}"
> type_name = "%{Hostname}"
>
> [ElasticSearchOutput]
> message_matcher = "TRUE"
> encoder = "es_payload"
> server = "http://localhost:9200"
> queue_full_action = "drop"
> flush_interval = 5000
> flush_count = 10
>
> Custom Lua decoder:
>
> // https://gist.github.com/aaronfeng/5ae1db42c419f4c5ad4a
> local cjson = require("cjson")
>
> function process_message()
>     local payload_str = read_message("Payload")
>
>     local ok, payload = pcall(cjson.decode, payload_str)
>     if not ok then
>         return -1
>     end
>
>     payload["bar"] = read_message("Logger")
>
>     local ok, payload_with_bar = pcall(cjson.encode, payload)
>     if not ok then
>         return -1
>     end
>
>     write_message("Payload", payload_with_bar)
>     return 0
> end
>
> $ curl 'localhost:9200/123-foo-2015.03.31/_search?pretty'
> {
>   "took" : 3,
>   "timed_out" : false,
>   "_shards" : {
>     "total" : 5,
>     "successful" : 5,
>     "failed" : 0
>   },
>   "hits" : {
>     "total" : 1,
>     "max_score" : 1.0,
>     "hits" : [ {
>       "_index" : "222-foo-2015.03.31",
>       "_type" : "7d98073d388e",
>       "_id" : "AUxwiBVWybZUhE047CW2",
>       "_score" : 1.0,
>       "_source":{"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
>     } ]
>   }
> }
>
> $ cat /var/cache/hekad/logstreamer/foo
> {"seek":201,"file_name":"/var/log/foo/foo-json.log","last_hash":"0148f620988f958494ef181d1ce78897bd850e6d"}

_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka
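[Editor's note on question 3: the Elasticsearch bulk API expects the action/metadata line to come *before* the source document, with each line terminated by a newline. In the output quoted above, the order is reversed (document first, then the `{"index":...}` header), which suggests the encoder output is being assembled in the wrong order or without the terminating newlines. A minimal sketch of correct bulk framing, in plain Python rather than Heka's sandbox Lua, using the index and type values from the config above purely for illustration:]

```python
import json

def bulk_entry(index, type_name, doc):
    """Frame one document for the ES bulk API: the action/metadata
    line comes first, then the source document, and each line is
    terminated by a newline."""
    action = {"index": {"_index": index, "_type": type_name}}
    return json.dumps(action) + "\n" + json.dumps(doc) + "\n"

# Two log lines framed the way the bulk endpoint expects them.
body = "".join(
    bulk_entry("123-foo-2015.03.31", "7d98073d388e", {"bar": "foo", "log": log})
    for log in ["awesome1", "awesome2"]
)
print(body)
```

Compare this with the `_source` shown in the search result: there the document and the action line were fused into a single JSON string, so Elasticsearch indexed the concatenation as the document body instead of treating the action line as bulk metadata.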

