This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
The following commit(s) were added to refs/heads/main by this push: new d853e3d [FLINK-30998] Apply adding failureHandler on top of current apache:main branch d853e3d is described below commit d853e3d6be3e0f15e25c1220800b7d5fcf152c43 Author: Leonid Ilyevsky <leonidilyev...@yahoo.com> AuthorDate: Thu Jul 6 18:54:58 2023 -0400 [FLINK-30998] Apply adding failureHandler on top of current apache:main branch --- .../connector/opensearch/sink/FailureHandler.java | 30 ++++++++++ .../connector/opensearch/sink/OpensearchSink.java | 8 ++- .../opensearch/sink/OpensearchSinkBuilder.java | 18 +++++- .../opensearch/sink/OpensearchWriter.java | 13 ++++- .../opensearch/sink/OpensearchWriterITCase.java | 68 +++++++++++++++++++++- 5 files changed, 129 insertions(+), 8 deletions(-) diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java new file mode 100644 index 0000000..3c94514 --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/FailureHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** Handler to process failures. */ +@PublicEvolving +@FunctionalInterface +public interface FailureHandler extends Serializable { + void onFailure(Throwable failure); +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java index ff0b00a..c02b4fe 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java @@ -60,6 +60,7 @@ public class OpensearchSink<IN> implements Sink<IN> { private final NetworkClientConfig networkClientConfig; private final DeliveryGuarantee deliveryGuarantee; private final RestClientFactory restClientFactory; + private final FailureHandler failureHandler; OpensearchSink( List<HttpHost> hosts, @@ -67,7 +68,8 @@ public class OpensearchSink<IN> implements Sink<IN> { DeliveryGuarantee deliveryGuarantee, BulkProcessorConfig buildBulkProcessorConfig, NetworkClientConfig networkClientConfig, - RestClientFactory restClientFactory) { + RestClientFactory restClientFactory, + FailureHandler failureHandler) { this.hosts = checkNotNull(hosts); checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); this.emitter = checkNotNull(emitter); @@ -75,6 +77,7 @@ public class OpensearchSink<IN> implements Sink<IN> { this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig); this.networkClientConfig = checkNotNull(networkClientConfig); this.restClientFactory = checkNotNull(restClientFactory); + this.failureHandler = checkNotNull(failureHandler); } @Override @@ -87,7 +90,8 @@ public class OpensearchSink<IN> implements Sink<IN> { networkClientConfig, context.metricGroup(), context.getMailboxExecutor(), - restClientFactory); + restClientFactory, + failureHandler); } @VisibleForTesting diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java index b984120..736c607 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java @@ -27,6 +27,7 @@ import org.apache.http.HttpHost; import java.util.Arrays; import java.util.List; +import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -73,6 +74,7 @@ public class OpensearchSinkBuilder<IN> { private Integer socketTimeout; private Boolean allowInsecure; private RestClientFactory restClientFactory; + private FailureHandler failureHandler = DEFAULT_FAILURE_HANDLER; public OpensearchSinkBuilder() { restClientFactory = new DefaultRestClientFactory(); @@ -300,6 +302,19 @@ public class OpensearchSinkBuilder<IN> { return self(); } + /** + * Allows to set custom failure handler. If not set, then the DEFAULT_FAILURE_HANDLER will be + * used which throws a runtime exception upon receiving a failure. + * + * @param failureHandler the custom handler + * @return this builder + */ + public OpensearchSinkBuilder<IN> setFailureHandler(FailureHandler failureHandler) { + checkNotNull(failureHandler); + this.failureHandler = failureHandler; + return self(); + } + /** * Constructs the {@link OpensearchSink} with the properties configured this builder. * @@ -318,7 +333,8 @@ public class OpensearchSinkBuilder<IN> { deliveryGuarantee, bulkProcessorConfig, networkClientConfig, - restClientFactory); + restClientFactory, + failureHandler); } private NetworkClientConfig buildNetworkClientConfig() { diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java index 1cf059b..68da301 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java @@ -58,6 +58,11 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriter.class); + public static final FailureHandler DEFAULT_FAILURE_HANDLER = + ex -> { + throw new FlinkRuntimeException(ex); + }; + private final OpensearchEmitter<? super IN> emitter; private final MailboxExecutor mailboxExecutor; private final boolean flushOnCheckpoint; @@ -65,6 +70,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { private final RestHighLevelClient client; private final RequestIndexer requestIndexer; private final Counter numBytesOutCounter; + private final FailureHandler failureHandler; private long pendingActions = 0; private boolean checkpointInProgress = false; @@ -81,7 +87,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { * checkpoint * @param bulkProcessorConfig describing the flushing and failure handling of the used {@link * BulkProcessor} - * @param bulkProcessorBuilderFactory configuring the {@link BulkProcessor}'s builder * @param networkClientConfig describing properties of the network connection used to connect to * the Opensearch cluster * @param metricGroup for the sink writer @@ -96,7 +101,8 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, MailboxExecutor mailboxExecutor, - RestClientFactory restClientFactory) { + RestClientFactory restClientFactory, + FailureHandler failureHandler) { this.emitter = checkNotNull(emitter); this.flushOnCheckpoint = flushOnCheckpoint; this.mailboxExecutor = checkNotNull(mailboxExecutor); @@ -117,6 +123,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { } catch (Exception e) { throw new FlinkRuntimeException("Failed to open the OpensearchEmitter", e); } + this.failureHandler = failureHandler; } @Override @@ -278,7 +285,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { if (chainedFailures == null) { return; } - throw new FlinkRuntimeException(chainedFailures); + failureHandler.onFailure(chainedFailures); } private static Throwable wrapException( diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java index deacd6c..afdf26b 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java @@ -56,6 +56,7 @@ import java.util.Map; import java.util.Optional; import static org.apache.flink.connector.opensearch.sink.OpensearchTestClient.buildMessage; +import static org.apache.flink.connector.opensearch.sink.OpensearchWriter.DEFAULT_FAILURE_HANDLER; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link OpensearchWriter}. */ @@ -238,13 +239,61 @@ class OpensearchWriterITCase { } } + private static class TestHandler implements FailureHandler { + private boolean failed = false; + + private synchronized void setFailed() { + failed = true; + } + + public boolean isFailed() { + return failed; + } + + @Override + public void onFailure(Throwable failure) { + setFailed(); + } + } + + @Test + void testWriteErrorOnUpdate() throws Exception { + final String index = "test-bulk-flush-with-error"; + final int flushAfterNActions = 1; + final BulkProcessorConfig bulkProcessorConfig = + new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); + + final TestHandler testHandler = new TestHandler(); + try (final OpensearchWriter<Tuple2<Integer, String>> writer = + createWriter(index, true, bulkProcessorConfig, testHandler)) { + // Trigger an error by updating non-existing document + writer.write(Tuple2.of(1, "u" + buildMessage(1)), null); + context.assertThatIdsAreNotWritten(index, 1); + assertThat(testHandler.isFailed()).isEqualTo(true); + } + } + private OpensearchWriter<Tuple2<Integer, String>> createWriter( String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) { return createWriter( index, flushOnCheckpoint, bulkProcessorConfig, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())); + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + DEFAULT_FAILURE_HANDLER); + } + + private OpensearchWriter<Tuple2<Integer, String>> createWriter( + String index, + boolean flushOnCheckpoint, + BulkProcessorConfig bulkProcessorConfig, + FailureHandler failureHandler) { + return createWriter( + index, + flushOnCheckpoint, + bulkProcessorConfig, + InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + failureHandler); } private OpensearchWriter<Tuple2<Integer, String>> createWriter( @@ -252,6 +301,20 @@ class OpensearchWriterITCase { boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig, SinkWriterMetricGroup metricGroup) { + return createWriter( + index, + flushOnCheckpoint, + bulkProcessorConfig, + metricGroup, + DEFAULT_FAILURE_HANDLER); + } + + private OpensearchWriter<Tuple2<Integer, String>> createWriter( + String index, + boolean flushOnCheckpoint, + BulkProcessorConfig bulkProcessorConfig, + SinkWriterMetricGroup metricGroup, + FailureHandler failureHandler) { return new OpensearchWriter<Tuple2<Integer, String>>( Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())), new UpdatingEmitter(index, context.getDataFieldName()), @@ -267,7 +330,8 @@ class OpensearchWriterITCase { true), metricGroup, new TestMailbox(), - new DefaultRestClientFactory()); + new DefaultRestClientFactory(), + failureHandler); } private static class UpdatingEmitter implements OpensearchEmitter<Tuple2<Integer, String>> {