This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new d853e3d  [FLINK-30998] Apply adding failureHandler on top of current 
apache:main branch
d853e3d is described below

commit d853e3d6be3e0f15e25c1220800b7d5fcf152c43
Author: Leonid Ilyevsky <leonidilyev...@yahoo.com>
AuthorDate: Thu Jul 6 18:54:58 2023 -0400

    [FLINK-30998] Apply adding failureHandler on top of current apache:main 
branch
---
 .../connector/opensearch/sink/FailureHandler.java  | 30 ++++++++++
 .../connector/opensearch/sink/OpensearchSink.java  |  8 ++-
 .../opensearch/sink/OpensearchSinkBuilder.java     | 18 +++++-
 .../opensearch/sink/OpensearchWriter.java          | 13 ++++-
 .../opensearch/sink/OpensearchWriterITCase.java    | 68 +++++++++++++++++++++-
 5 files changed, 129 insertions(+), 8 deletions(-)

diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java
new file mode 100644
index 0000000..3c94514
--- /dev/null
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/** Handler to process failures. */
+@PublicEvolving
+@FunctionalInterface
+public interface FailureHandler extends Serializable {
+    void onFailure(Throwable failure);
+}
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
index ff0b00a..c02b4fe 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
@@ -60,6 +60,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
     private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
     private final RestClientFactory restClientFactory;
