Hello,

I am trying to follow this Flink guide [1] to handle errors in the ElasticsearchSink by re-adding the failed messages to the queue. The error scenarios I am hitting and want to retry are: (i) a version conflict on an UpdateRequest document, and (ii) a lost connection to Elasticsearch. Both errors should be transient: (i) is resolved by changing the version, and (ii) usually goes away after a few seconds.

What I expect is that the failed message gets retried successfully. What I actually got: Flink seems to get stuck on that (first) retry, my flow queues up (backpressure is 1 everywhere), and all processing hangs.
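For reference, the failure handler is registered on the sink the way the guide [1] describes. Below is only a minimal sketch of that wiring, not my actual job: the host, index name, document type and element type are placeholders.

<code>
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Sketch only: the stream is assumed to carry (docId, source) pairs.
def attachElasticSearchSink(stream: DataStream[(String, java.util.Map[String, AnyRef])]): Unit = {
  val httpHosts = new util.ArrayList[HttpHost]()
  httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) // placeholder host

  val builder = new ElasticsearchSink.Builder[(String, java.util.Map[String, AnyRef])](
    httpHosts,
    new ElasticsearchSinkFunction[(String, java.util.Map[String, AnyRef])] {
      override def process(element: (String, java.util.Map[String, AnyRef]),
                           ctx: RuntimeContext,
                           indexer: RequestIndexer): Unit = {
        indexer.add(
          Requests.indexRequest()
            .index("idx-20190208") // placeholder index
            .`type`("_doc")
            .id(element._1)
            .source(element._2))
      }
    })

  // The custom handler shown below is plugged in here
  builder.setFailureHandler(MyElasticSearchFailureHandler)
  stream.addSink(builder.build())
}
</code>

setFailureHandler() is the hook the guide describes for re-adding failed requests through the RequestIndexer, which is what my handler below does.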
Here is my error handling code:

<code>
import java.util.Optional

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest

// LOG is an Slf4j Logger defined in the enclosing class
private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
  override def onFailure(actionRequest: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception") != Optional.empty()) {
      actionRequest match {
        case s: UpdateRequest =>
          LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
          LOG.warn(actionRequest.toString)
          // Bump the document version and re-queue the request
          indexer.add(s.version(s.version() + 1))
        case _ =>
          LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
          LOG.error(actionRequest.toString)
          throw failure
      }
    } else if (restStatusCode == -1 && failure.getMessage.contains("Connection closed")) {
      // Connection to Elasticsearch was lost; re-queue the request as-is
      LOG.warn(s"Retrying record: ${actionRequest.toString}")
      actionRequest match {
        case s: UpdateRequest => indexer.add(s)
        case s: IndexRequest  => indexer.add(s)
      }
    } else {
      // Anything else is unexpected: log it and fail the sink
      LOG.error(s"ELASTICSEARCH FAILED:\n statusCode $restStatusCode\n message: ${failure.getMessage}\n${failure.getStackTrace}")
      LOG.error(s" DATA:\n ${actionRequest.toString}")
      throw failure
    }
  }
}
</code>

Here is the extract from my task-manager logs:

2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN c.n.c......sink.MyElasticSearchSink$ - Retrying record: update {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index {*[null][null][null]*, source[{...}]}], scripted_upsert[false], detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO o.a.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=24 (max part counter=26).

And here is the extract from my job-manager logs:

2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in 307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 24 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 25 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.

Thanks and best regards,
Averell

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/