This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-elasticsearch.git
The following commit(s) were added to refs/heads/main by this push: new 5d1f8d0 [FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports SSL context 5d1f8d0 is described below commit 5d1f8d03e3cff197ed7fe30b79951e44808b48fe Author: Mingliang Liu <lium...@apache.org> AuthorDate: Fri May 3 00:12:52 2024 -0700 [FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports SSL context --- .../elasticsearch/sink/ElasticsearchSink.java | 5 ++ .../sink/ElasticsearchSinkBuilderBase.java | 61 +++++++++++++++++++++- .../elasticsearch/sink/ElasticsearchWriter.java | 52 ++++++++++++++---- .../elasticsearch/sink/NetworkClientConfig.java | 22 +++++++- .../sink/ElasticsearchSinkBuilderBaseTest.java | 14 ++++- .../sink/ElasticsearchWriterITCase.java | 2 +- 6 files changed, 142 insertions(+), 14 deletions(-) diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java index 05ac47a..19b8ed2 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSink.java @@ -104,4 +104,9 @@ public class ElasticsearchSink<IN> implements Sink<IN> { BulkResponseInspectorFactory getBulkResponseInspectorFactory() { return bulkResponseInspectorFactory; } + + @VisibleForTesting + NetworkClientConfig getNetworkClientConfig() { + return networkClientConfig; + } } diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java index 2904eff..402b16b 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java @@ -26,9 +26,18 @@ import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkR import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector; import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.SerializableSupplier; import org.apache.http.HttpHost; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.ssl.SSLContexts; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; + +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.List; @@ -60,6 +69,8 @@ public abstract class ElasticsearchSinkBuilderBase< private Integer connectionTimeout; private Integer connectionRequestTimeout; private Integer socketTimeout; + private SerializableSupplier<SSLContext> sslContextSupplier; + private SerializableSupplier<HostnameVerifier> hostnameVerifierSupplier; private FailureHandler failureHandler = new DefaultFailureHandler(); private BulkResponseInspectorFactory bulkResponseInspectorFactory; @@ -263,6 +274,51 @@ public abstract class ElasticsearchSinkBuilderBase< return self(); } + /** + * Allows to bypass the certificates chain validation and connect to insecure network endpoints + * (for example, servers which use self-signed certificates). + * + * @return this builder + */ + public B allowInsecure() { + this.sslContextSupplier = + () -> { + try { + return SSLContexts.custom() + .loadTrustMaterial(TrustAllStrategy.INSTANCE) + .build(); + } catch (final NoSuchAlgorithmException + | KeyStoreException + | KeyManagementException ex) { + throw new IllegalStateException("Unable to create custom SSL context", ex); + } + }; + return self(); + } + + /** + * Sets the supplier for getting an {@link SSLContext} instance. + * + * @param sslContextSupplier the serializable SSLContext supplier function + * @return this builder + */ + public B setSslContextSupplier(SerializableSupplier<SSLContext> sslContextSupplier) { + this.sslContextSupplier = checkNotNull(sslContextSupplier); + return self(); + } + + /** + * Sets the supplier for getting an SSL {@link HostnameVerifier} instance. + * + * @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function + * @return this builder + */ + public B setSslHostnameVerifier( + SerializableSupplier<HostnameVerifier> sslHostnameVerifierSupplier) { + this.hostnameVerifierSupplier = sslHostnameVerifierSupplier; + return self(); + } + /** * Overrides the default {@link FailureHandler}. A custom failure handler can handle partial * failures gracefully. See {@link #bulkResponseInspectorFactory} for more extensive control. @@ -329,14 +385,15 @@ public abstract class ElasticsearchSinkBuilderBase< private NetworkClientConfig buildNetworkClientConfig() { checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); - return new NetworkClientConfig( username, password, connectionPathPrefix, connectionRequestTimeout, connectionTimeout, - socketTimeout); + socketTimeout, + sslContextSupplier, + hostnameVerifierSupplier); } private BulkProcessorConfig buildBulkProcessorConfig() { diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java index 8f84d87..c525c97 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java @@ -161,17 +161,32 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { if (networkClientConfig.getConnectionPathPrefix() != null) { builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix()); } - if (networkClientConfig.getPassword() != null - && networkClientConfig.getUsername() != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials( - networkClientConfig.getUsername(), networkClientConfig.getPassword())); + + final CredentialsProvider credentialsProvider = getCredentialsProvider(networkClientConfig); + if (credentialsProvider != null + || networkClientConfig.getSSLContextSupplier() != null + || networkClientConfig.getSslHostnameVerifier() != null) { builder.setHttpClientConfigCallback( - httpClientBuilder -> - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + httpClientBuilder -> { + if (credentialsProvider != null) { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + if (networkClientConfig.getSSLContextSupplier() != null) { + // client creates SSL context using the configured supplier + httpClientBuilder.setSSLContext( + networkClientConfig.getSSLContextSupplier().get()); + } + + if (networkClientConfig.getSslHostnameVerifier() != null) { + httpClientBuilder.setSSLHostnameVerifier( + networkClientConfig.getSslHostnameVerifier().get()); + } + + return httpClientBuilder; + }); } + if (networkClientConfig.getConnectionRequestTimeout() != null || networkClientConfig.getConnectionTimeout() != null || networkClientConfig.getSocketTimeout() != null) { @@ -195,6 +210,25 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> { return builder; } + /** + * Get an http client credentials provider given network client config. + * + * <p>If network client config is not configured with username or password, return null. + */ + private static CredentialsProvider getCredentialsProvider( + NetworkClientConfig networkClientConfig) { + CredentialsProvider credentialsProvider = null; + if (networkClientConfig.getPassword() != null + && networkClientConfig.getUsername() != null) { + credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials( + networkClientConfig.getUsername(), networkClientConfig.getPassword())); + } + return credentialsProvider; + } + private BulkProcessor createBulkProcessor( BulkProcessorBuilderFactory bulkProcessorBuilderFactory, BulkProcessorConfig bulkProcessorConfig, diff --git a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java index 5ae0510..fb63ab3 100644 --- a/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java +++ b/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java @@ -18,7 +18,11 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.util.function.SerializableSupplier; + import javax.annotation.Nullable; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; import java.io.Serializable; @@ -30,6 +34,8 @@ class NetworkClientConfig implements Serializable { @Nullable private final Integer connectionRequestTimeout; @Nullable private final Integer connectionTimeout; @Nullable private final Integer socketTimeout; + @Nullable private final SerializableSupplier<SSLContext> sslContextSupplier; + @Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier; NetworkClientConfig( @Nullable String username, @@ -37,13 +43,17 @@ class NetworkClientConfig implements Serializable { @Nullable String connectionPathPrefix, @Nullable Integer connectionRequestTimeout, @Nullable Integer connectionTimeout, - @Nullable Integer socketTimeout) { + @Nullable Integer socketTimeout, + @Nullable SerializableSupplier<SSLContext> sslContextSupplier, + @Nullable SerializableSupplier<HostnameVerifier> sslHostnameVerifier) { this.username = username; this.password = password; this.connectionPathPrefix = connectionPathPrefix; this.connectionRequestTimeout = connectionRequestTimeout; this.connectionTimeout = connectionTimeout; this.socketTimeout = socketTimeout; + this.sslContextSupplier = sslContextSupplier; + this.sslHostnameVerifier = sslHostnameVerifier; } @Nullable @@ -75,4 +85,14 @@ class NetworkClientConfig implements Serializable { public String getConnectionPathPrefix() { return connectionPathPrefix; } + + @Nullable + public SerializableSupplier<SSLContext> getSSLContextSupplier() { + return sslContextSupplier; + } + + @Nullable + public SerializableSupplier<HostnameVerifier> getSslHostnameVerifier() { + return sslHostnameVerifier; + } } diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java index 3799fbb..253de82 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java @@ -54,7 +54,8 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild .setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 1, 1), createMinimalBuilder() .setConnectionUsername("username") - .setConnectionPassword("password")); + .setConnectionPassword("password"), + createMinimalBuilder().allowInsecure()); return DynamicTest.stream( validBuilders, @@ -68,6 +69,17 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild .isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE); } + @Test + void testAllowInsecureSetSslContextSupplier() { + assertThat( + createMinimalBuilder() + .allowInsecure() + .build() + .getNetworkClientConfig() + .getSSLContextSupplier()) + .isNotNull(); + } + @Test void testThrowIfExactlyOnceConfigured() { assertThatThrownBy( diff --git a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index 0e12e97..248f6a8 100644 --- a/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -327,7 +327,7 @@ class ElasticsearchWriterITCase { bulkProcessorConfig, new TestBulkProcessorBuilderFactory(), new DefaultBulkResponseInspector(), - new NetworkClientConfig(null, null, null, null, null, null), + new NetworkClientConfig(null, null, null, null, null, null, null, null), metricGroup, new TestMailbox()); }