Hi Averell,

This seems to be the bug that you encountered:
https://issues.apache.org/jira/browse/FLINK-11046.
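
If you need a stop-gap until that issue is resolved, one option is to stop re-adding requests from the ActionRequestFailureHandler for transient failures and instead let the sink's internal BulkProcessor retry them with a backoff. Below is only a rough sketch, assuming the elasticsearch6 connector; the record type, sink function, host and stream are placeholders rather than anything from your job. Also note that this backoff only applies to actions that failed with a temporary EsRejectedExecutionException, so it does not cover every case your handler currently deals with:

<code>
import java.util

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.FlushBackoffType
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost

// Placeholder host; use your actual Elasticsearch endpoint(s).
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))

// "MyRecord" and "mySinkFunction" stand in for your element type and
// ElasticsearchSinkFunction.
val esSinkBuilder = new ElasticsearchSink.Builder[MyRecord](httpHosts, mySinkFunction)

// Let the internal BulkProcessor retry temporarily rejected actions itself,
// instead of re-adding them through the RequestIndexer in onFailure().
esSinkBuilder.setBulkFlushBackoff(true)
esSinkBuilder.setBulkFlushBackoffType(FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(5)
esSinkBuilder.setBulkFlushBackoffDelay(1000L) // initial delay in milliseconds

// "stream" is a placeholder for the DataStream being written to Elasticsearch.
stream.addSink(esSinkBuilder.build())
</code>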

Cheers,
Gordon

On Sat, Feb 9, 2019 at 3:27 PM Averell <lvhu...@gmail.com> wrote:

> Hello,
>
> I am trying to follow the Flink guide [1] on handling errors in
> ElasticsearchSink by re-adding the failed requests via the RequestIndexer.
> The two error scenarios I hit and want to retry are: (i) a version
> conflict on an UpdateRequest document and (ii) a lost connection to
> Elasticsearch. Both are expected to be transient: (i) can be resolved by
> bumping the version, and (ii) should disappear after a few seconds.
> What I expected was that the failed messages would be retried
> successfully.
> What I actually got was: Flink seemed to get stuck on that (first) retry,
> the flow queued up (backpressure is 1 everywhere), and all processing
> hung.
>
> Here is my error handling code:
>
> <code>
> private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
>   override def onFailure(actionRequest: ActionRequest, failure: Throwable,
>                          restStatusCode: Int, indexer: RequestIndexer): Unit = {
>     if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception") != Optional.empty()) {
>       actionRequest match {
>         case s: UpdateRequest =>
>           LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
>           LOG.warn(actionRequest.toString)
>           indexer.add(s.version(s.version() + 1))
>         case _ =>
>           LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
>           LOG.error(actionRequest.toString)
>           throw failure
>       }
>     } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
>       LOG.warn(s"Retrying record: ${actionRequest.toString}")
>       actionRequest match {
>         case s: UpdateRequest => indexer.add(s)
>         case s: IndexRequest  => indexer.add(s)
>       }
>     } else {
>       LOG.error(s"ELASTICSEARCH FAILED:\n statusCode $restStatusCode\n message: ${failure.getMessage}\n${failure.getStackTrace}")
>       LOG.error(s"    DATA:\n ${actionRequest.toString}")
>       throw failure
>     }
>   }
> }
> </code>
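>
> For completeness, this is roughly how the handler above is registered on
> the sink. The builder itself is not included in this mail, so every name
> below other than MyElasticSearchFailureHandler is a placeholder:
>
> <code>
> // Sketch of the wiring only: "httpHosts", "mySinkFunction", "MyRecord" and
> // "outputStream" are placeholders, not the actual names from my job.
> val esSinkBuilder = new ElasticsearchSink.Builder[MyRecord](httpHosts, mySinkFunction)
> esSinkBuilder.setFailureHandler(MyElasticSearchFailureHandler)
> outputStream.addSink(esSinkBuilder.build())
> </code>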
>
> Here is an extract from my task-manager logs:
>
> 2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: Connection closed
> 2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN  c.n.c......sink.MyElasticSearchSink$  - Retrying record: update {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index {*[null][null][null]*, source[{...}]}], scripted_upsert[false], detect_noop[true]}
> 2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO  o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=24 (max part counter=26).
>
> And job-manager logs:
> 2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in 307078 ms).
> 2019-02-09 04:09:30.970 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:17:00.970 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:24:31.035 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
> 2019-02-09 04:32:01.035 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
> 2019-02-09 04:39:30.961 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
>
> Thanks and best regards,
> Averell
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
