This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-opensearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 13c8338  [FLINK-31856] Add support for Opensearch Connector REST 
client customization
13c8338 is described below

commit 13c8338a249843ce52b819307698181ff1a0f23a
Author: Andriy Redko <andriy.re...@aiven.io>
AuthorDate: Tue Jun 13 12:50:56 2023 -0400

    [FLINK-31856] Add support for Opensearch Connector REST client customization
    
    
    ---------
    
    Signed-off-by: Andriy Redko <andriy.re...@aiven.io>
---
 .../opensearch/sink/DefaultRestClientConfig.java   |  49 ++++++++++
 .../opensearch/sink/DefaultRestClientFactory.java  |  97 +++++++++++++++++++
 .../connector/opensearch/sink/OpensearchSink.java  |   8 +-
 .../opensearch/sink/OpensearchSinkBuilder.java     |  24 ++++-
 .../opensearch/sink/OpensearchWriter.java          |  85 ++---------------
 .../opensearch/sink/RestClientFactory.java         | 104 +++++++++++++++++++++
 .../opensearch/sink/OpensearchSinkBuilderTest.java |  12 +++
 .../opensearch/sink/OpensearchWriterITCase.java    |   3 +-
 8 files changed, 302 insertions(+), 80 deletions(-)

diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java
new file mode 100644
index 0000000..312e47f
--- /dev/null
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientConfig.java
@@ -0,0 +1,49 @@
+package org.apache.flink.connector.opensearch.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Provides the default implementation for {@link 
RestClientFactory.RestClientConfig}. */
+class DefaultRestClientConfig implements RestClientFactory.RestClientConfig {
+    private final NetworkClientConfig networkClientConfig;
+
+    DefaultRestClientConfig(NetworkClientConfig networkClientConfig) {
+        this.networkClientConfig = networkClientConfig;
+    }
+
+    @Override
+    public @Nullable String getUsername() {
+        return networkClientConfig.getUsername();
+    }
+
+    @Override
+    public @Nullable String getPassword() {
+        return networkClientConfig.getPassword();
+    }
+
+    @Override
+    public @Nullable Integer getConnectionRequestTimeout() {
+        return networkClientConfig.getConnectionRequestTimeout();
+    }
+
+    @Override
+    public @Nullable Integer getConnectionTimeout() {
+        return networkClientConfig.getConnectionTimeout();
+    }
+
+    @Override
+    public @Nullable Integer getSocketTimeout() {
+        return networkClientConfig.getSocketTimeout();
+    }
+
+    @Override
+    public @Nullable String getConnectionPathPrefix() {
+        return networkClientConfig.getConnectionPathPrefix();
+    }
+
+    @Override
+    public Optional<Boolean> isAllowInsecure() {
+        return networkClientConfig.isAllowInsecure();
+    }
+}
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java
new file mode 100644
index 0000000..44d2060
--- /dev/null
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/DefaultRestClientFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.opensearch.client.RestClientBuilder;
+
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+
+/** Provides the default implementation for {@link RestClientFactory}. */
+public class DefaultRestClientFactory implements RestClientFactory {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void configureRestClientBuilder(
+            RestClientBuilder builder, RestClientConfig networkClientConfig) {
+
+        if (networkClientConfig.getConnectionPathPrefix() != null) {
+            
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
+        }
+
+        builder.setHttpClientConfigCallback(
+                httpClientBuilder -> {
+                    configureHttpClientBuilder(httpClientBuilder, 
networkClientConfig);
+                    return httpClientBuilder;
+                });
+        if (networkClientConfig.getConnectionRequestTimeout() != null
+                || networkClientConfig.getConnectionTimeout() != null
+                || networkClientConfig.getSocketTimeout() != null) {
+            builder.setRequestConfigCallback(
+                    requestConfigBuilder -> {
+                        if (networkClientConfig.getConnectionRequestTimeout() 
!= null) {
+                            requestConfigBuilder.setConnectionRequestTimeout(
+                                    
networkClientConfig.getConnectionRequestTimeout());
+                        }
+                        if (networkClientConfig.getConnectionTimeout() != 
null) {
+                            requestConfigBuilder.setConnectTimeout(
+                                    
networkClientConfig.getConnectionTimeout());
+                        }
+                        if (networkClientConfig.getSocketTimeout() != null) {
+                            requestConfigBuilder.setSocketTimeout(
+                                    networkClientConfig.getSocketTimeout());
+                        }
+                        return requestConfigBuilder;
+                    });
+        }
+    }
+
+    protected void configureHttpClientBuilder(
+            HttpAsyncClientBuilder httpClientBuilder, RestClientConfig 
networkClientConfig) {
+        if (networkClientConfig.getPassword() != null
+                && networkClientConfig.getUsername() != null) {
+            final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+            credentialsProvider.setCredentials(
+                    AuthScope.ANY,
+                    new UsernamePasswordCredentials(
+                            networkClientConfig.getUsername(), 
networkClientConfig.getPassword()));
+
+            
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+        }
+
+        if (networkClientConfig.isAllowInsecure().orElse(false)) {
+            try {
+                httpClientBuilder.setSSLContext(
+                        SSLContexts.custom().loadTrustMaterial(new 
TrustAllStrategy()).build());
+            } catch (final NoSuchAlgorithmException
+                    | KeyStoreException
+                    | KeyManagementException ex) {
+                throw new IllegalStateException("Unable to create custom SSL 
context", ex);
+            }
+        }
+    }
+}
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
index b23b9fc..ff0b00a 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java
@@ -59,19 +59,22 @@ public class OpensearchSink<IN> implements Sink<IN> {
     private final BulkProcessorConfig buildBulkProcessorConfig;
     private final NetworkClientConfig networkClientConfig;
     private final DeliveryGuarantee deliveryGuarantee;
+    private final RestClientFactory restClientFactory;
 
     OpensearchSink(
             List<HttpHost> hosts,
             OpensearchEmitter<? super IN> emitter,
             DeliveryGuarantee deliveryGuarantee,
             BulkProcessorConfig buildBulkProcessorConfig,
-            NetworkClientConfig networkClientConfig) {
+            NetworkClientConfig networkClientConfig,
+            RestClientFactory restClientFactory) {
         this.hosts = checkNotNull(hosts);
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
         this.emitter = checkNotNull(emitter);
         this.deliveryGuarantee = checkNotNull(deliveryGuarantee);
         this.buildBulkProcessorConfig = checkNotNull(buildBulkProcessorConfig);
         this.networkClientConfig = checkNotNull(networkClientConfig);
+        this.restClientFactory = checkNotNull(restClientFactory);
     }
 
     @Override
@@ -83,7 +86,8 @@ public class OpensearchSink<IN> implements Sink<IN> {
                 buildBulkProcessorConfig,
                 networkClientConfig,
                 context.metricGroup(),
-                context.getMailboxExecutor());
+                context.getMailboxExecutor(),
+                restClientFactory);
     }
 
     @VisibleForTesting
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
index 895ca03..b984120 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilder.java
@@ -72,8 +72,11 @@ public class OpensearchSinkBuilder<IN> {
     private Integer connectionRequestTimeout;
     private Integer socketTimeout;
     private Boolean allowInsecure;
+    private RestClientFactory restClientFactory;
 
-    public OpensearchSinkBuilder() {}
+    public OpensearchSinkBuilder() {
+        restClientFactory = new DefaultRestClientFactory();
+    }
 
     @SuppressWarnings("unchecked")
     protected <S extends OpensearchSinkBuilder<?>> S self() {
@@ -285,6 +288,18 @@ public class OpensearchSinkBuilder<IN> {
         return self();
     }
 
+    /**
+     * Sets the {@link RestClientFactory} to be used for configuring the 
instance of the OpenSearch
+     * REST client.
+     *
+     * @param restClientFactory the {@link RestClientFactory} instance
+     * @return this builder
+     */
+    public OpensearchSinkBuilder<IN> setRestClientFactory(RestClientFactory 
restClientFactory) {
+        this.restClientFactory = checkNotNull(restClientFactory);
+        return self();
+    }
+
     /**
      * Constructs the {@link OpensearchSink} with the properties configured 
this builder.
      *
@@ -298,7 +313,12 @@ public class OpensearchSinkBuilder<IN> {
         BulkProcessorConfig bulkProcessorConfig = buildBulkProcessorConfig();
 
         return new OpensearchSink<>(
-                hosts, emitter, deliveryGuarantee, bulkProcessorConfig, 
networkClientConfig);
+                hosts,
+                emitter,
+                deliveryGuarantee,
+                bulkProcessorConfig,
+                networkClientConfig,
+                restClientFactory);
     }
 
     private NetworkClientConfig buildNetworkClientConfig() {
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
index 3231b28..1cf059b 100644
--- 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java
@@ -27,12 +27,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.conn.ssl.TrustAllStrategy;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.ssl.SSLContexts;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
 import org.opensearch.action.bulk.BackoffPolicy;
@@ -55,9 +49,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
 import java.util.List;
 
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -95,6 +86,7 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
      *     the Opensearch cluster
      * @param metricGroup for the sink writer
      * @param mailboxExecutor Flink's mailbox executor
+     * @param restClientFactory the factory used to configure the Opensearch REST client builder
      */
     OpensearchWriter(
             List<HttpHost> hosts,
@@ -103,15 +95,18 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
             BulkProcessorConfig bulkProcessorConfig,
             NetworkClientConfig networkClientConfig,
             SinkWriterMetricGroup metricGroup,
-            MailboxExecutor mailboxExecutor) {
+            MailboxExecutor mailboxExecutor,
+            RestClientFactory restClientFactory) {
         this.emitter = checkNotNull(emitter);
         this.flushOnCheckpoint = flushOnCheckpoint;
         this.mailboxExecutor = checkNotNull(mailboxExecutor);
-        this.client =
-                new RestHighLevelClient(
-                        configureRestClientBuilder(
-                                RestClient.builder(hosts.toArray(new 
HttpHost[0])),
-                                networkClientConfig));
+
+        final RestClientBuilder builder = RestClient.builder(hosts.toArray(new 
HttpHost[0]));
+        checkNotNull(restClientFactory)
+                .configureRestClientBuilder(
+                        builder, new 
DefaultRestClientConfig(networkClientConfig));
+
+        this.client = new RestHighLevelClient(builder);
         this.bulkProcessor = createBulkProcessor(bulkProcessorConfig);
         this.requestIndexer = new 
DefaultRequestIndexer(metricGroup.getNumRecordsSendCounter());
         checkNotNull(metricGroup);
@@ -161,66 +156,6 @@ class OpensearchWriter<IN> implements SinkWriter<IN> {
         client.close();
     }
 
-    private static RestClientBuilder configureRestClientBuilder(
-            RestClientBuilder builder, NetworkClientConfig 
networkClientConfig) {
-        if (networkClientConfig.getConnectionPathPrefix() != null) {
-            
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
-        }
-
-        builder.setHttpClientConfigCallback(
-                httpClientBuilder -> {
-                    if (networkClientConfig.getPassword() != null
-                            && networkClientConfig.getUsername() != null) {
-                        final CredentialsProvider credentialsProvider =
-                                new BasicCredentialsProvider();
-                        credentialsProvider.setCredentials(
-                                AuthScope.ANY,
-                                new UsernamePasswordCredentials(
-                                        networkClientConfig.getUsername(),
-                                        networkClientConfig.getPassword()));
-
-                        
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-                    }
-
-                    if (networkClientConfig.isAllowInsecure().orElse(false)) {
-                        try {
-                            httpClientBuilder.setSSLContext(
-                                    SSLContexts.custom()
-                                            .loadTrustMaterial(new 
TrustAllStrategy())
-                                            .build());
-                        } catch (final NoSuchAlgorithmException
-                                | KeyStoreException
-                                | KeyManagementException ex) {
-                            throw new IllegalStateException(
-                                    "Unable to create custom SSL context", ex);
-                        }
-                    }
-
-                    return httpClientBuilder;
-                });
-        if (networkClientConfig.getConnectionRequestTimeout() != null
-                || networkClientConfig.getConnectionTimeout() != null
-                || networkClientConfig.getSocketTimeout() != null) {
-            builder.setRequestConfigCallback(
-                    requestConfigBuilder -> {
-                        if (networkClientConfig.getConnectionRequestTimeout() 
!= null) {
-                            requestConfigBuilder.setConnectionRequestTimeout(
-                                    
networkClientConfig.getConnectionRequestTimeout());
-                        }
-                        if (networkClientConfig.getConnectionTimeout() != 
null) {
-                            requestConfigBuilder.setConnectTimeout(
-                                    
networkClientConfig.getConnectionTimeout());
-                        }
-                        if (networkClientConfig.getSocketTimeout() != null) {
-                            requestConfigBuilder.setSocketTimeout(
-                                    networkClientConfig.getSocketTimeout());
-                        }
-                        return requestConfigBuilder;
-                    });
-        }
-        return builder;
-    }
-
     private BulkProcessor createBulkProcessor(BulkProcessorConfig 
bulkProcessorConfig) {
 
         final BulkProcessor.Builder builder =
diff --git 
a/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java
 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java
new file mode 100644
index 0000000..2f7c9bc
--- /dev/null
+++ 
b/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/RestClientFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.opensearch.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import org.opensearch.client.RestClientBuilder;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * A factory that is used to configure the {@link 
org.opensearch.client.RestHighLevelClient}
+ * internally used in the {@link OpensearchSink}.
+ */
+@PublicEvolving
+public interface RestClientFactory extends Serializable {
+
+    /** The REST client configuration. */
+    @PublicEvolving
+    interface RestClientConfig {
+        /**
+         * Gets the configured username.
+         *
+         * @return the configured username
+         */
+        @Nullable
+        String getUsername();
+
+        /**
+         * Gets the configured password.
+         *
+         * @return the configured password
+         */
+        @Nullable
+        String getPassword();
+
+        /**
+         * Gets the configured connection request timeout.
+         *
+         * @return the configured connection request timeout
+         */
+        @Nullable
+        Integer getConnectionRequestTimeout();
+
+        /**
+         * Gets the configured connection timeout.
+         *
+         * @return the configured connection timeout
+         */
+        @Nullable
+        Integer getConnectionTimeout();
+
+        /**
+         * Gets the configured socket timeout.
+         *
+         * @return the configured socket timeout
+         */
+        @Nullable
+        Integer getSocketTimeout();
+
+        /**
+         * Gets the configured connection path prefix.
+         *
+         * @return the configured connection path prefix
+         */
+        @Nullable
+        String getConnectionPathPrefix();
+
+        /**
+         * Returns whether insecure HTTPS connections (self-signed certificates, etc.) are
+         * allowed.
+         *
+         * @return whether insecure HTTPS connections are allowed
+         */
+        Optional<Boolean> isAllowInsecure();
+    }
+
+    /**
+     * Configures the rest client builder.
+     *
+     * @param restClientBuilder the REST client builder to configure
+     * @param clientConfig the client network configuration
+     */
+    void configureRestClientBuilder(
+            RestClientBuilder restClientBuilder, RestClientConfig 
clientConfig);
+}
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
index cba2ddc..9939313 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java
@@ -99,6 +99,18 @@ class OpensearchSinkBuilderTest {
                 .isInstanceOf(IllegalStateException.class);
     }
 
+    @Test
+    void testThrowIfRestClientFactoryNotSet() {
+        assertThatThrownBy(() -> 
createEmptyBuilder().setRestClientFactory(null).build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    void testThrowIfConnectionPathPrefixNotSet() {
+        assertThatThrownBy(() -> 
createEmptyBuilder().setConnectionPathPrefix(null).build())
+                .isInstanceOf(NullPointerException.class);
+    }
+
     private OpensearchSinkBuilder<Object> createEmptyBuilder() {
         return new OpensearchSinkBuilder<>();
     }
diff --git 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
index 11a17fc..deacd6c 100644
--- 
a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
+++ 
b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java
@@ -266,7 +266,8 @@ class OpensearchWriterITCase {
                         null,
                         true),
                 metricGroup,
-                new TestMailbox());
+                new TestMailbox(),
+                new DefaultRestClientFactory());
     }
 
     private static class UpdatingEmitter implements 
OpensearchEmitter<Tuple2<Integer, String>> {

Reply via email to