This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d1f8d0  [FLINK-34369][connectors/elasticsearch] Elasticsearch 
connector supports SSL context
5d1f8d0 is described below

commit 5d1f8d03e3cff197ed7fe30b79951e44808b48fe
Author: Mingliang Liu <lium...@apache.org>
AuthorDate: Fri May 3 00:12:52 2024 -0700

    [FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports 
SSL context
---
 .../elasticsearch/sink/ElasticsearchSink.java      |  5 ++
 .../sink/ElasticsearchSinkBuilderBase.java         | 61 +++++++++++++++++++++-
 .../elasticsearch/sink/ElasticsearchWriter.java    | 52 ++++++++++++++----
 .../elasticsearch/sink/NetworkClientConfig.java    | 22 +++++++-
 .../sink/ElasticsearchSinkBuilderBaseTest.java     | 14 ++++-
 .../sink/ElasticsearchWriterITCase.java            |  2 +-
 6 files changed, 142 insertions(+), 14 deletions(-)

diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
index 05ac47a..19b8ed2 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java
@@ -104,4 +104,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
     BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
         return bulkResponseInspectorFactory;
     }
+
+    @VisibleForTesting
+    NetworkClientConfig getNetworkClientConfig() {
+        return networkClientConfig;
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index 2904eff..402b16b 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -26,9 +26,18 @@ import 
org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkR
 import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
 import 
org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.http.HttpHost;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.ssl.SSLContexts;
 
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.List;
 
@@ -60,6 +69,8 @@ public abstract class ElasticsearchSinkBuilderBase<
     private Integer connectionTimeout;
     private Integer connectionRequestTimeout;
     private Integer socketTimeout;
+    private SerializableSupplier<SSLContext> sslContextSupplier;
+    private SerializableSupplier<HostnameVerifier> hostnameVerifierSupplier;
     private FailureHandler failureHandler = new DefaultFailureHandler();
     private BulkResponseInspectorFactory bulkResponseInspectorFactory;
 
@@ -263,6 +274,51 @@ public abstract class ElasticsearchSinkBuilderBase<
         return self();
     }
 
