GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102177099
  
    --- Diff: docs/dev/connectors/elasticsearch.md ---
    @@ -272,9 +308,115 @@ input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String
     The difference is that now we do not need to provide a list of addresses
     of Elasticsearch nodes.
     
    +### Handling Failing Elasticsearch Requests
    +
    +Elasticsearch action requests may fail due to a variety of reasons, including
    +temporarily saturated node queue capacity or malformed documents to be indexed.
    +The Flink Elasticsearch Sink allows the user to specify how request
    +failures are handled, by simply implementing an `ActionRequestFailureHandler` and
    +providing it to the constructor.
    +
    +Below is an example:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<String> input = ...;
    +
    +input.addSink(new ElasticsearchSink<>(
    +    config, transportAddresses,
    +    new ElasticsearchSinkFunction<String>() {...},
    +    new ActionRequestFailureHandler() {
    +        @Override
    +        public boolean onFailure(ActionRequest action, Throwable failure, RequestIndexer indexer) {
    +            // this example uses Apache Commons to search for nested exceptions
    +
    +            if (ExceptionUtils.indexOfThrowable(failure, EsRejectedExecutionException.class) >= 0) {
    +                // full queue; re-add document for indexing
    +                indexer.add(action);
    +                return false;
    +            } else if (ExceptionUtils.indexOfThrowable(failure, ElasticsearchParseException.class) >= 0) {
    +                // malformed document; simply drop request without failing sink
    +                return false;
    +            } else {
    +                // for all other failures, fail the sink
    +                return true;
    +            }
    +        }
    +    }));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[String] = ...
    +
    +input.addSink(new ElasticsearchSink(
    +    config, transportAddresses,
    +    new ElasticsearchSinkFunction[String] {...},
    +    new ActionRequestFailureHandler {
    +        override def onFailure(action: ActionRequest, failure: Throwable, indexer: RequestIndexer): Boolean = {
    +            // this example uses Apache Commons to search for nested exceptions
    +
    +            if (ExceptionUtils.indexOfThrowable(failure, classOf[EsRejectedExecutionException]) >= 0) {
    +                // full queue; re-add document for indexing
    +                indexer.add(action)
    +                false
    +            } else if (ExceptionUtils.indexOfThrowable(failure, classOf[ElasticsearchParseException]) >= 0) {
    +                // malformed document; simply drop request without failing sink
    +                false
    +            } else {
    +                // for all other failures, fail the sink
    +                true
    +            }
    +        }
    +    }))
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +The above example will let the sink re-add requests that failed due to
    +queue capacity saturation and drop requests with malformed documents, without
    +failing the sink. For all other failures, the sink will fail. If an `ActionRequestFailureHandler`
    +is not provided to the constructor, the sink will fail for any kind of error.
    +
    +Note that `onFailure` is called only for failures that still occur after the
    +`BulkProcessor` has internally exhausted all backoff retry attempts.
    +By default, the `BulkProcessor` retries to a maximum of 8 attempts with
    +an exponential backoff. For more information on the behaviour of the
    +internal `BulkProcessor` and how to configure it, please see the following section.
    +
    +<p style="border-radius: 5px; padding: 5px" class="bg-danger">
    +<b>IMPORTANT</b>: Re-adding requests back to the internal <b>BulkProcessor</b>
    +on failures will lead to longer checkpoints, as the sink will also
    +need to wait for the re-added requests to be flushed when checkpointing.
    +This also means that if re-added requests never succeed, the checkpoint will
    +never finish.
    +</p>
    +
    + 
    +### Configuring the Internal Bulk Processor
    +
    +The internal `BulkProcessor` can be further configured for its behaviour
    +on how buffered action requests are flushed, by setting the following values in
    +the provided `Map<String, String>`:
    +
    + * **bulk.flush.max.actions**: Maximum number of actions to buffer before flushing.
    + * **bulk.flush.max.size.mb**: Maximum size of data (in megabytes) to buffer before flushing.
    + * **bulk.flush.interval.ms**: Interval at which to flush regardless of the number or size of buffered actions.
    + 
    +For versions 2.x and above, configuring how temporary request errors are
    +retried is also supported:
    + 
    + * **bulk.flush.backoff.enable**: Whether or not to perform retries with backoff delay for a flush
    + if one or more of its actions failed due to a temporary `EsRejectedExecutionException`.
    + * **bulk.flush.backoff.type**: The type of backoff delay, either `CONSTANT` or `EXPONENTIAL`.
    + * **bulk.flush.backoff.delay**: The amount of delay for backoff. For constant backoff, this
    + is simply the delay between each retry. For exponential backoff, this is the initial base delay.
    + * **bulk.flush.backoff.retries**: The number of backoff retries to attempt.
    +
     More information about Elasticsearch can be found [here](https://elastic.co).
     
    -#### Packaging the Elasticsearch Connector into an Uber-Jar
    +## Packaging the Elasticsearch Connector into an Uber-Jar
    --- End diff --
    
    I like the reworked documentation page a lot!
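
To make the configuration keys in the new "Configuring the Internal Bulk Processor" section concrete, here is a minimal sketch of how the `Map<String, String>` might be populated. The class name `BulkFlushConfigExample`, the method `buildConfig`, and every value are illustrative assumptions, not taken from the PR. Treating `bulk.flush.backoff.delay` as milliseconds is also an assumption, since the quoted text does not state a unit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: only the keys come from the quoted documentation.
public class BulkFlushConfigExample {

    /** Builds a user config map; all values are made up for illustration. */
    public static Map<String, String> buildConfig() {
        Map<String, String> config = new HashMap<>();

        // Flush behaviour: whichever threshold is reached first triggers a flush.
        config.put("bulk.flush.max.actions", "1000");
        config.put("bulk.flush.max.size.mb", "5");
        config.put("bulk.flush.interval.ms", "10000");

        // Retry behaviour for temporary request errors (versions 2.x and above).
        config.put("bulk.flush.backoff.enable", "true");
        config.put("bulk.flush.backoff.type", "EXPONENTIAL");
        config.put("bulk.flush.backoff.delay", "50"); // unit assumed to be ms
        config.put("bulk.flush.backoff.retries", "8");

        return config;
    }

    public static void main(String[] args) {
        // Print the settings so they can be inspected before wiring up a sink.
        buildConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A map built this way would be passed as the `config` argument of the `ElasticsearchSink` constructor shown in the quoted examples, alongside whatever other settings the sink needs.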

