This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 40774fa  [FLINK-32028][connectors/elasticsearch] Allow customising 
bulk failure handling
40774fa is described below

commit 40774fad0f4ecd1a0d104dcb339e6bb860b0a4bf
Author: Peter Fischer <pfisc...@wikimedia.org>
AuthorDate: Fri Dec 1 12:29:32 2023 +0100

    [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure 
handling
    
    Extracted `BulkResponseInspector` interface to allow custom handling of 
(partially) failed bulk requests. If not overridden, default behaviour remains 
unchanged and partial failures are escalated.
    
    * fixes https://issues.apache.org/jira/browse/FLINK-32028
    * allows custom metrics to be exposed
---
 .../elasticsearch/sink/BulkResponseInspector.java  |  60 +++++++++
 .../elasticsearch/sink/ElasticsearchSink.java      |  12 +-
 .../sink/ElasticsearchSinkBuilderBase.java         |  63 +++++++++-
 .../elasticsearch/sink/ElasticsearchWriter.java    | 112 ++++++++++++-----
 .../elasticsearch/sink/FailureHandler.java         |  36 ++++++
 .../sink/DefaultBulkResponseInspectorTest.java     | 127 +++++++++++++++++++
 .../sink/ElasticsearchSinkBuilderBaseTest.java     | 136 +++++++++++++++++++++
 .../sink/ElasticsearchWriterITCase.java            |   2 +
 8 files changed, 513 insertions(+), 35 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
new file mode 100644
index 0000000..9f4ce10
--- /dev/null
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.function.SerializableFunction;
+
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+
+/** Callback for inspecting a {@link BulkResponse}. */
+@PublicEvolving
+@FunctionalInterface
+public interface BulkResponseInspector {
+
+    /**
+     * Callback to inspect a {@code response} in the context of its {@code 
request}. It may throw a
+     * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that 
the bulk failed
+     * (partially).
+     */
+    void inspect(BulkRequest request, BulkResponse response);
+
+    /**
+     * Factory interface for creating a {@link BulkResponseInspector} in the 
context of a sink.
+     * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to 
capture custom metrics.
+     */
+    @PublicEvolving
+    @FunctionalInterface
+    interface BulkResponseInspectorFactory
+            extends SerializableFunction<
+                    BulkResponseInspectorFactory.InitContext, 
BulkResponseInspector> {
+
+        /**
+         * The interface exposes a subset of {@link
+         * org.apache.flink.api.connector.sink2.Sink.InitContext}.
+         */
+        interface InitContext {
+
+            /** Returns: The metric group of the surrounding writer. */
+            MetricGroup metricGroup();
+        }
+    }
+}
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
index efe6dc2..05ac47a 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
 
 import org.apache.http.HttpHost;
 
@@ -58,6 +59,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
     private final ElasticsearchEmitter<? super IN> emitter;
     private final BulkProcessorConfig buildBulkProcessorConfig;
     private final BulkProcessorBuilderFactory bulkProcessorBuilderFactory;
+    private final BulkResponseInspectorFactory bulkResponseInspectorFactory;
     private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
 
@@ -67,9 +69,11 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
             DeliveryGuarantee deliveryGuarantee,
             BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
             BulkProcessorConfig buildBulkProcessorConfig,
-            NetworkClientConfig networkClientConfig) {
+            NetworkClientConfig networkClientConfig,
+            BulkResponseInspectorFactory bulkResponseInspectorFactory) {
         this.hosts = checkNotNull(hosts);
         this.bulkProcessorBuilderFactory = 
checkNotNull(bulkProcessorBuilderFactory);
+        this.bulkResponseInspectorFactory = 
checkNotNull(bulkResponseInspectorFactory);
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
         this.emitter = checkNotNull(emitter);
         this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
@@ -85,6 +89,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
                 deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE,
                 buildBulkProcessorConfig,
                 bulkProcessorBuilderFactory,
+                bulkResponseInspectorFactory.apply(context::metricGroup),
                 networkClientConfig,
                 context.metricGroup(),
                 context.getMailboxExecutor());
