An option would be to increase these Elasticsearch output settings: flush_interval and flush_count. With larger bulks, ES could handle the cluster synchronisation load better.
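For reference, a sketch of what raising those two settings might look like in the output's TOML section. The matcher and values are illustrative, not recommendations; per the v0.10 docs the defaults are flush_interval = 1000 and flush_count = 10.

```toml
[ElasticSearchOutput]
message_matcher = "Type == 'logfile'"   # illustrative matcher
server = "http://localhost:9200"
# Larger, less frequent bulks: fewer concurrent put-mapping events
# when documents hit a fresh daily index.
flush_interval = 5000   # ms between bulk flushes (default: 1000)
flush_count = 10000     # max messages per bulk request (default: 10)
```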
http://hekad.readthedocs.io/en/v0.10.0/config/outputs/elasticsearch.html

2016-04-29 12:52 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:

> This behavior is being seen in testing. I clean up all my existing
> indices, so we start with no indices. There are 4 data nodes, 6 shards
> per index, and a replication factor of 2. I just push 30,000 documents
> and see this behavior consistently. Not sure what to tweak...
>
> On Fri, Apr 29, 2016 at 4:56 AM, Swann Croiset <swan...@gmail.com> wrote:
>
>> Hi,
>>
>> OK, my only recommendation is: you should fix your Elasticsearch cluster
>> so it can handle the load, because it seems the shard synchronisation is
>> too slow [0]. How many ES nodes, indices, and shards do you have?
>>
>> IMHO, implementing a per-document retry strategy in the Heka ES plugin
>> would be quite expensive and surely inefficient.
>>
>> [0] https://discuss.elastic.co/t/elasticsearch-2-2-0-i-am-occasionally-getting-process-cluster-event-timeout-exception-failed-to-process-cluster-event-put-mapping-as-within-30s-while-bulk-indexing-documents/42305/3
>>
>> 2016-04-28 14:54 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:
>>
>>> Hi,
>>>
>>> The ES version is 2.2.0.
>>>
>>> This is the HTTP response. Look for status 503 in the payload:
>>>
>>> T 127.0.0.1:9200 -> 127.0.0.1:34497 [AP]
>>> HTTP/1.1 200 OK.
>>> Content-Type: application/json; charset=UTF-8.
>>> Content-Length: 3770.
>>>
>>> {
>>>   "took": 39911,
>>>   "errors": true,
>>>   "items": [
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qa", "status": 503,
>>>         "error": { "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s" } } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qb", "status": 503,
>>>         "error": { "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s" } } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qc", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qd", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qe", "status": 503,
>>>         "error": { "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s" } } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qf", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qg", "status": 503,
>>>         "error": { "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s" } } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qh", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qi", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qj", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qk", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4ql", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qm", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qn", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.03.28", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qo", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qp", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qq", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qr", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qs", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qt", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } },
>>>     { "create": { "_index": "vdps-log-wf-2016.04.26", "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qu", "_version": 1,
>>>         "_shards": { "total": 3, "successful": 3, "failed": 0 }, "status": 201 } }
>>>   ]
>>> }
>>>
>>> On Wed, Apr 27, 2016 at 3:41 AM, Swann Croiset <swan...@gmail.com>
wrote:

>>>> Hi,
>>>>
>>>> IIRC, according to the code you're right: Heka doesn't handle this case.
>>>>
>>>> That said, I've never seen such an ES response... I'm curious about it.
>>>>
>>>> Could you share this information: the ES response, the ES version, and
>>>> the ES logs (from when it happens)? Also, what is your configuration on
>>>> the ES side: index template, field mapping?
>>>>
>>>> --
>>>> Swann
>>>>
>>>> 2016-04-26 22:28 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have an ES output that bulk-uploads to a cluster. The HTTP status
>>>>> code is 200 OK, but the reply payload says the upload failed for a
>>>>> few of the items. Heka doesn't seem to care about the failed items.
>>>>> I looked at the code and it says:
>>>>>
>>>>> https://github.com/mozilla-services/heka/blob/dev/plugins/elasticsearch/elasticsearch.go#L429
>>>>>
>>>>>     if response != nil {
>>>>>         defer response.Body.Close()
>>>>>         if response_body, err = ioutil.ReadAll(response.Body); err != nil {
>>>>>             return fmt.Errorf("Can't read HTTP response body. Status: %s. Error: %s",
>>>>>                 response.Status, err.Error()), true
>>>>>         }
>>>>>         err = json.Unmarshal(response_body, &response_body_json)
>>>>>         if err != nil {
>>>>>             return fmt.Errorf("HTTP response didn't contain valid JSON. Status: %s. Body: %s",
>>>>>                 response.Status, string(response_body)), true
>>>>>         }
>>>>>         json_errors, ok := response_body_json["errors"].(bool)
>>>>>         if ok && json_errors && response.StatusCode != 200 {
>>>>>             return fmt.Errorf(
>>>>>                 "ElasticSearch server reported error within JSON. Status: %s. Body: %s",
>>>>>                 response.Status, string(response_body)), false
>>>>>         }
>>>>>         if response.StatusCode > 304 {
>>>>>             return fmt.Errorf("HTTP response error. Status: %s. Body: %s",
>>>>>                 response.Status, string(response_body)), false
>>>>>         }
>>>>>     }
>>>>>
>>>>> 1. In my case I see 200 OK with "errors": true, which, according to
>>>>>    the code, is not caught.
>>>>> 2. I don't see any logic for recovery based on individual items. Am I
>>>>>    missing anything here?
>>>>>
>>>>> --
>>>>> Ramin
>>>>>
>>>>> _______________________________________________
>>>>> Heka mailing list
>>>>> Heka@mozilla.org
>>>>> https://mail.mozilla.org/listinfo/heka
>>>>
>>>
>>> --
>>> Ramin
>>
>
> --
> Ramin
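[Editor's note] Question 1 above boils down to parsing the item-level results instead of relying on `errors` plus the status code. A minimal standalone sketch of that check (the type and function names are illustrative, not Heka's actual code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItemResult holds the per-item fields we care about.
type bulkItemResult struct {
	ID     string `json:"_id"`
	Status int    `json:"status"`
}

// failedItems parses an ES bulk response body and returns the _ids of
// items that were not indexed, even when the HTTP status was 200 OK.
func failedItems(body []byte) ([]string, error) {
	var resp struct {
		Errors bool                        `json:"errors"`
		Items  []map[string]bulkItemResult `json:"items"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	if !resp.Errors {
		return nil, nil
	}
	var failed []string
	for _, item := range resp.Items {
		// Each item is keyed by its action name ("create", "index", ...).
		for _, result := range item {
			if result.Status >= 300 {
				failed = append(failed, result.ID)
			}
		}
	}
	return failed, nil
}

func main() {
	// Abbreviated version of the bulk response shown earlier in the thread.
	body := []byte(`{"errors":true,"items":[
		{"create":{"_id":"AVRTguzBSCHabxnyv4qa","status":503}},
		{"create":{"_id":"AVRTguzBSCHabxnyv4qc","status":201}}]}`)
	failed, err := failedItems(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(failed) // [AVRTguzBSCHabxnyv4qa]
}
```

Note the contrast with the excerpt above: the plugin only reports an error when `errors` is true *and* the status code is not 200, so a 200 OK with partial failures sails through.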
_______________________________________________
Heka mailing list
Heka@mozilla.org
https://mail.mozilla.org/listinfo/heka
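[Editor's note] On question 2 (per-item recovery): bulk response items come back in the same order as the request actions, so the positions of retryable failures identify which request lines to resend. A sketch of that selection step under those assumptions; this is illustrative code, not part of Heka:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// retryPositions returns the indexes of bulk actions that failed with
// a retryable status (503, as in the put-mapping timeouts above). A
// caller could use these positions to rebuild a smaller bulk body from
// the original action/document lines and resend only those.
func retryPositions(body []byte) ([]int, error) {
	var resp struct {
		Items []map[string]struct {
			Status int `json:"status"`
		} `json:"items"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	var positions []int
	for i, item := range resp.Items {
		for _, result := range item {
			if result.Status == 503 {
				positions = append(positions, i)
			}
		}
	}
	return positions, nil
}

func main() {
	body := []byte(`{"errors":true,"items":[
		{"create":{"status":503}},
		{"create":{"status":201}},
		{"create":{"status":503}}]}`)
	positions, err := retryPositions(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(positions) // [0 2]
}
```

The cost Swann mentions is real: the output would have to buffer every original bulk line until the response arrives, which is the main expense of any per-document retry.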