This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git
The following commit(s) were added to refs/heads/main by this push: new 13c8338 [FLINK-31856] Add support for Opensearch Connector REST client customization 13c8338 is described below commit 13c8338a249843ce52b819307698181ff1a0f23a Author: Andriy Redko <andriy.re...@aiven.io> AuthorDate: Tue Jun 13 12:50:56 2023 -0400 [FLINK-31856] Add support for Opensearch Connector REST client customization --------- Signed-off-by: Andriy Redko <andriy.re...@aiven.io> --- .../opensearch/sink/DefaultRestClientConfig.java | 49 ++++++++++ .../opensearch/sink/DefaultRestClientFactory.java | 97 +++++++++++++++++++ .../connector/opensearch/sink/OpensearchSink.java | 8 +- .../opensearch/sink/OpensearchSinkBuilder.java | 24 ++++- .../opensearch/sink/OpensearchWriter.java | 85 ++--------------- .../opensearch/sink/RestClientFactory.java | 104 +++++++++++++++++++++ .../opensearch/sink/OpensearchSinkBuilderTest.java | 12 +++ .../opensearch/sink/OpensearchWriterITCase.java | 3 +- 8 files changed, 302 insertions(+), 80 deletions(-) diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java new file mode 100644 index 0000000..312e47f --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java @@ -0,0 +1,49 @@ +package org.apache.flink.connector.opensearch.sink; + +import javax.annotation.Nullable; + +import java.util.Optional; + +/** Provides the default implementation for {@link RestClientFactory.RestClientConfig}. 
*/ +class DefaultRestClientConfig implements RestClientFactory.RestClientConfig { + private final NetworkClientConfig networkClientConfig; + + DefaultRestClientConfig(NetworkClientConfig networkClientConfig) { + this.networkClientConfig = networkClientConfig; + } + + @Override + public @Nullable String getUsername() { + return networkClientConfig.getUsername(); + } + + @Override + public @Nullable String getPassword() { + return networkClientConfig.getPassword(); + } + + @Override + public @Nullable Integer getConnectionRequestTimeout() { + return networkClientConfig.getConnectionRequestTimeout(); + } + + @Override + public @Nullable Integer getConnectionTimeout() { + return networkClientConfig.getConnectionTimeout(); + } + + @Override + public @Nullable Integer getSocketTimeout() { + return networkClientConfig.getSocketTimeout(); + } + + @Override + public @Nullable String getConnectionPathPrefix() { + return networkClientConfig.getConnectionPathPrefix(); + } + + @Override + public Optional<Boolean> isAllowInsecure() { + return networkClientConfig.isAllowInsecure(); + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java new file mode 100644 index 0000000..44d2060 --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.ssl.SSLContexts; +import org.opensearch.client.RestClientBuilder; + +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; + +/** Provides the default implementation for {@link RestClientFactory}. 
*/ +public class DefaultRestClientFactory implements RestClientFactory { + private static final long serialVersionUID = 1L; + + @Override + public void configureRestClientBuilder( + RestClientBuilder builder, RestClientConfig networkClientConfig) { + + if (networkClientConfig.getConnectionPathPrefix() != null) { + builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); + } + + builder.setHttpClientConfigCallback( + httpClientBuilder -> { + configureHttpClientBuilder(httpClientBuilder, networkClientConfig); + return httpClientBuilder; + }); + if (networkClientConfig.getConnectionRequestTimeout() != null + || networkClientConfig.getConnectionTimeout() != null + || networkClientConfig.getSocketTimeout() != null) { + builder.setRequestConfigCallback( + requestConfigBuilder -> { + if (networkClientConfig.getConnectionRequestTimeout() != null) { + requestConfigBuilder.setConnectionRequestTimeout( + networkClientConfig.getConnectionRequestTimeout()); + } + if (networkClientConfig.getConnectionTimeout() != null) { + requestConfigBuilder.setConnectTimeout( + networkClientConfig.getConnectionTimeout()); + } + if (networkClientConfig.getSocketTimeout() != null) { + requestConfigBuilder.setSocketTimeout( + networkClientConfig.getSocketTimeout()); + } + return requestConfigBuilder; + }); + } + } + + protected void configureHttpClientBuilder( + HttpAsyncClientBuilder httpClientBuilder, RestClientConfig networkClientConfig) { + if (networkClientConfig.getPassword() != null + && networkClientConfig.getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials( + networkClientConfig.getUsername(), networkClientConfig.getPassword())); + + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + if (networkClientConfig.isAllowInsecure().orElse(false)) { + try { + httpClientBuilder.setSSLContext( + 
SSLContexts.custom().loadTrustMaterial(new TrustAllStrategy()).build()); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException("Unable to create custom SSL context", ex); + } + } + } +} diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java index b23b9fc..ff0b00a 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java @@ -59,19 +59,22 @@ public class OpensearchSink<IN> implements Sink<IN> { private final BulkProcessorConfig buildBulkProcessorConfig; private final NetworkClientConfig networkClientConfig; private final DeliveryGuarantee deliveryGuarantee; + private final RestClientFactory restClientFactory; OpensearchSink( List<HttpHost> hosts, OpensearchEmitter<? 
super IN> emitter, DeliveryGuarantee deliveryGuarantee, BulkProcessorConfig buildBulkProcessorConfig, - NetworkClientConfig networkClientConfig) { + NetworkClientConfig networkClientConfig, + RestClientFactory restClientFactory) { this.hosts = checkNotNull(hosts); checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); this.emitter = checkNotNull(emitter); this.deliveryGuarantee = checkNotNull(deliveryGuarantee); this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig); this.networkClientConfig = checkNotNull(networkClientConfig); + this.restClientFactory = checkNotNull(restClientFactory); } @Override @@ -83,7 +86,8 @@ public class OpensearchSink<IN> implements Sink<IN> { buildBulkProcessorConfig, networkClientConfig, context.metricGroup(), - context.getMailboxExecutor()); + context.getMailboxExecutor(), + restClientFactory); } @VisibleForTesting diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java index 895ca03..b984120 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java @@ -72,8 +72,11 @@ public class OpensearchSinkBuilder<IN> { private Integer connectionRequestTimeout; private Integer socketTimeout; private Boolean allowInsecure; + private RestClientFactory restClientFactory; - public OpensearchSinkBuilder() {} + public OpensearchSinkBuilder() { + restClientFactory = new DefaultRestClientFactory(); + } @SuppressWarnings("unchecked") protected <S extends OpensearchSinkBuilder<?>> S self() { @@ -285,6 +288,18 @@ public class OpensearchSinkBuilder<IN> { return self(); } + /** + * Sets the {@link RestClientFactory} to be used for configuring the instance of the OpenSearch + * REST 
client. + * + * @param restClientFactory the {@link RestClientFactory} instance + * @return this builder + */ + public OpensearchSinkBuilder<IN> setRestClientFactory(RestClientFactory restClientFactory) { + this.restClientFactory = checkNotNull(restClientFactory); + return self(); + } + /** * Constructs the {@link OpensearchSink} with the properties configured this builder. * @@ -298,7 +313,12 @@ public class OpensearchSinkBuilder<IN> { BulkProcessorConfig bulkProcessorConfig = buildBulkProcessorConfig(); return new OpensearchSink<>( - hosts, emitter, deliveryGuarantee, bulkProcessorConfig, networkClientConfig); + hosts, + emitter, + deliveryGuarantee, + bulkProcessorConfig, + networkClientConfig, + restClientFactory); } private NetworkClientConfig buildNetworkClientConfig() { diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java index 3231b28..1cf059b 100644 --- a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java @@ -27,12 +27,6 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.conn.ssl.TrustAllStrategy; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.ssl.SSLContexts; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BackoffPolicy; @@ -55,9 +49,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import 
java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.List; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -95,6 +86,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { * the Opensearch cluster * @param metricGroup for the sink writer * @param mailboxExecutor Flink's mailbox executor + * @param restClientFactory the factory used to configure the Opensearch REST client builder */ OpensearchWriter( List<HttpHost> hosts, @@ -103,15 +95,18 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { BulkProcessorConfig bulkProcessorConfig, NetworkClientConfig networkClientConfig, SinkWriterMetricGroup metricGroup, - MailboxExecutor mailboxExecutor) { + MailboxExecutor mailboxExecutor, + RestClientFactory restClientFactory) { this.emitter = checkNotNull(emitter); this.flushOnCheckpoint = flushOnCheckpoint; this.mailboxExecutor = checkNotNull(mailboxExecutor); - this.client = - new RestHighLevelClient( - configureRestClientBuilder( - RestClient.builder(hosts.toArray(new HttpHost[0])), - networkClientConfig)); + + final RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])); + checkNotNull(restClientFactory) + .configureRestClientBuilder( + builder, new DefaultRestClientConfig(networkClientConfig)); + + this.client = new RestHighLevelClient(builder); this.bulkProcessor = createBulkProcessor(bulkProcessorConfig); this.requestIndexer = new DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter()); checkNotNull(metricGroup); @@ -161,66 +156,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> { client.close(); } - private static RestClientBuilder configureRestClientBuilder( - RestClientBuilder builder, NetworkClientConfig networkClientConfig) { - if (networkClientConfig.getConnectionPathPrefix() != null) { - builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); - } - - builder.setHttpClientConfigCallback( - httpClientBuilder -> { - if 
(networkClientConfig.getPassword() != null - && networkClientConfig.getUsername() != null) { - final CredentialsProvider credentialsProvider = - new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials( - networkClientConfig.getUsername(), - networkClientConfig.getPassword())); - - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } - - if (networkClientConfig.isAllowInsecure().orElse(false)) { - try { - httpClientBuilder.setSSLContext( - SSLContexts.custom() - .loadTrustMaterial(new TrustAllStrategy()) - .build()); - } catch (final NoSuchAlgorithmException - | KeyStoreException - | KeyManagementException ex) { - throw new IllegalStateException( - "Unable to create custom SSL context", ex); - } - } - - return httpClientBuilder; - }); - if (networkClientConfig.getConnectionRequestTimeout() != null - || networkClientConfig.getConnectionTimeout() != null - || networkClientConfig.getSocketTimeout() != null) { - builder.setRequestConfigCallback( - requestConfigBuilder -> { - if (networkClientConfig.getConnectionRequestTimeout() != null) { - requestConfigBuilder.setConnectionRequestTimeout( - networkClientConfig.getConnectionRequestTimeout()); - } - if (networkClientConfig.getConnectionTimeout() != null) { - requestConfigBuilder.setConnectTimeout( - networkClientConfig.getConnectionTimeout()); - } - if (networkClientConfig.getSocketTimeout() != null) { - requestConfigBuilder.setSocketTimeout( - networkClientConfig.getSocketTimeout()); - } - return requestConfigBuilder; - }); - } - return builder; - } - private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) { final BulkProcessor.Builder builder = diff --git a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java new file mode 100644 index 
0000000..2f7c9bc --- /dev/null +++ b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.annotation.PublicEvolving; + +import org.opensearch.client.RestClientBuilder; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Optional; + +/** + * A factory that is used to configure the {@link org.opensearch.client.RestHighLevelClient} + * internally used in the {@link OpensearchSink}. + */ +@PublicEvolving +public interface RestClientFactory extends Serializable { + + /** The REST client configuration. */ + @PublicEvolving + interface RestClientConfig { + /** + * Gets the configured username. + * + * @return the configured username + */ + @Nullable + String getUsername(); + + /** + * Gets the configured password. + * + * @return the configured password + */ + @Nullable + String getPassword(); + + /** + * Gets the configured connection request timeout. 
+ * + * @return the configured connection request timeout + */ + @Nullable + Integer getConnectionRequestTimeout(); + + /** + * Gets the configured connection timeout. + * + * @return the configured connection timeout + */ + @Nullable + Integer getConnectionTimeout(); + + /** + * Gets the configured socket timeout. + * + * @return the configured socket timeout + */ + @Nullable + Integer getSocketTimeout(); + + /** + * Gets the configured connection path prefix. + * + * @return the configured connection path prefix + */ + @Nullable + String getConnectionPathPrefix(); + + /** + * Returns if the insecure HTTPS connections are allowed or not (self-signed certificates, + * etc). + * + * @return if the insecure HTTPS connections are allowed or not + */ + Optional<Boolean> isAllowInsecure(); + } + + /** + * Configures the rest client builder. + * + * @param restClientBuilder the configured REST client builder. + * @param clientConfig the client network configuration + */ + void configureRestClientBuilder( + RestClientBuilder restClientBuilder, RestClientConfig clientConfig); +} diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java index cba2ddc..9939313 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java @@ -99,6 +99,18 @@ class OpensearchSinkBuilderTest { .isInstanceOf(IllegalStateException.class); } + @Test + void testThrowIfRestClientFactoryNotSet() { + assertThatThrownBy(() -> createEmptyBuilder().setRestClientFactory(null).build()) + .isInstanceOf(NullPointerException.class); + } + + @Test + void testThrowIfConnectionPathPrefixNotSet() { + assertThatThrownBy(() -> 
createEmptyBuilder().setConnectionPathPrefix(null).build()) + .isInstanceOf(NullPointerException.class); + } + private OpensearchSinkBuilder<Object> createEmptyBuilder() { return new OpensearchSinkBuilder<>(); } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java index 11a17fc..deacd6c 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java @@ -266,7 +266,8 @@ class OpensearchWriterITCase { null, true), metricGroup, - new TestMailbox()); + new TestMailbox(), + new DefaultRestClientFactory()); } private static class UpdatingEmitter implements OpensearchEmitter<Tuple2<Integer, String>> {