This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push: new 40774fa [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling 40774fa is described below commit 40774fad0f4ecd1a0d104dcb339e6bb860b0a4bf Author: Peter Fischer <pfisc...@wikimedia.org> AuthorDate: Fri Dec 1 12:29:32 2023 +0100 [FLINK-32028][connectors/elasticsearch] Allow customising bulk failure handling Extracted `BulkResponseInspector` interface to allow custom handling of (partially) failed bulk requests. If not overridden, default behaviour remains unchanged and partial failures are escalated. * fixes https://issues.apache.org/jira/browse/FLINK-32028 * allows custom metrics to be exposed --- .../elasticsearch/sink/BulkResponseInspector.java | 60 +++++++++ .../elasticsearch/sink/ElasticsearchSink.java | 12 +- .../sink/ElasticsearchSinkBuilderBase.java | 63 +++++++++- .../elasticsearch/sink/ElasticsearchWriter.java | 112 ++++++++++++----- .../elasticsearch/sink/FailureHandler.java | 36 ++++++ .../sink/DefaultBulkResponseInspectorTest.java | 127 +++++++++++++++++++ .../sink/ElasticsearchSinkBuilderBaseTest.java | 136 +++++++++++++++++++++ .../sink/ElasticsearchWriterITCase.java | 2 + 8 files changed, 513 insertions(+), 35 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java new file mode 100644 index 0000000..9f4ce10 --- /dev/null +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/BulkResponseInspector.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.function.SerializableFunction; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; + +/** Callback for inspecting a {@link BulkResponse}. */ +@PublicEvolving +@FunctionalInterface +public interface BulkResponseInspector { + + /** + * Callback to inspect a {@code response} in the context of its {@code request}. It may throw a + * {@link org.apache.flink.util.FlinkRuntimeException} to indicate that the bulk failed + * (partially). + */ + void inspect(BulkRequest request, BulkResponse response); + + /** + * Factory interface for creating a {@link BulkResponseInspector} in the context of a sink. + * Allows obtaining a {@link org.apache.flink.metrics.MetricGroup} to capture custom metrics. + */ + @PublicEvolving + @FunctionalInterface + interface BulkResponseInspectorFactory + extends SerializableFunction< + BulkResponseInspectorFactory.InitContext, BulkResponseInspector> { + + /** + * The interface exposes a subset of {@link + * org.apache.flink.api.connector.sink2.Sink.InitContext}. + */ + interface InitContext { + + /** Returns: The metric group of the surrounding writer. */ + MetricGroup metricGroup(); + } + } +} diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java index efe6dc2..05ac47a 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java @@ -23,6 +23,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; import org.apache.http.HttpHost; @@ -58,6 +59,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> { private final ElasticsearchEmitter<? super IN> emitter; private final BulkProcessorConfig buildBulkProcessorConfig; private final BulkProcessorBuilderFactory bulkProcessorBuilderFactory; + private final BulkResponseInspectorFactory bulkResponseInspectorFactory; private final NetworkClientConfig networkClientConfig; private final DeliveryGuarantee deliveryGuarantee; @@ -67,9 +69,11 @@ public class ElasticsearchSink<IN> implements Sink<IN> { DeliveryGuarantee deliveryGuarantee, BulkProcessorBuilderFactory bulkProcessorBuilderFactory, BulkProcessorConfig buildBulkProcessorConfig, - NetworkClientConfig networkClientConfig) { + NetworkClientConfig networkClientConfig, + BulkResponseInspectorFactory bulkResponseInspectorFactory) { this.hosts = checkNotNull(hosts); this.bulkProcessorBuilderFactory = checkNotNull(bulkProcessorBuilderFactory); + this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory); checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); this.emitter = checkNotNull(emitter); this.deliveryGuarantee = checkNotNull(deliveryGuarantee); @@ -85,6 +89,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> { deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE, buildBulkProcessorConfig, bulkProcessorBuilderFactory, + bulkResponseInspectorFactory.apply(context::metricGroup), networkClientConfig, context.metricGroup(), context.getMailboxExecutor()); @@ -94,4 +99,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> { DeliveryGuarantee getDeliveryGuarantee() { return deliveryGuarantee; } + + @VisibleForTesting + BulkResponseInspectorFactory getBulkResponseInspectorFactory() { + return bulkResponseInspectorFactory; + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java index 3d51356..2904eff 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java @@ -22,6 +22,9 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler; import org.apache.flink.util.InstantiationUtil; import org.apache.http.HttpHost; @@ -57,6 +60,8 @@ public abstract class ElasticsearchSinkBuilderBase< private Integer connectionTimeout; private Integer connectionRequestTimeout; private Integer socketTimeout; + private FailureHandler failureHandler = new DefaultFailureHandler(); + private BulkResponseInspectorFactory bulkResponseInspectorFactory; protected ElasticsearchSinkBuilderBase() {} @@ -258,8 +263,41 @@ public abstract class ElasticsearchSinkBuilderBase< return self(); } + /** + * Overrides the default {@link FailureHandler}. A custom failure handler can handle partial + * failures gracefully. See {@link #bulkResponseInspectorFactory} for more extensive control. + * + * @param failureHandler the handler + * @see #bulkResponseInspectorFactory + * @return this builder + */ + public B setFailureHandler(FailureHandler failureHandler) { + this.failureHandler = checkNotNull(failureHandler); + return self(); + } + + /** + * Overrides the default {@link BulkResponseInspectorFactory}. A custom {@link + * BulkResponseInspector}, for example, can change the failure handling and capture additional + * metrics. See {@link #failureHandler} for a simpler way of handling failures. + * + * @param bulkResponseInspectorFactory the factory + * @return this builder + */ + public B setBulkResponseInspectorFactory( + BulkResponseInspectorFactory bulkResponseInspectorFactory) { + this.bulkResponseInspectorFactory = checkNotNull(bulkResponseInspectorFactory); + return self(); + } + protected abstract BulkProcessorBuilderFactory getBulkProcessorBuilderFactory(); + protected BulkResponseInspectorFactory getBulkResponseInspectorFactory() { + return this.bulkResponseInspectorFactory == null + ? new DefaultBulkResponseInspectorFactory(failureHandler) + : this.bulkResponseInspectorFactory; + } + /** * Constructs the {@link ElasticsearchSink} with the properties configured this builder. * @@ -276,13 +314,17 @@ public abstract class ElasticsearchSinkBuilderBase< ClosureCleaner.clean( bulkProcessorBuilderFactory, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + final BulkResponseInspectorFactory bulkResponseInspectorFactory = + getBulkResponseInspectorFactory(); + return new ElasticsearchSink<>( hosts, emitter, deliveryGuarantee, bulkProcessorBuilderFactory, bulkProcessorConfig, - networkClientConfig); + networkClientConfig, + bulkResponseInspectorFactory); } private NetworkClientConfig buildNetworkClientConfig() { @@ -339,4 +381,23 @@ public abstract class ElasticsearchSinkBuilderBase< + '\'' + '}'; } + + /** + * Default factory for {@link FailureHandler}-bound {@link BulkResponseInspector + * BulkResponseInspectors}. A Static class is used instead of anonymous/lambda to avoid + * non-serializable references to {@link ElasticsearchSinkBuilderBase}. + */ + static class DefaultBulkResponseInspectorFactory implements BulkResponseInspectorFactory { + + private final FailureHandler failureHandler; + + DefaultBulkResponseInspectorFactory(FailureHandler failureHandler) { + this.failureHandler = failureHandler; + } + + @Override + public BulkResponseInspector apply(InitContext context) { + return new DefaultBulkResponseInspector(failureHandler); + } + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java index fa8ed67..b3d8ed5 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java @@ -91,6 +91,7 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig, BulkProcessorBuilderFactory bulkProcessorBuilderFactory, + BulkResponseInspector bulkResponseInspector, NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, MailboxExecutor mailboxExecutor) { @@ -102,9 +103,13 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { configureRestClientBuilder( RestClient.builder(hosts.toArray(new HttpHost[0])), networkClientConfig)); - this.bulkProcessor = createBulkProcessor(bulkProcessorBuilderFactory, bulkProcessorConfig); - this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter()); + this.bulkProcessor = + createBulkProcessor( + bulkProcessorBuilderFactory, + bulkProcessorConfig, + checkNotNull(bulkResponseInspector)); checkNotNull(metricGroup); + this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter()); metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime); this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); try { @@ -192,10 +197,12 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { private BulkProcessor createBulkProcessor( BulkProcessorBuilderFactory bulkProcessorBuilderFactory, - BulkProcessorConfig bulkProcessorConfig) { + BulkProcessorConfig bulkProcessorConfig, + BulkResponseInspector bulkResponseInspector) { BulkProcessor.Builder builder = - bulkProcessorBuilderFactory.apply(client, bulkProcessorConfig, new BulkListener()); + bulkProcessorBuilderFactory.apply( + client, bulkProcessorConfig, new BulkListener(bulkResponseInspector)); // This makes flush() blocking builder.setConcurrentRequests(0); @@ -205,6 +212,12 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { private class BulkListener implements BulkProcessor.Listener { + private final BulkResponseInspector bulkResponseInspector; + + public BulkListener(BulkResponseInspector bulkResponseInspector) { + this.bulkResponseInspector = bulkResponseInspector; + } + @Override public void beforeBulk(long executionId, BulkRequest request) { LOG.info("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions()); @@ -227,6 +240,11 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { }, "elasticsearchErrorCallback"); } + + private void extractFailures(BulkRequest request, BulkResponse response) { + bulkResponseInspector.inspect(request, response); + pendingActions -= request.numberOfActions(); + } } private void enqueueActionInMailbox( @@ -241,35 +259,6 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { mailboxExecutor.execute(action, actionName); } - private void extractFailures(BulkRequest request, BulkResponse response) { - if (!response.hasFailures()) { - pendingActions -= request.numberOfActions(); - return; - } - - Throwable chainedFailures = null; - for (int i = 0; i < response.getItems().length; i++) { - final BulkItemResponse itemResponse = response.getItems()[i]; - if (!itemResponse.isFailed()) { - continue; - } - final Throwable failure = itemResponse.getFailure().getCause(); - if (failure == null) { - continue; - } - final RestStatus restStatus = itemResponse.getFailure().getStatus(); - final DocWriteRequest<?> actionRequest = request.requests().get(i); - - chainedFailures = - firstOrSuppressed( - wrapException(restStatus, failure, actionRequest), chainedFailures); - } - if (chainedFailures == null) { - return; - } - throw new FlinkRuntimeException(chainedFailures); - } - private static Throwable wrapException( RestStatus restStatus, Throwable rootFailure, DocWriteRequest<?> actionRequest) { if (restStatus == null) { @@ -327,4 +316,61 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { } } } + + /** + * A strict implementation that fails if either the whole bulk request failed or any of its + * actions. + */ + static class DefaultBulkResponseInspector implements BulkResponseInspector { + + @VisibleForTesting final FailureHandler failureHandler; + + DefaultBulkResponseInspector() { + this(new DefaultFailureHandler()); + } + + DefaultBulkResponseInspector(FailureHandler failureHandler) { + this.failureHandler = checkNotNull(failureHandler); + } + + @Override + public void inspect(BulkRequest request, BulkResponse response) { + if (!response.hasFailures()) { + return; + } + + Throwable chainedFailures = null; + for (int i = 0; i < response.getItems().length; i++) { + final BulkItemResponse itemResponse = response.getItems()[i]; + if (!itemResponse.isFailed()) { + continue; + } + final Throwable failure = itemResponse.getFailure().getCause(); + if (failure == null) { + continue; + } + final RestStatus restStatus = itemResponse.getFailure().getStatus(); + final DocWriteRequest<?> actionRequest = request.requests().get(i); + + chainedFailures = + firstOrSuppressed( + wrapException(restStatus, failure, actionRequest), chainedFailures); + } + if (chainedFailures == null) { + return; + } + failureHandler.onFailure(chainedFailures); + } + } + + static class DefaultFailureHandler implements FailureHandler { + + @Override + public void onFailure(Throwable failure) { + if (failure instanceof FlinkRuntimeException) { + throw (FlinkRuntimeException) failure; + } + throw new FlinkRuntimeException(failure); + } + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java new file mode 100644 index 0000000..84eb8c5 --- /dev/null +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/FailureHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** A handler for partial bulk action failures. */ +@PublicEvolving +@FunctionalInterface +public interface FailureHandler extends Serializable { + + /** + * Callback that is passed the first bulk action failure. + * + * @param failure the first bulk action failure + */ + void onFailure(Throwable failure); +} diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java new file mode 100644 index 0000000..49ae0ad --- /dev/null +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/DefaultBulkResponseInspectorTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.elasticsearch.sink; + +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.util.FlinkRuntimeException; + +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.DocWriteRequest.OpType; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkItemResponse.Failure; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +class DefaultBulkResponseInspectorTest { + + @Test + void testPassWithoutFailures() { + final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); + Assertions.assertThatCode( + () -> + inspector.inspect( + new BulkRequest(), + new BulkResponse(new BulkItemResponse[] {}, 0))) + .doesNotThrowAnyException(); + } + + @Test + void testPassesDespiteChainedFailure() { + final DefaultBulkResponseInspector inspector = + new DefaultBulkResponseInspector((failure) -> {}); + Assertions.assertThatCode( + () -> { + final BulkRequest request = new BulkRequest(); + request.add( + new IndexRequest(), new DeleteRequest(), new DeleteRequest()); + + inspector.inspect( + request, + new BulkResponse( + new BulkItemResponse[] { + new BulkItemResponse( + 0, OpType.CREATE, (DocWriteResponse) null), + new BulkItemResponse( + 1, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + new IOException("A"))), + new BulkItemResponse( + 2, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + new IOException("B"))) + }, + 0)); + }) + .doesNotThrowAnyException(); + } + + @Test + void testThrowsChainedFailure() { + final IOException failureCause0 = new IOException("A"); + final IOException failureCause1 = new IOException("B"); + final DefaultBulkResponseInspector inspector = new DefaultBulkResponseInspector(); + Assertions.assertThatExceptionOfType(FlinkRuntimeException.class) + .isThrownBy( + () -> { + final BulkRequest request = new BulkRequest(); + request.add( + new IndexRequest(), new DeleteRequest(), new DeleteRequest()); + + inspector.inspect( + request, + new BulkResponse( + new BulkItemResponse[] { + new BulkItemResponse( + 0, OpType.CREATE, (DocWriteResponse) null), + new BulkItemResponse( + 1, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + failureCause0)), + new BulkItemResponse( + 2, + OpType.DELETE, + new Failure( + "index", + "type", + "id", + failureCause1)) + }, + 0)); + }) + .withCause(failureCause0); + } +} diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java index 3fc2abc..d2ec22d 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java @@ -17,8 +17,26 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.SimpleUserCodeClassLoader; import org.apache.flink.util.TestLoggerExtension; +import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.junit.jupiter.api.DynamicTest; @@ -27,6 +45,8 @@ import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -99,7 +119,123 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild .isInstanceOf(IllegalStateException.class); } + @Test + void testOverrideFailureHandler() { + final FailureHandler failureHandler = (failure) -> {}; + final ElasticsearchSink<Object> sink = + createMinimalBuilder().setFailureHandler(failureHandler).build(); + + final InitContext sinkInitContext = new MockInitContext(); + final BulkResponseInspector bulkResponseInspector = + sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup); + assertThat(bulkResponseInspector) + .isInstanceOf(DefaultBulkResponseInspector.class) + .extracting( + (inspector) -> ((DefaultBulkResponseInspector) inspector).failureHandler) + .isEqualTo(failureHandler); + } + + @Test + void testOverrideBulkResponseInspectorFactory() { + final AtomicBoolean called = new AtomicBoolean(); + final BulkResponseInspectorFactory bulkResponseInspectorFactory = + initContext -> { + final MetricGroup metricGroup = initContext.metricGroup(); + metricGroup.addGroup("bulk").addGroup("result", "failed").counter("actions"); + called.set(true); + return (BulkResponseInspector) (request, response) -> {}; + }; + final ElasticsearchSink<Object> sink = + createMinimalBuilder() + .setBulkResponseInspectorFactory(bulkResponseInspectorFactory) + .build(); + + final InitContext sinkInitContext = new MockInitContext(); + + assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException(); + assertThat(called).isTrue(); + } + abstract B createEmptyBuilder(); abstract B createMinimalBuilder(); + + private static class DummyMailboxExecutor implements MailboxExecutor { + private DummyMailboxExecutor() {} + + public void execute( + ThrowingRunnable<? extends Exception> command, + String descriptionFormat, + Object... descriptionArgs) {} + + public void yield() throws InterruptedException, FlinkRuntimeException {} + + public boolean tryYield() throws FlinkRuntimeException { + return false; + } + } + + private static class MockInitContext + implements Sink.InitContext, SerializationSchema.InitializationContext { + + public UserCodeClassLoader getUserCodeClassLoader() { + return SimpleUserCodeClassLoader.create( + ElasticsearchSinkBuilderBaseTest.class.getClassLoader()); + } + + public MailboxExecutor getMailboxExecutor() { + return new ElasticsearchSinkBuilderBaseTest.DummyMailboxExecutor(); + } + + public ProcessingTimeService getProcessingTimeService() { + return new TestProcessingTimeService(); + } + + public int getSubtaskId() { + return 0; + } + + public int getNumberOfParallelSubtasks() { + return 0; + } + + public int getAttemptNumber() { + return 0; + } + + public SinkWriterMetricGroup metricGroup() { + return InternalSinkWriterMetricGroup.wrap( + new TestingSinkWriterMetricGroup.Builder() + .setIoMetricGroupSupplier( + UnregisteredMetricsGroup::createOperatorIOMetricGroup) + .setParentMetricGroup( + UnregisteredMetricsGroup.createOperatorMetricGroup()) + .build()); + } + + public MetricGroup getMetricGroup() { + return this.metricGroup(); + } + + public OptionalLong getRestoredCheckpointId() { + return OptionalLong.empty(); + } + + public SerializationSchema.InitializationContext + asSerializationSchemaInitializationContext() { + return this; + } + + public boolean isObjectReuseEnabled() { + return false; + } + + public <IN> TypeSerializer<IN> createInputSerializer() { + throw new UnsupportedOperationException(); + } + + public JobID getJobId() { + throw new UnsupportedOperationException(); + } + } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index e8002cc..25a5f18 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.elasticsearch.ElasticsearchUtil; +import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; import org.apache.flink.connector.elasticsearch.test.DockerImageVersions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -283,6 +284,7 @@ class ElasticsearchWriterITCase { flushOnCheckpoint, bulkProcessorConfig, new TestBulkProcessorBuilderFactory(), + new DefaultBulkResponseInspector(), new NetworkClientConfig(null, null, null, null, null, null), metricGroup, new TestMailbox());