@@ -94,4 +99,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
     DeliveryGuarantee getDeliveryGuarantee() {
         return deliveryGuarantee;
     }
+
+    @VisibleForTesting
+    BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+        return bulkResponseInspectorFactory;
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index 3d51356..2904eff 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -22,6 +22,9 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.http.HttpHost;
@@ -57,6 +60,8 @@ public abstract class ElasticsearchSinkBuilderBase<
     private Integer connectionTimeout;
     private Integer connectionRequestTimeout;
     private Integer socketTimeout;
+    private FailureHandler failureHandler = new DefaultFailureHandler();
+    private BulkResponseInspectorFactory bulkResponseInspectorFactory;
 
     protected ElasticsearchSinkBuilderBase() {}
 
@@ -258,8 +263,41 @@ public abstract class ElasticsearchSinkBuilderBase<
         return self();
     }
 
+    /**
+     * Overrides the default {@link FailureHandler}. A custom failure handler 
can handle partial
+     * failures gracefully. See {@link #bulkResponseInspectorFactory} for more 
extensive control.
+     *
+     * @param failureHandler the handler
+     * @see #bulkResponseInspectorFactory
+     * @return this builder
+     */
+    public B setFailureHandler(FailureHandler failureHandler) {
+        this.failureHandler = checkNotNull(failureHandler);
+        return self();
+    }
+
+    /**
+     * Overrides the default {@link BulkResponseInspectorFactory}. A custom 
{@link
+     * BulkResponseInspector}, for example, can change the failure handling 
and capture additional
+     * metrics. See {@link #failureHandler} for a simpler way of handling 
failures.
+     *
+     * @param bulkResponseInspectorFactory the factory
+     * @return this builder
+     */
+    public B setBulkResponseInspectorFactory(
+            BulkResponseInspectorFactory bulkResponseInspectorFactory) {
+        this.bulkResponseInspectorFactory = 
checkNotNull(bulkResponseInspectorFactory);
+        return self();
+    }
+
     protected abstract BulkProcessorBuilderFactory 
getBulkProcessorBuilderFactory();
 
+    protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
+        return this.bulkResponseInspectorFactory == null
+                ? new DefaultBulkResponseInspectorFactory(failureHandler)
+                : this.bulkResponseInspectorFactory;
+    }
+
     /**
      * Constructs the {@link ElasticsearchSink} with the properties configured 
this builder.
      *
@@ -276,13 +314,17 @@ public abstract class ElasticsearchSinkBuilderBase<
         ClosureCleaner.clean(
                 bulkProcessorBuilderFactory, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
 
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                getBulkResponseInspectorFactory();
+
         return new ElasticsearchSink<>(
                 hosts,
                 emitter,
                 deliveryGuarantee,
                 bulkProcessorBuilderFactory,
                 bulkProcessorConfig,
-                networkClientConfig);
+                networkClientConfig,
+                bulkResponseInspectorFactory);
     }
 
     private NetworkClientConfig buildNetworkClientConfig() {
@@ -339,4 +381,23 @@ public abstract class ElasticsearchSinkBuilderBase<
                 + '\''
                 + '}';
     }
+
+    /**
+     * Default factory for {@link FailureHandler}-bound {@link 
BulkResponseInspector
+     * BulkResponseInspectors}. A Static class is used instead of 
anonymous/lambda to avoid
+     * non-serializable references to {@link ElasticsearchSinkBuilderBase}.
+     */
+    static class DefaultBulkResponseInspectorFactory implements 
BulkResponseInspectorFactory {
+
+        private final FailureHandler failureHandler;
+
+        DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) {
+            this.failureHandler = failureHandler;
+        }
+
+        @Override
+        public BulkResponseInspector apply(InitContext context) {
+            return new DefaultBulkResponseInspector(failureHandler);
+        }
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
index fa8ed67..b3d8ed5 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
@@ -91,6 +91,7 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
             boolean flushOnCheckpoint,
             BulkProcessorConfig bulkProcessorConfig,
             BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