+    /**
+     * Allows to bypass the certificates chain validation and connect to 
insecure network endpoints
+     * (for example, servers which use self-signed certificates).
+     *
+     * @return this builder
+     */
+    public B allowInsecure() {
+        this.sslContextSupplier =
+                () -> {
+                    try {
+                        return SSLContexts.custom()
+                                .loadTrustMaterial(TrustAllStrategy.INSTANCE)
+                                .build();
+                    } catch (final NoSuchAlgorithmException
+                            | KeyStoreException
+                            | KeyManagementException ex) {
+                        throw new IllegalStateException("Unable to create 
custom SSL context", ex);
+                    }
+                };
+        return self();
+    }
+
+    /**
+     * Sets the supplier for getting an {@link SSLContext} instance.
+     *
+     * @param sslContextSupplier the serializable SSLContext supplier function
+     * @return this builder
+     */
+    public B setSslContextSupplier(SerializableSupplier<SSLContext> 
sslContextSupplier) {
+        this.sslContextSupplier = checkNotNull(sslContextSupplier);
+        return self();
+    }
+
+    /**
+     * Sets the supplier for getting an SSL {@link HostnameVerifier} instance.
+     *
+     * @param sslHostnameVerifierSupplier the serializable hostname verifier 
supplier function
+     * @return this builder
+     */
+    public B setSslHostnameVerifier(
+            SerializableSupplier<HostnameVerifier> 
sslHostnameVerifierSupplier) {
+        this.hostnameVerifierSupplier = sslHostnameVerifierSupplier;
+        return self();
+    }
+
     /**
      * Overrides the default {@link FailureHandler}. A custom failure handler 
can handle partial
      * failures gracefully. See {@link #bulkResponseInspectorFactory} for more 
extensive control.
@@ -329,14 +385,15 @@ public abstract class ElasticsearchSinkBuilderBase<
 
     private NetworkClientConfig buildNetworkClientConfig() {
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
-
         return new NetworkClientConfig(
                 username,
                 password,
                 connectionPathPrefix,
                 connectionRequestTimeout,
                 connectionTimeout,
-                socketTimeout);
+                socketTimeout,
+                sslContextSupplier,
+                hostnameVerifierSupplier);
     }
 
     private BulkProcessorConfig buildBulkProcessorConfig() {
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
index 8f84d87..c525c97 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
@@ -161,17 +161,32 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
         if (networkClientConfig.getConnectionPathPrefix() != null) {
             
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
         }
-        if (networkClientConfig.getPassword() != null
-                && networkClientConfig.getUsername() != null) {
-            final CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-            credentialsProvider.setCredentials(
-                    AuthScope.ANY,
-                    new UsernamePasswordCredentials(
-                            networkClientConfig.getUsername(), 
networkClientConfig.getPassword()));
+
+        final CredentialsProvider credentialsProvider = 
getCredentialsProvider(networkClientConfig);
+        if (credentialsProvider != null
+                || networkClientConfig.getSSLContextSupplier() != null
+                || networkClientConfig.getSslHostnameVerifier() != null) {
             builder.setHttpClientConfigCallback(
-                    httpClientBuilder ->
-                            
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                    httpClientBuilder -> {
+                        if (credentialsProvider != null) {
+                            
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                        }
+
+                        if (networkClientConfig.getSSLContextSupplier() != 
null) {
+                            // client creates SSL context using the configured 
supplier
+                            httpClientBuilder.setSSLContext(
+                                    
networkClientConfig.getSSLContextSupplier().get());
+                        }
+
+                        if (networkClientConfig.getSslHostnameVerifier() != 
null) {
+                            httpClientBuilder.setSSLHostnameVerifier(
+                                    
networkClientConfig.getSslHostnameVerifier().get());
+                        }
+
+                        return httpClientBuilder;
+                    });
         }
+
         if (networkClientConfig.getConnectionRequestTimeout() != null
                 || networkClientConfig.getConnectionTimeout() != null
                 || networkClientConfig.getSocketTimeout() != null) {
@@ -195,6 +210,25 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
         return builder;
     }
 
+    /**
+     * Get an http client credentials provider given network client config.
+     *
+     * <p>If network client config is not configured with username or 
password, return null.
+     */
+    private static CredentialsProvider getCredentialsProvider(
+            NetworkClientConfig networkClientConfig) {
+        CredentialsProvider credentialsProvider = null;
+        if (networkClientConfig.getPassword() != null
+                && networkClientConfig.getUsername() != null) {
+            credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(
+                    AuthScope.ANY,
+                    new UsernamePasswordCredentials(
+                            networkClientConfig.getUsername(), 
networkClientConfig.getPassword()));
+        }
+        return credentialsProvider;
+    }
+
     private BulkProcessor createBulkProcessor(
             BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
             BulkProcessorConfig bulkProcessorConfig,
diff --git 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
index 5ae0510..fb63ab3 100644
--- 
a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
+++ 
b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.connector.elasticsearch.sink;
 
+import org.apache.flink.util.function.SerializableSupplier;
+
 import javax.annotation.Nullable;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
 
 import java.io.Serializable;
 
@@ -30,6 +34,8 @@ class NetworkClientConfig implements Serializable {
     @Nullable private final Integer connectionRequestTimeout;
     @Nullable private final Integer connectionTimeout;
     @Nullable private final Integer socketTimeout;
+    @Nullable private final SerializableSupplier<SSLContext> 
sslContextSupplier;
+    @Nullable private final SerializableSupplier<HostnameVerifier> 
sslHostnameVerifier;
 
     NetworkClientConfig(
             @Nullable String username,
@@ -37,13 +43,17 @@ class NetworkClientConfig implements Serializable {
             @Nullable String connectionPathPrefix,
             @Nullable Integer connectionRequestTimeout,
             @Nullable Integer connectionTimeout,
-            @Nullable Integer socketTimeout) {
+            @Nullable Integer socketTimeout,
+            @Nullable SerializableSupplier<SSLContext> sslContextSupplier,
+            @Nullable SerializableSupplier<HostnameVerifier> 
sslHostnameVerifier) {
         this.username = username;
         this.password = password;
         this.connectionPathPrefix = connectionPathPrefix;
         this.connectionRequestTimeout = connectionRequestTimeout;
         this.connectionTimeout = connectionTimeout;
         this.socketTimeout = socketTimeout;
+        this.sslContextSupplier = sslContextSupplier;
+        this.sslHostnameVerifier = sslHostnameVerifier;
     }
 
     @Nullable
@@ -75,4 +85,14 @@ class NetworkClientConfig implements Serializable {
     public String getConnectionPathPrefix() {
         return connectionPathPrefix;
     }
+
+    @Nullable
+    public SerializableSupplier<SSLContext> getSSLContextSupplier() {
+        return sslContextSupplier;
+    }
+
+    @Nullable
+    public SerializableSupplier<HostnameVerifier> getSslHostnameVerifier() {
+        return sslHostnameVerifier;
+    }
 }
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index 3799fbb..253de82 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -54,7 +54,8 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
                                 
.setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 1, 1),
                         createMinimalBuilder()
                                 .setConnectionUsername("username")
-                                .setConnectionPassword("password"));
+                                .setConnectionPassword("password"),
+                        createMinimalBuilder().allowInsecure());
 
         return DynamicTest.stream(
                 validBuilders,
@@ -68,6 +69,17 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
                 .isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE);
     }
 
+    @Test
+    void testAllowInsecureSetSslContextSupplier() {
+        assertThat(
+                        createMinimalBuilder()
+                                .allowInsecure()
+                                .build()
+                                .getNetworkClientConfig()
+                                .getSSLContextSupplier())
+                .isNotNull();
+    }
+
     @Test
     void testThrowIfExactlyOnceConfigured() {
         assertThatThrownBy(
diff --git 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index 0e12e97..248f6a8 100644
--- 
a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ 
b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -327,7 +327,7 @@ class ElasticsearchWriterITCase {
                 bulkProcessorConfig,
                 new TestBulkProcessorBuilderFactory(),
                 new DefaultBulkResponseInspector(),
-                new NetworkClientConfig(null, null, null, null, null, null),
+                new NetworkClientConfig(null, null, null, null, null, null, 
null, null),
                 metricGroup,
                 new TestMailbox());
     }

Reply via email to