+    private final FailureHandler failureHandler;
 
     OpensearchSink(
             List<HttpHost> hosts,
@@ -67,7 +68,8 @@ public class OpensearchSink<IN> implements Sink<IN> {
             DeliveryGuarantee deliveryGuarantee,
             BulkProcessorConfig buildBulkProcessorConfig,
             NetworkClientConfig networkClientConfig,
-            RestClientFactory restClientFactory) {
+            RestClientFactory restClientFactory,
+            FailureHandler failureHandler) {
         this.hosts = checkNotNull(hosts);
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
         this.emitter = checkNotNull(emitter);
@@ -75,6 +77,7 @@ public class OpensearchSink<IN> implements Sink<IN> {
         this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
         this.networkClientConfig = checkNotNull(networkClientConfig);
         this.restClientFactory = checkNotNull(restClientFactory);
+        this.failureHandler = checkNotNull(failureHandler);
     }
 
     @Override
@@ -87,7 +90,8 @@ public class OpensearchSink<IN> implements Sink<IN> {
                 networkClientConfig,
                 context.metricGroup(),
                 context.getMailboxExecutor(),
-                restClientFactory);
+                restClientFactory,
+                failureHandler);
     }
 
     @VisibleForTesting
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
index b984120..736c607 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
@@ -27,6 +27,7 @@ import org.apache.http.HttpHost;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -73,6 +74,7 @@ public class OpensearchSinkBuilder<IN> {
     private Integer socketTimeout;
     private Boolean allowInsecure;
     private RestClientFactory restClientFactory;
+    private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER;
 
     public OpensearchSinkBuilder() {
         restClientFactory = new DefaultRestClientFactory();
@@ -300,6 +302,19 @@ public class OpensearchSinkBuilder<IN> {
         return self();
     }
 
+    /**
+     * Allows to set custom failure handler. If not set, then the 
DEFAULT_FAILURE_HANDLER will be
+     * used which throws a runtime exception upon receiving a failure.
+     *
+     * @param failureHandler the custom handler
+     * @return this builder
+     */
+    public OpensearchSinkBuilder<IN> setFailureHandler(FailureHandler 
failureHandler) {
+        checkNotNull(failureHandler);
+        this.failureHandler = failureHandler;
+        return self();
+    }
+
     /**
      * Constructs the {@link OpensearchSink} with the properties configured 
this builder.
      *
@@ -318,7 +333,8 @@ public class OpensearchSinkBuilder<IN> {
                 deliveryGuarantee,
                 bulkProcessorConfig,
                 networkClientConfig,
-                restClientFactory);
+                restClientFactory,
+                failureHandler);
     }
 
     private NetworkClientConfig buildNetworkClientConfig() {
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
index 1cf059b..68da301 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
@@ -58,6 +58,11 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OpensearchWriter.class);
 
+    public static final FailureHandler DEFAULT_FAILURE_HANDLER =
+            ex -> {
+                throw new FlinkRuntimeException(ex);
+            };
+
     private final OpensearchEmitter<? super IN> emitter;
     private final MailboxExecutor mailboxExecutor;
     private final boolean flushOnCheckpoint;
@@ -65,6 +70,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
     private final RestHighLevelClient client;
     private final RequestIndexer requestIndexer;
     private final Counter numBytesOutCounter;
+    private final FailureHandler failureHandler;
 
     private long pendingActions = 0;
     private boolean checkpointInProgress = false;
@@ -81,7 +87,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
      *     checkpoint
      * @param bulkProcessorConfig describing the flushing and failure handling 
of the used {@link
      *     BulkProcessor}
-     * @param bulkProcessorBuilderFactory configuring the {@link 
BulkProcessor}'s builder
      * @param networkClientConfig describing properties of the network 
connection used to connect to
      *     the Opensearch cluster
      * @param metricGroup for the sink writer
@@ -96,7 +101,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
             NetworkClientConfig networkClientConfig,
             SinkWriterMetricGroup metricGroup,
             MailboxExecutor mailboxExecutor,
-            RestClientFactory restClientFactory) {
+            RestClientFactory restClientFactory,
+            FailureHandler failureHandler) {
         this.emitter = checkNotNull(emitter);
         this.flushOnCheckpoint = flushOnCheckpoint;
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
@@ -117,6 +123,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         } catch (Exception e) {
             throw new FlinkRuntimeException("Failed to open the 
OpensearchEmitter", e);
         }
+        this.failureHandler = failureHandler;
     }
 
     @Override
@@ -278,7 +285,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         if (chainedFailures == null) {
             return;
         }
-        throw new FlinkRuntimeException(chainedFailures);
+        failureHandler.onFailure(chainedFailures);
     }
 
     private static Throwable wrapException(
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index deacd6c..afdf26b 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -56,6 +56,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage;
+import static 
org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link OpensearchWriter}. */
@@ -238,13 +239,61 @@ class OpensearchWriterITCase {
         }
     }
 
+    private static class TestHandler implements FailureHandler {
+        private boolean failed = false;
+
+        private synchronized void setFailed() {
+            failed = true;
+        }
+
+        public boolean isFailed() {
+            return failed;
+        }
+
+        @Override
+        public void onFailure(Throwable failure) {
+            setFailed();
+        }
+    }
+
+    @Test
+    void testWriteErrorOnUpdate() throws Exception {
+        final String index = "test-bulk-flush-with-error";
+        final int flushAfterNActions = 1;
+        final BulkProcessorConfig bulkProcessorConfig =
+                new BulkProcessorConfig(flushAfterNActions, -1, -1, 
FlushBackoffType.NONE, 0, 0);
+
+        final TestHandler testHandler = new TestHandler();
+        try (final OpensearchWriter<Tuple2<Integer, String>> writer =
+                createWriter(index, true, bulkProcessorConfig, testHandler)) {
+            // Trigger an error by updating non-existing document
+            writer.write(Tuple2.of(1, "u" + buildMessage(1)), null);
+            context.assertThatIdsAreNotWritten(index, 1);
+            assertThat(testHandler.isFailed()).isEqualTo(true);
+        }
+    }
+
     private OpensearchWriter<Tuple2<Integer, String>> createWriter(
             String index, boolean flushOnCheckpoint, BulkProcessorConfig 
bulkProcessorConfig) {
         return createWriter(
                 index,
                 flushOnCheckpoint,
                 bulkProcessorConfig,
-                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                DEFAULT_FAILURE_HANDLER);
+    }
+
+    private OpensearchWriter<Tuple2<Integer, String>> createWriter(
+            String index,
+            boolean flushOnCheckpoint,
+            BulkProcessorConfig bulkProcessorConfig,
+            FailureHandler failureHandler) {
+        return createWriter(
+                index,
+                flushOnCheckpoint,
+                bulkProcessorConfig,
+                
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                failureHandler);
     }
 
     private OpensearchWriter<Tuple2<Integer, String>> createWriter(
@@ -252,6 +301,20 @@ class OpensearchWriterITCase {
             boolean flushOnCheckpoint,
             BulkProcessorConfig bulkProcessorConfig,
             SinkWriterMetricGroup metricGroup) {
+        return createWriter(
+                index,
+                flushOnCheckpoint,
+                bulkProcessorConfig,
+                metricGroup,
+                DEFAULT_FAILURE_HANDLER);
+    }
+
+    private OpensearchWriter<Tuple2<Integer, String>> createWriter(
+            String index,
+            boolean flushOnCheckpoint,
+            BulkProcessorConfig bulkProcessorConfig,
+            SinkWriterMetricGroup metricGroup,
+            FailureHandler failureHandler) {
         return new OpensearchWriter<Tuple2<Integer, String>>(
                 
Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())),
                 new UpdatingEmitter(index, context.getDataFieldName()),
@@ -267,7 +330,8 @@ class OpensearchWriterITCase {
                         true),
                 metricGroup,
                 new TestMailbox(),
-                new DefaultRestClientFactory());
+                new DefaultRestClientFactory(),
+                failureHandler);
     }
 
     private static class UpdatingEmitter implements 
OpensearchEmitter<Tuple2<Integer, String>> {

Reply via email to