+            BulkResponseInspector bulkResponseInspector,
             NetworkClientConfig networkClientConfig,
             SinkWriterMetricGroup metricGroup,
             MailboxExecutor mailboxExecutor) {
@@ -102,9 +103,13 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
                         configureRestClientBuilder(
                                 RestClient.builder(hosts.toArray(new 
HttpHost[0])),
                                 networkClientConfig));
-        this.bulkProcessor = createBulkProcessor(bulkProcessorBuilderFactory, 
bulkProcessorConfig);
-        this.requestIndexer = new 
DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
+        this.bulkProcessor =
+                createBulkProcessor(
+                        bulkProcessorBuilderFactory,
+                        bulkProcessorConfig,
+                        checkNotNull(bulkResponseInspector));
         checkNotNull(metricGroup);
+        this.requestIndexer = new 
DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
         metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
         this.numBytesOutCounter = 
metricGroup.getIOMetricGroup().getNumBytesOutCounter();
         try {
@@ -192,10 +197,12 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
 
     private BulkProcessor createBulkProcessor(
             BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
-            BulkProcessorConfig bulkProcessorConfig) {
+            BulkProcessorConfig bulkProcessorConfig,
+            BulkResponseInspector bulkResponseInspector) {
 
         BulkProcessor.Builder builder =
-                bulkProcessorBuilderFactory.apply(client, bulkProcessorConfig, 
new BulkListener());
+                bulkProcessorBuilderFactory.apply(
+                        client, bulkProcessorConfig, new 
BulkListener(bulkResponseInspector));
 
         // This makes flush() blocking
         builder.setConcurrentRequests(0);
@@ -205,6 +212,12 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
 
     private class BulkListener implements BulkProcessor.Listener {
 
+        private final BulkResponseInspector bulkResponseInspector;
+
+        public BulkListener(BulkResponseInspector bulkResponseInspector) {
+            this.bulkResponseInspector = bulkResponseInspector;
+        }
+
         @Override
         public void beforeBulk(long executionId, BulkRequest request) {
             LOG.info("Sending bulk of {} actions to Elasticsearch.", 
request.numberOfActions());
@@ -227,6 +240,11 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
                     },
                     "elasticsearchErrorCallback");
         }
+
+        private void extractFailures(BulkRequest request, BulkResponse 
response) {
+            bulkResponseInspector.inspect(request, response);
+            pendingActions -= request.numberOfActions();
+        }
     }
 
     private void enqueueActionInMailbox(
@@ -241,35 +259,6 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
         mailboxExecutor.execute(action, actionName);
     }
 
-    private void extractFailures(BulkRequest request, BulkResponse response) {
-        if (!response.hasFailures()) {
-            pendingActions -= request.numberOfActions();
-            return;
-        }
-
-        Throwable chainedFailures = null;
-        for (int i = 0; i < response.getItems().length; i++) {
-            final BulkItemResponse itemResponse = response.getItems()[i];
-            if (!itemResponse.isFailed()) {
-                continue;
-            }
-            final Throwable failure = itemResponse.getFailure().getCause();
-            if (failure == null) {
-                continue;
-            }
-            final RestStatus restStatus = 
itemResponse.getFailure().getStatus();
-            final DocWriteRequest<?> actionRequest = request.requests().get(i);
-
-            chainedFailures =
-                    firstOrSuppressed(
-                            wrapException(restStatus, failure, actionRequest), 
chainedFailures);
-        }
-        if (chainedFailures == null) {
-            return;
-        }
-        throw new FlinkRuntimeException(chainedFailures);
-    }
-
     private static Throwable wrapException(
             RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> 
actionRequest) {
         if (restStatus == null) {
@@ -327,4 +316,61 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
             }
         }
     }
