Just realized I had a couple of typos in my index name.

Replace "222-foo-2015.03.31" with "123-foo-2015.03.31".

Aaron

On Tue, Mar 31, 2015 at 12:23 PM, Aaron Feng <[email protected]> wrote:
> I'm running into some issues trying to set up Heka to send JSON log
> files to Elasticsearch.  The general issue is that not all of the
> data in the log file is making it to Elasticsearch.  I'm running the
> latest release (0.9.1).
>
> 1.  For some reason only the first line is sent to ES.  I checked the
> logstreamer cache (see below), and it shows that the whole file has
> been read.  My guess is that the problem is in my custom decoder?  I
> have wrapped the JSON decode and encode calls in pcall, so it should
> catch any errors.
>
> 2.  What's the best way to enrich incoming log lines?  I want to
> inject a new field into every log line being processed.  Is there a
> better way than what I'm currently doing (see the Lua script below)?
>
> 3.  The single log line that did make it to ES has the bulk index
> action appended to the end of its JSON:
>
> {"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
>
> I know it's using the bulk api, but this doesn't seem correct (more below).
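>
> My understanding of the bulk API (and I may be wrong here) is that
> each action/metadata line should come *before* its source document,
> with every line newline-terminated, something like:
>
> {"index":{"_index":"123-foo-2015.03.31","_type":"7d98073d388e"}}
> {"bar":"foo","log":"awesome1"}
>
> so the index action trailing the document, with no newline between
> them, looks backwards to me.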
>
> Sorry for the long post, but I have been debugging this for the past
> couple of days with no luck.
> Below are the config and code where I'm seeing this behavior, along
> with various debugging output.
>
> Log file:
>
> // https://gist.github.com/aaronfeng/607e965513a38bff8c9d
> {"log": "awesome1"}
> {"log": "awesome2"}
> {"log": "awesome3"}
> {"log": "awesome4"}
> {"log": "awesome5"}
> {"log": "awesome6"}
> {"log": "awesome7"}
> {"log": "awesome8"}
> {"log": "awesome9"}
> {"log": "awesome10"}
>
> hekad.toml
>
> // https://gist.github.com/aaronfeng/29629f2e7f33aae83725
> [foo_decoder]
> type = "SandboxDecoder"
> filename = "/etc/heka/foo_decoder.lua"
>
> [Logs]
> type = "LogstreamerInput"
> log_directory = "/var/log"
> file_match = 'foo/(?P<Logger>[0-9a-z]+)-json\.log'
> priority = ["^Logger"]
> differentiator = ["Logger"]
> decoder = "foo_decoder"
>
> [es_payload]
> type = "SandboxEncoder"
> filename = "lua_encoders/es_payload.lua"
>     [es_payload.config]
>     es_index_from_timestamp = true
>     index = "123-%{Logger}-%{%Y.%m.%d}"
>     type_name = "%{Hostname}"
>
> [ElasticSearchOutput]
> message_matcher = "TRUE"
> encoder = "es_payload"
> server = "http://localhost:9200"
> queue_full_action = "drop"
> flush_interval = 5000
> flush_count = 10
>
> Custom lua decoder:
>
> // https://gist.github.com/aaronfeng/5ae1db42c419f4c5ad4a
> local cjson = require("cjson")
>
> function process_message()
>     local payload_str = read_message("Payload")
>
>     -- decode the raw JSON line; bail out on malformed input
>     local ok, payload = pcall(cjson.decode, payload_str)
>
>     if not ok then
>       return -1
>     end
>
>     -- enrich: tag each record with its logger name
>     payload["bar"] = read_message("Logger")
>
>     -- re-encode and write the enriched JSON back as the payload
>     local ok, payload_with_bar = pcall(cjson.encode, payload)
>
>     if not ok then
>       return -1
>     end
>
>     write_message("Payload", payload_with_bar)
>     return 0
> end
>
> $ curl 'localhost:9200/123-foo-2015.03.31/_search?pretty'
> {
>   "took" : 3,
>   "timed_out" : false,
>   "_shards" : {
>     "total" : 5,
>     "successful" : 5,
>     "failed" : 0
>   },
>   "hits" : {
>     "total" : 1,
>     "max_score" : 1.0,
>     "hits" : [ {
>       "_index" : "222-foo-2015.03.31",
>       "_type" : "7d98073d388e",
>       "_id" : "AUxwiBVWybZUhE047CW2",
>       "_score" : 1.0,
>       
> "_source":{"bar":"foo","log":"awesome1"}{"index":{"_index":"222-foo-2015.03.31","_type":"7d98073d388e"}}
>     } ]
>   }
> }
>
> $ cat /var/cache/hekad/logstreamer/foo
> {"seek":201,"file_name":"/var/log/foo/foo-json.log","last_hash":"0148f620988f958494ef181d1ce78897bd850e6d"}%
_______________________________________________
Heka mailing list
[email protected]
https://mail.mozilla.org/listinfo/heka