hailin0 commented on code in PR #3997:
URL: 
https://github.com/apache/incubator-seatunnel/pull/3997#discussion_r1090613419


##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java:
##########
@@ -62,52 +69,115 @@ public class EsRestClient {
 
     private final RestClient restClient;
 
-    private final ObjectMapper mapper = new ObjectMapper();
-
     private EsRestClient(RestClient restClient) {
         this.restClient = restClient;
     }
 
     public static EsRestClient createInstance(Config pluginConfig) {
         List<String> hosts = 
pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS.key());
-        String username = null;
-        String password = null;
+        Optional<String> username = Optional.empty();
+        Optional<String> password = Optional.empty();
         if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME.key())) {
-            username = 
pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key());
+            username = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.USERNAME.key()));
             if 
(pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD.key())) {
-                password = 
pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key());
+                password = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.PASSWORD.key()));
+            }
+        }
+        Optional<String> keystorePath = Optional.empty();
+        Optional<String> keystorePassword = Optional.empty();
+        Optional<String> truststorePath = Optional.empty();
+        Optional<String> truststorePassword = Optional.empty();
+        boolean tlsVerifyCertificate = 
EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.defaultValue();
+        if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key())) {
+            tlsVerifyCertificate = 
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE.key());
+        }
+        if (tlsVerifyCertificate) {
+            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key())) {
+                keystorePath = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_KEY_STORE_PATH.key()));
             }
+            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key())) {
+                keystorePassword = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD.key()));
+            }
+            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key())) {
+                truststorePath = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH.key()));
+            }
+            if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()))
 {
+                truststorePassword = 
Optional.of(pluginConfig.getString(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD.key()));
+            }
+        }
+        boolean tlsVerifyHostnames = 
EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.defaultValue();
+        if 
(pluginConfig.hasPath(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key())) {
+            tlsVerifyHostnames = 
pluginConfig.getBoolean(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME.key());
         }
-        return createInstance(hosts, username, password);
+        return createInstance(hosts, username, password, tlsVerifyCertificate, 
tlsVerifyHostnames,
+            keystorePath, keystorePassword, truststorePath, 
truststorePassword);
     }
 
-    public static EsRestClient createInstance(List<String> hosts, String 
username, String password) {
-        RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, 
username, password);
+    public static EsRestClient createInstance(List<String> hosts,
+                                              Optional<String> username,
+                                              Optional<String> password,
+                                              boolean tlsVerifyCertificate,
+                                              boolean tlsVerifyHostnames,
+                                              Optional<String> keystorePath,
+                                              Optional<String> 
keystorePassword,
+                                              Optional<String> truststorePath,
+                                              Optional<String> 
truststorePassword) {
+        RestClientBuilder restClientBuilder = getRestClientBuilder(
+            hosts, username, password, tlsVerifyCertificate, 
tlsVerifyHostnames,
+            keystorePath, keystorePassword, truststorePath, 
truststorePassword);
         return new EsRestClient(restClientBuilder.build());
     }
 
-    private static RestClientBuilder getRestClientBuilder(List<String> hosts, 
String username, String password) {
+    private static RestClientBuilder getRestClientBuilder(List<String> hosts,
+                                                          Optional<String> 
username,
+                                                          Optional<String> 
password,
+                                                          boolean 
tlsVerifyCertificate,
+                                                          boolean 
tlsVerifyHostnames,
+                                                          Optional<String> 
keystorePath,
+                                                          Optional<String> 
keystorePassword,
+                                                          Optional<String> 
truststorePath,
+                                                          Optional<String> 
truststorePassword) {
         HttpHost[] httpHosts = new HttpHost[hosts.size()];
         for (int i = 0; i < hosts.size(); i++) {
-            String[] hostInfo = hosts.get(i).replace("http://";, "").split(":");
-            httpHosts[i] = new HttpHost(hostInfo[0], 
Integer.parseInt(hostInfo[1]));
+            httpHosts[i] = HttpHost.create(hosts.get(i));
         }
 
-        RestClientBuilder builder = RestClient.builder(httpHosts)
+        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
             .setRequestConfigCallback(requestConfigBuilder -> 
requestConfigBuilder
                 .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
                 .setSocketTimeout(SOCKET_TIMEOUT));
 
-        if (StringUtils.isNotEmpty(username)) {
-            CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
-            credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(username, password));
-            builder.setHttpClientConfigCallback(httpAsyncClientBuilder -> 
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
-        }
-        return builder;
+        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+            if (username.isPresent()) {
+                CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
+                credentialsProvider.setCredentials(AuthScope.ANY,
+                    new UsernamePasswordCredentials(username.get(), 
password.get()));
+                
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+            }
+
+            try {
+                if (tlsVerifyCertificate) {
+                    Optional<SSLContext> sslContext = 
SSLUtils.buildSSLContext(keystorePath,
+                        keystorePassword, truststorePath, truststorePassword);
+                    sslContext.ifPresent(e -> 
httpClientBuilder.setSSLContext(e));
+                } else {
+                    SSLContext sslContext = SSLContexts.custom()
+                        .loadTrustMaterial(new TrustAllStrategy()).build();
+                    httpClientBuilder.setSSLContext(sslContext);
+                }
+                if (!tlsVerifyHostnames) {
+                    
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return httpClientBuilder;
+        });
+        return restClientBuilder;
     }
 
     public BulkResponse bulk(String requestBody) {
-        Request request = new Request("POST", "_bulk");
+        Request request = new Request("POST", "/_bulk");

Review Comment:
   compatible with aws opensearch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to