+
+    /**
+     * A strict implementation that fails if either the whole bulk request 
failed or any of its
+     * actions.
+     */
+    static class DefaultBulkResponseInspector implements BulkResponseInspector 
{
+
+        @VisibleForTesting final FailureHandler failureHandler;
+
+        DefaultBulkResponseInspector() {
+            this(new DefaultFailureHandler());
+        }
+
+        DefaultBulkResponseInspector(FailureHandler failureHandler) {
+            this.failureHandler = checkNotNull(failureHandler);
+        }
+
+        @Override
+        public void inspect(BulkRequest request, BulkResponse response) {
+            if (!response.hasFailures()) {
+                return;
+            }
+
+            Throwable chainedFailures = null;
+            for (int i = 0; i < response.getItems().length; i++) {
+                final BulkItemResponse itemResponse = response.getItems()[i];
+                if (!itemResponse.isFailed()) {
+                    continue;
+                }
+                final Throwable failure = itemResponse.getFailure().getCause();
+                if (failure == null) {
+                    continue;
+                }
+                final RestStatus restStatus = 
itemResponse.getFailure().getStatus();
+                final DocWriteRequest<?> actionRequest = 
request.requests().get(i);
+
+                chainedFailures =
+                        firstOrSuppressed(
+                                wrapException(restStatus, failure, 
actionRequest), chainedFailures);
+            }
+            if (chainedFailures == null) {
+                return;
+            }
+            failureHandler.onFailure(chainedFailures);
+        }
+    }
+
+    static class DefaultFailureHandler implements FailureHandler {
+
+        @Override
+        public void onFailure(Throwable failure) {
+            if (failure instanceof FlinkRuntimeException) {
+                throw (FlinkRuntimeException) failure;
+            }
+            throw new FlinkRuntimeException(failure);
+        }
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java
new file mode 100644
index 0000000..84eb8c5
--- /dev/null
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/** A handler for partial bulk action failures. */
+@PublicEvolving
+@FunctionalInterface
+public interface FailureHandler extends Serializable {
+
+    /**
+     * Callback that is passed the first bulk action failure.
+     *
+     * @param failure the first bulk action failure
+     */
+    void onFailure(Throwable failure);
+}
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java
new file mode 100644
index 0000000..49ae0ad
--- /dev/null
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.elasticsearch.sink;
+
+import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.action.DocWriteRequest.OpType;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+class DefaultBulkResponseInspectorTest {
+
+    @Test
+    void testPassWithoutFailures() {
+        final DefaultBulkResponseInspector inspector = new 
DefaultBulkResponseInspector();
+        Assertions.assertThatCode(
+                        () ->
+                                inspector.inspect(
+                                        new BulkRequest(),
+                                        new BulkResponse(new 
BulkItemResponse[] {}, 0)))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testPassesDespiteChainedFailure() {
+        final DefaultBulkResponseInspector inspector =
+                new DefaultBulkResponseInspector((failure) -> {});
+        Assertions.assertThatCode(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), 
new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, 
(DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new 
IOException("A"))),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                new 
IOException("B")))
+                                            },
+                                            0));
+                        })
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    void testThrowsChainedFailure() {
+        final IOException failureCause0 = new IOException("A");
+        final IOException failureCause1 = new IOException("B");
+        final DefaultBulkResponseInspector inspector = new 
DefaultBulkResponseInspector();
+        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class)
+                .isThrownBy(
+                        () -> {
+                            final BulkRequest request = new BulkRequest();
+                            request.add(
+                                    new IndexRequest(), new DeleteRequest(), 
new DeleteRequest());
+
+                            inspector.inspect(
+                                    request,
+                                    new BulkResponse(
+                                            new BulkItemResponse[] {
+                                                new BulkItemResponse(
+                                                        0, OpType.CREATE, 
(DocWriteResponse) null),
+                                                new BulkItemResponse(
+                                                        1,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                
failureCause0)),
+                                                new BulkItemResponse(
+                                                        2,
+                                                        OpType.DELETE,
+                                                        new Failure(
+                                                                "index",
+                                                                "type",
+                                                                "id",
+                                                                failureCause1))
+                                            },
+                                            0));
+                        })
+                .withCause(failureCause0);
+    }
+}
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index 3fc2abc..d2ec22d 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -17,8 +17,26 @@
 
 package org.apache.flink.connector.elasticsearch.sink;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
+import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
 import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
 import org.junit.jupiter.api.DynamicTest;
