Hello,

I am trying to follow this Flink guide [1] to handle errors in
ElasticSearchSink by re-adding the failed messages to the queue.
The error scenarios that I got and going to retry are: (i) conflict in
UpdateRequest document version and (ii) lost connection to ElasticSearch.
These errors are expected to be non-persistent, would be solved by (i)
changing the version / (ii) gone after some seconds
What I expect is message got retried successfully.
What I actually got was: Flink seemed to get stuck on that (first) retry, my
flow queued up (backpressure is 1 everywhere), all processing hung.

Here is my error handling code:

<code>
        private object MyElasticSearchFailureHandler extends
ActionRequestFailureHandler {
                override def onFailure(actionRequest: ActionRequest, failure: 
Throwable,
restStatusCode: Int, indexer: RequestIndexer): Unit = {
                        if (ExceptionUtils.findThrowableWithMessage(failure,
"version_conflict_engine_exception") != Optional.empty()) {
                                actionRequest match {
                                        case s: UpdateRequest =>
                                                LOG.warn(s"Failed inserting 
record to ElasticSearch due to version
conflict (${s.version()}). Retrying")
                                                LOG.warn(actionRequest.toString)
                                                
indexer.add(s.version(s.version() + 1))
                                        case _ =>
                                                LOG.error("Failed inserting 
record to ElasticSearch due to version
conflict. However, this is not an Update-Request. Don't know why.")
                                                
LOG.error(actionRequest.toString)
                                                throw failure
                                }
                        } else if (restStatusCode == -1 &&
failure.getMessage.contains("Connection closed")) {
                                LOG.warn(s"Retrying record: 
${actionRequest.toString}")
                                actionRequest match {
                                        case s: UpdateRequest => indexer.add(s)
                                        case s: IndexRequest => indexer.add(s)
                                }
                        } else {
                                LOG.error(s"ELASTICSEARCH FAILED:\n    
statusCode $restStatusCode\n   
message: ${failure.getMessage}\n${failure.getStackTrace}")
                                LOG.error(s"    DATA:\n    
${actionRequest.toString}")
                                throw failure
                        }
                }
        }
</code>

Here is the extract from my task-manager logs:

/2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR
o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed
Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN 
c.n.c......sink.MyElasticSearchSink$  - Retrying record: update
{[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index
{*[null][null][null]*, source[{...}]}], scripted_upsert[false],
detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO 
o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0
checkpointing for checkpoint with id=24 (max part counter=26)./

And job-manager logs:
/2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in
307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25
of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering
checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1./

Thanks and best regards,
Averell

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests>
  



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to