One option would be to increase these ElasticSearch output settings:
flush_interval and flush_count.
ES could then better handle the cluster synchronisation load with larger bulk requests.

http://hekad.readthedocs.io/en/v0.10.0/config/outputs/elasticsearch.html
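The Go sketch below is a rough illustration of what these two settings control. It is a count-or-interval batcher and not Heka's implementation. The type bulkBuffer and the functions newBulkBuffer and bulkRequests are hypothetical. The sketch uses time.Duration and does not model Heka's own units or defaults. It only shows that a higher flush_count turns the same 30,000 documents into fewer, larger bulk requests. Whether that actually relieves the cluster is the suggestion above, and the sketch does not prove it.

```go
package main

import (
	"fmt"
	"time"
)

// bulkBuffer is a hypothetical batcher, not Heka's code. It releases a batch
// when flushCount documents are buffered or when flushInterval has elapsed
// since the last flush, whichever happens first.
type bulkBuffer struct {
	flushCount    int
	flushInterval time.Duration
	docs          []string
	lastFlush     time.Time
}

func newBulkBuffer(count int, interval time.Duration, now time.Time) *bulkBuffer {
	return &bulkBuffer{flushCount: count, flushInterval: interval, lastFlush: now}
}

// Add buffers one document and returns a batch if the count threshold is hit.
func (b *bulkBuffer) Add(doc string, now time.Time) []string {
	b.docs = append(b.docs, doc)
	if len(b.docs) >= b.flushCount {
		return b.flush(now)
	}
	return nil
}

// Tick is meant to be called periodically; it returns a batch if the
// interval has elapsed and something is buffered.
func (b *bulkBuffer) Tick(now time.Time) []string {
	if len(b.docs) > 0 && now.Sub(b.lastFlush) >= b.flushInterval {
		return b.flush(now)
	}
	return nil
}

func (b *bulkBuffer) flush(now time.Time) []string {
	batch := b.docs
	b.docs = nil
	b.lastFlush = now
	return batch
}

// bulkRequests pushes total documents at once, then ticks once after the
// interval, and counts how many bulk requests would have been sent.
func bulkRequests(total, flushCount int, interval time.Duration) int {
	start := time.Unix(0, 0)
	b := newBulkBuffer(flushCount, interval, start)
	sent := 0
	for i := 0; i < total; i++ {
		if b.Add(fmt.Sprintf("doc-%d", i), start) != nil {
			sent++
		}
	}
	if b.Tick(start.Add(interval)) != nil {
		sent++
	}
	return sent
}

func main() {
	for _, count := range []int{1000, 7000, 10000} {
		fmt.Printf("flush_count=%d: %d bulk requests for 30,000 documents\n",
			count, bulkRequests(30000, count, time.Second))
	}
}
```

Under these assumptions, 30,000 documents become 30 requests at flush_count=1000 and 3 requests at flush_count=10000. A leftover partial batch, such as the last 2,000 documents at flush_count=7000, is only sent once the interval elapses.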


2016-04-29 12:52 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:

> This behavior shows up in testing. I clean up all my existing indices,
> so we start with no indices at all. There are 4 data nodes, 6 shards per
> index and a replication factor of 2. I push just 30,000 documents and
> consistently see this behavior. Not sure what to tweak...
>
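For reference, here is a quick count of the shard copies this setup creates. Reading "replication factor of 2" as two replicas per primary is an assumption. It matches "total": 3 in the successful items of the bulk reply quoted further down. The two index names also come from that reply, so applying them to this run is an assumption too.

```go
package main

import "fmt"

// shardCopies returns how many shard copies an index occupies:
// every primary shard plus its replicas.
func shardCopies(primaries, replicas int) int {
	return primaries * (1 + replicas)
}

func main() {
	const (
		primaries = 6 // shards per index, as stated in the thread
		replicas  = 2 // assumed from "replication factor of 2" and "total": 3
		dataNodes = 4
		indices   = 2 // vdps-log-wf-2016.04.26 and vdps-log-wf-2016.03.28
	)
	perIndex := shardCopies(primaries, replicas)
	total := perIndex * indices
	fmt.Printf("%d copies per index, %d in total, %.1f per data node\n",
		perIndex, total, float64(total)/dataNodes)
}
```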
> On Fri, Apr 29, 2016 at 4:56 AM, Swann Croiset <swan...@gmail.com> wrote:
>
>> Hi,
>>
>> OK, my only recommendation is: you should fix your Elasticsearch cluster
>> so it can handle the load, because it seems shard synchronisation is too
>> slow [0].
>> How many ES nodes, indices and shards do you have?
>>
>> IMHO, implementing a per-document retry strategy in the Heka ES plugin
>> would be quite expensive and surely inefficient.
>>
>> [0]
>> https://discuss.elastic.co/t/elasticsearch-2-2-0-i-am-occasionally-getting-process-cluster-event-timeout-exception-failed-to-process-cluster-event-put-mapping-as-within-30s-while-bulk-indexing-documents/42305/3
>>
>>
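This minimal Go sketch makes the cost argument concrete by showing what per-document retry would require. The plugin would have to keep every action/source pair of a batch until the reply arrives. It would then match the reply items back to those pairs by position and build a new _bulk body from the ones that failed. The bulkOp type and the retryBody function are hypothetical. Retrying only on status 503 is an assumption. The positional match relies on the bulk reply listing its items in request order.

```go
package main

import (
	"bytes"
	"fmt"
)

// bulkOp is a hypothetical action/source pair, exactly as it was written
// into the original newline-delimited _bulk request body.
type bulkOp struct {
	Action string // e.g. {"create":{"_index":"vdps-log-wf-2016.04.26","_type":"WAF"}}
	Source string // the document JSON
}

// retryBody rebuilds a _bulk body holding only the operations whose
// per-item status was 503. It assumes statuses[i] belongs to ops[i].
func retryBody(ops []bulkOp, statuses []int) ([]byte, int) {
	var buf bytes.Buffer
	retried := 0
	for i, op := range ops {
		if i >= len(statuses) || statuses[i] != 503 {
			continue
		}
		buf.WriteString(op.Action)
		buf.WriteByte('\n')
		buf.WriteString(op.Source)
		buf.WriteByte('\n') // every line, including the last, ends with a newline
		retried++
	}
	return buf.Bytes(), retried
}

func main() {
	action := `{"create":{"_index":"vdps-log-wf-2016.04.26","_type":"WAF"}}`
	ops := []bulkOp{
		{action, `{"msg":"first"}`},
		{action, `{"msg":"second"}`},
		{action, `{"msg":"third"}`},
	}
	body, n := retryBody(ops, []int{503, 201, 503})
	fmt.Printf("retrying %d of %d documents:\n%s", n, len(ops), body)
}
```

The memory needed to hold every sent document until its reply is checked grows with the bulk size. That cost is part of what the objection above refers to.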
>> 2016-04-28 14:54 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:
>>
>>> Hi,
>>>
>>> The ES version is "2.2.0".
>>>
>>> This is the HTTP response. Look for the status 503 in the payload:
>>>
>>> T 127.0.0.1:9200 -> 127.0.0.1:34497 [AP]
>>> HTTP/1.1 200 OK
>>> Content-Type: application/json; charset=UTF-8
>>> Content-Length: 3770
>>>
>>> {
>>>   "took": 39911,
>>>   "errors": true,
>>>   "items": [
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qa",
>>>         "status": 503,
>>>         "error": {
>>>           "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s"
>>>         }
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qb",
>>>         "status": 503,
>>>         "error": {
>>>           "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s"
>>>         }
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qc",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qd",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qe",
>>>         "status": 503,
>>>         "error": {
>>>           "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s"
>>>         }
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qf",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qg",
>>>         "status": 503,
>>>         "error": {
>>>           "type": "process_cluster_event_timeout_exception",
>>>           "reason": "failed to process cluster event (put-mapping [WAF]) within 30s"
>>>         }
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qh",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qi",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qj",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qk",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4ql",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qm",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qn",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.03.28",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qo",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qp",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qq",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qr",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qs",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qt",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     },
>>>     {
>>>       "create": {
>>>         "_index": "vdps-log-wf-2016.04.26",
>>>         "_type": "WAF",
>>>         "_id": "AVRTguzBSCHabxnyv4qu",
>>>         "_version": 1,
>>>         "_shards": {
>>>           "total": 3,
>>>           "successful": 3,
>>>           "failed": 0
>>>         },
>>>         "status": 201
>>>       }
>>>     }
>>>   ]
>>> }
>>>
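This minimal Go sketch shows how a client could pick the failed items out of a reply like the one above, independently of the HTTP status line. The struct and function names are hypothetical. Only the fields visible in this ES 2.2.0 reply are decoded. encoding/json silently ignores unknown fields such as _shards.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkItem holds the per-item fields visible in the reply above.
type bulkItem struct {
	Index  string `json:"_index"`
	Type   string `json:"_type"`
	ID     string `json:"_id"`
	Status int    `json:"status"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
}

// bulkReply is the top level of a _bulk response. Each entry in Items is
// keyed by its action name ("create", "index", "update" or "delete").
type bulkReply struct {
	Took   int                   `json:"took"`
	Errors bool                  `json:"errors"`
	Items  []map[string]bulkItem `json:"items"`
}

// failedItems decodes a bulk reply and returns the items whose own status
// is outside 2xx, whatever the HTTP status of the whole response was.
func failedItems(body []byte) ([]bulkItem, error) {
	var reply bulkReply
	if err := json.Unmarshal(body, &reply); err != nil {
		return nil, err
	}
	var failed []bulkItem
	for _, entry := range reply.Items {
		for _, item := range entry {
			if item.Status < 200 || item.Status > 299 {
				failed = append(failed, item)
			}
		}
	}
	return failed, nil
}

func main() {
	// Two items taken from the reply above: one 503 and one 201.
	body := []byte(`{"took":39911,"errors":true,"items":[
	 {"create":{"_index":"vdps-log-wf-2016.04.26","_type":"WAF","_id":"AVRTguzBSCHabxnyv4qa","status":503,
	  "error":{"type":"process_cluster_event_timeout_exception","reason":"failed to process cluster event (put-mapping [WAF]) within 30s"}}},
	 {"create":{"_index":"vdps-log-wf-2016.04.26","_type":"WAF","_id":"AVRTguzBSCHabxnyv4qc","_version":1,"status":201}}]}`)
	failed, err := failedItems(body)
	if err != nil {
		panic(err)
	}
	for _, item := range failed {
		reason := ""
		if item.Error != nil {
			reason = item.Error.Reason
		}
		fmt.Printf("%s %s -> %d %s\n", item.Index, item.ID, item.Status, reason)
	}
}
```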
>>>
>>> On Wed, Apr 27, 2016 at 3:41 AM, Swann Croiset <swan...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> IIRC, you're right: according to the code, Heka doesn't handle that case.
>>>>
>>>> That said, I've never seen such an ES response... I'm interested in it.
>>>>
>>>> Could you share the following: the ES response, the ES version, and the
>>>> ES logs from when it happens? Also, what is your configuration on the ES
>>>> side (index template, field mapping)?
>>>>
>>>> --
>>>> Swann
>>>>
>>>>
>>>>
>>>> 2016-04-26 22:28 GMT+02:00 Ramin Ali Dousti <dou...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have an ES output that bulk uploads to a cluster. The HTTP status
>>>>> code is 200 OK, but the reply payload says the upload failed for a few
>>>>> of the items, and Heka doesn't seem to care about the failed items. I
>>>>> looked at the code and it says:
>>>>>
>>>>>
>>>>> https://github.com/mozilla-services/heka/blob/dev/plugins/elasticsearch/elasticsearch.go#L429
>>>>>
>>>>> if response != nil {
>>>>>         defer response.Body.Close()
>>>>>         if response_body, err = ioutil.ReadAll(response.Body); err != nil {
>>>>>                 return fmt.Errorf("Can't read HTTP response body. Status: %s. Error: %s",
>>>>>                         response.Status, err.Error()), true
>>>>>         }
>>>>>         err = json.Unmarshal(response_body, &response_body_json)
>>>>>         if err != nil {
>>>>>                 return fmt.Errorf("HTTP response didn't contain valid JSON. Status: %s. Body: %s",
>>>>>                         response.Status, string(response_body)), true
>>>>>         }
>>>>>         json_errors, ok := response_body_json["errors"].(bool)
>>>>>         if ok && json_errors && response.StatusCode != 200 {
>>>>>                 return fmt.Errorf(
>>>>>                         "ElasticSearch server reported error within JSON. Status: %s. Body: %s",
>>>>>                         response.Status, string(response_body)), false
>>>>>         }
>>>>>         if response.StatusCode > 304 {
>>>>>                 return fmt.Errorf("HTTP response error. Status: %s. Body: %s", response.Status,
>>>>>                         string(response_body)), false
>>>>>         }
>>>>> }
>>>>>
>>>>>
>>>>> 1- In my case I see a 200 OK with "errors": true, which, according to
>>>>> the code, does not seem to be caught.
>>>>> 2- I don't see any logic for recovery based on individual items. Am I
>>>>> missing anything here?
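For point 1, here is a minimal Go sketch of the check with the StatusCode != 200 condition removed. It assumes that any "errors": true reply should be reported. It is a standalone illustration, not an actual Heka patch. checkBulkResponse is a hypothetical name. The boolean second return value of the quoted function is left out because its meaning is not shown in this excerpt.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkBulkResponse is a hypothetical variant of the quoted check. Unlike
// the original, it reports "errors": true even when the HTTP status is 200.
func checkBulkResponse(statusCode int, body []byte) error {
	var parsed map[string]interface{}
	if err := json.Unmarshal(body, &parsed); err != nil {
		return fmt.Errorf("HTTP response didn't contain valid JSON. Status: %d. Body: %s",
			statusCode, body)
	}
	// The original condition also required StatusCode != 200, which is
	// exactly the case reported in this thread.
	if jsonErrors, ok := parsed["errors"].(bool); ok && jsonErrors {
		return fmt.Errorf("ElasticSearch server reported error within JSON. Status: %d. Body: %s",
			statusCode, body)
	}
	if statusCode > 304 {
		return fmt.Errorf("HTTP response error. Status: %d. Body: %s", statusCode, body)
	}
	return nil
}

func main() {
	reply := []byte(`{"took":39911,"errors":true,"items":[]}`)
	if err := checkBulkResponse(200, reply); err != nil {
		fmt.Println("caught:", err)
	}
}
```

This still flags the batch as a whole. Deciding what to do with individual items (point 2) requires parsing the reply item by item.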
>>>>>
>>>>>
>>>>> --
>>>>> Ramin
>>>>>
>>>>> _______________________________________________
>>>>> Heka mailing list
>>>>> Heka@mozilla.org
>>>>> https://mail.mozilla.org/listinfo/heka
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ramin
>>>
>>
>>
>
>
> --
> Ramin
>