@@ -27,6 +45,8 @@ import org.junit.jupiter.api.TestFactory;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -99,7 +119,123 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testOverrideFailureHandler() {
+        final FailureHandler failureHandler = (failure) -> {};
+        final ElasticsearchSink<Object> sink =
+                
createMinimalBuilder().setFailureHandler(failureHandler).build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+        final BulkResponseInspector bulkResponseInspector =
+                
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
+        assertThat(bulkResponseInspector)
+                .isInstanceOf(DefaultBulkResponseInspector.class)
+                .extracting(
+                        (inspector) -> ((DefaultBulkResponseInspector) 
inspector).failureHandler)
+                .isEqualTo(failureHandler);
+    }
+
+    @Test
+    void testOverrideBulkResponseInspectorFactory() {
+        final AtomicBoolean called = new AtomicBoolean();
+        final BulkResponseInspectorFactory bulkResponseInspectorFactory =
+                initContext -> {
+                    final MetricGroup metricGroup = initContext.metricGroup();
+                    metricGroup.addGroup("bulk").addGroup("result", 
"failed").counter("actions");
+                    called.set(true);
+                    return (BulkResponseInspector) (request, response) -> {};
+                };
+        final ElasticsearchSink<Object> sink =
+                createMinimalBuilder()
+                        
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
+                        .build();
+
+        final InitContext sinkInitContext = new MockInitContext();
+
+        assertThatCode(() -> 
sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
+        assertThat(called).isTrue();
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
+
+    private static class DummyMailboxExecutor implements MailboxExecutor {
+        private DummyMailboxExecutor() {}
+
+        public void execute(
+                ThrowingRunnable<? extends Exception> command,
+                String descriptionFormat,
+                Object... descriptionArgs) {}
+
+        public void yield() throws InterruptedException, FlinkRuntimeException 
{}
+
+        public boolean tryYield() throws FlinkRuntimeException {
+            return false;
+        }
+    }
+
+    private static class MockInitContext
+            implements Sink.InitContext, 
SerializationSchema.InitializationContext {
+
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return SimpleUserCodeClassLoader.create(
+                    ElasticsearchSinkBuilderBaseTest.class.getClassLoader());
+        }
+
+        public MailboxExecutor getMailboxExecutor() {
+            return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor();
+        }
+
+        public ProcessingTimeService getProcessingTimeService() {
+            return new TestProcessingTimeService();
+        }
+
+        public int getSubtaskId() {
+            return 0;
+        }
+
+        public int getNumberOfParallelSubtasks() {
+            return 0;
+        }
+
+        public int getAttemptNumber() {
+            return 0;
+        }
+
+        public SinkWriterMetricGroup metricGroup() {
+            return InternalSinkWriterMetricGroup.wrap(
+                    new TestingSinkWriterMetricGroup.Builder()
+                            .setIoMetricGroupSupplier(
+                                    
UnregisteredMetricsGroup::createOperatorIOMetricGroup)
+                            .setParentMetricGroup(
+                                    
UnregisteredMetricsGroup.createOperatorMetricGroup())
+                            .build());
+        }
+
+        public MetricGroup getMetricGroup() {
+            return this.metricGroup();
+        }
+
+        public OptionalLong getRestoredCheckpointId() {
+            return OptionalLong.empty();
+        }
+
+        public SerializationSchema.InitializationContext
+                asSerializationSchemaInitializationContext() {
+            return this;
+        }
+
+        public boolean isObjectReuseEnabled() {
+            return false;
+        }
+
+        public <IN> TypeSerializer<IN> createInputSerializer() {
+            throw new UnsupportedOperationException();
+        }
+
+        public JobID getJobId() {
+            throw new UnsupportedOperationException();
+        }
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index e8002cc..25a5f18 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
+import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
 import org.apache.flink.connector.elasticsearch.test.DockerImageVersions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -283,6 +284,7 @@ class ElasticsearchWriterITCase {
                 flushOnCheckpoint,
                 bulkProcessorConfig,
                 new TestBulkProcessorBuilderFactory(),
+                new DefaultBulkResponseInspector(),
                 new NetworkClientConfig(null, null, null, null, null, null),
                 metricGroup,
                 new TestMailbox());


Reply via email to