This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c707401  [FLINK-21068][connectors/elasticsearch] Support 
'connection.request-timeout','connection.timeout', 'socket.timeout' options for 
elasticsearch connector.
c707401 is described below

commit c70740179f4e89139407fabafc808a4350d5c44b
Author: jinfeng <jinfeng1...@gmail.com>
AuthorDate: Mon Dec 13 19:52:37 2021 +0800

    [FLINK-21068][connectors/elasticsearch] Support 
'connection.request-timeout','connection.timeout', 'socket.timeout' options for 
elasticsearch connector.
---
 .../docs/connectors/table/elasticsearch.md         | 22 ++++++++++
 .../content/docs/connectors/table/elasticsearch.md | 31 ++++++++++++++
 .../sink/ElasticsearchSinkBuilderBase.java         | 49 +++++++++++++++++++++-
 .../elasticsearch/sink/ElasticsearchWriter.java    | 20 +++++++++
 .../elasticsearch/sink/NetworkClientConfig.java    | 26 +++++++++++-
 .../table/ElasticsearchConfiguration.java          | 15 +++++++
 .../table/ElasticsearchConnectorOptions.java       | 21 ++++++++++
 .../table/ElasticsearchDynamicSink.java            | 13 ++++++
 .../table/ElasticsearchDynamicSinkFactoryBase.java |  6 +++
 .../sink/ElasticsearchSinkBuilderBaseTest.java     | 13 ++++++
 .../sink/ElasticsearchWriterITCase.java            |  2 +-
 11 files changed, 215 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/elasticsearch.md 
b/docs/content.zh/docs/connectors/table/elasticsearch.md
index 6bafab6..061ca27 100644
--- a/docs/content.zh/docs/connectors/table/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/table/elasticsearch.md
@@ -218,6 +218,28 @@ CREATE TABLE myUserTable (
       <td>添加到每个 REST 通信中的前缀字符串,例如,<code>'/v1'</code>。</td>
     </tr>
     <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>建立请求的超时时间 。超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。</td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。
+      </td>
+    </tr>
+    <tr>
       <td><h5>format</h5></td>
       <td>可选</td>
       <td style="word-wrap: break-word;">json</td>
diff --git a/docs/content/docs/connectors/table/elasticsearch.md 
b/docs/content/docs/connectors/table/elasticsearch.md
index a9c7b83..0879f41 100644
--- a/docs/content/docs/connectors/table/elasticsearch.md
+++ b/docs/content/docs/connectors/table/elasticsearch.md
@@ -198,6 +198,37 @@ Connector Options
       <td>Prefix string to be added to every REST communication, e.g., 
<code>'/v1'</code>.</td>
     </tr>
     <tr>
+      <td><h5>connection.request-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout in milliseconds for requesting a connection from the 
connection manager.
+        The timeout must be larger than or equal to 0.
+        A timeout value of zero is interpreted as an infinite timeout.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>connection.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The timeout in milliseconds for establishing a connection.
+        The timeout must be larger than or equal to 0.
+        A timeout value of zero is interpreted as an infinite timeout.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>socket.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put 
differently,
+        a maximum period inactivity between two consecutive data packets.
+        The timeout must be larger than or equal to 0.
+        A timeout value of zero is interpreted as an infinite timeout.
+      </td>
+    </tr>
+    <tr>
       <td><h5>format</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">json</td>
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
index f187cb6..fe64c94 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java
@@ -54,6 +54,9 @@ public abstract class ElasticsearchSinkBuilderBase<
     private String username;
     private String password;
     private String connectionPathPrefix;
+    private Integer connectionTimeout;
+    private Integer connectionRequestTimeout;
+    private Integer socketTimeout;
 
     protected ElasticsearchSinkBuilderBase() {}
 
@@ -217,6 +220,44 @@ public abstract class ElasticsearchSinkBuilderBase<
         return self();
     }
 
+    /**
+     * Sets the timeout for requesting the connection of the Elasticsearch 
cluster from the
+     * connection manager.
+     *
+     * @param timeout for the connection request
+     * @return this builder
+     */
+    public B setConnectionRequestTimeout(int timeout) {
+        checkState(timeout >= 0, "Connection request timeout must be larger 
than or equal to 0.");
+        this.connectionRequestTimeout = timeout;
+        return self();
+    }
+
+    /**
+     * Sets the timeout for establishing a connection of the Elasticsearch 
cluster.
+     *
+     * @param timeout for the connection
+     * @return this builder
+     */
+    public B setConnectionTimeout(int timeout) {
+        checkState(timeout >= 0, "Connection timeout must be larger than or 
equal to 0.");
+        this.connectionTimeout = timeout;
+        return self();
+    }
+
+    /**
+     * Sets the timeout for waiting for data or, put differently, a maximum 
period inactivity
+     * between two consecutive data packets.
+     *
+     * @param timeout for the socket
+     * @return this builder
+     */
+    public B setSocketTimeout(int timeout) {
+        checkState(timeout >= 0, "Socket timeout must be larger than or equal 
to 0.");
+        this.socketTimeout = timeout;
+        return self();
+    }
+
     protected abstract BulkProcessorBuilderFactory 
getBulkProcessorBuilderFactory();
 
     /**
@@ -247,7 +288,13 @@ public abstract class ElasticsearchSinkBuilderBase<
     private NetworkClientConfig buildNetworkClientConfig() {
         checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
 
-        return new NetworkClientConfig(username, password, 
connectionPathPrefix);
+        return new NetworkClientConfig(
+                username,
+                password,
+                connectionPathPrefix,
+                connectionRequestTimeout,
+                connectionTimeout,
+                socketTimeout);
     }
 
     private BulkProcessorConfig buildBulkProcessorConfig() {
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
index f984312..53723bc 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java
@@ -169,6 +169,26 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN, 
Void, Void> {
                     httpClientBuilder ->
                             
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
         }
+        if (networkClientConfig.getConnectionRequestTimeout() != null
+                || networkClientConfig.getConnectionTimeout() != null
+                || networkClientConfig.getSocketTimeout() != null) {
+            builder.setRequestConfigCallback(
+                    requestConfigBuilder -> {
+                        if (networkClientConfig.getConnectionRequestTimeout() 
!= null) {
+                            requestConfigBuilder.setConnectionRequestTimeout(
+                                    
networkClientConfig.getConnectionRequestTimeout());
+                        }
+                        if (networkClientConfig.getConnectionTimeout() != 
null) {
+                            requestConfigBuilder.setConnectTimeout(
+                                    
networkClientConfig.getConnectionTimeout());
+                        }
+                        if (networkClientConfig.getSocketTimeout() != null) {
+                            requestConfigBuilder.setSocketTimeout(
+                                    networkClientConfig.getSocketTimeout());
+                        }
+                        return requestConfigBuilder;
+                    });
+        }
         return builder;
     }
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
index e093008..5ae0510 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java
@@ -27,14 +27,23 @@ class NetworkClientConfig implements Serializable {
     @Nullable private final String username;
     @Nullable private final String password;
     @Nullable private final String connectionPathPrefix;
+    @Nullable private final Integer connectionRequestTimeout;
+    @Nullable private final Integer connectionTimeout;
+    @Nullable private final Integer socketTimeout;
 
     NetworkClientConfig(
             @Nullable String username,
             @Nullable String password,
-            @Nullable String connectionPathPrefix) {
+            @Nullable String connectionPathPrefix,
+            @Nullable Integer connectionRequestTimeout,
+            @Nullable Integer connectionTimeout,
+            @Nullable Integer socketTimeout) {
         this.username = username;
         this.password = password;
         this.connectionPathPrefix = connectionPathPrefix;
+        this.connectionRequestTimeout = connectionRequestTimeout;
+        this.connectionTimeout = connectionTimeout;
+        this.socketTimeout = socketTimeout;
     }
 
     @Nullable
@@ -48,6 +57,21 @@ class NetworkClientConfig implements Serializable {
     }
 
     @Nullable
+    public Integer getConnectionRequestTimeout() {
+        return connectionRequestTimeout;
+    }
+
+    @Nullable
+    public Integer getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    @Nullable
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    @Nullable
     public String getConnectionPathPrefix() {
         return connectionPathPrefix;
     }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
index ee6488e..e684b01 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java
@@ -39,11 +39,14 @@ import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -104,6 +107,18 @@ class ElasticsearchConfiguration {
         return config.getOptional(CONNECTION_PATH_PREFIX_OPTION);
     }
 
+    public Optional<Duration> getConnectionRequestTimeout() {
+        return config.getOptional(CONNECTION_REQUEST_TIMEOUT);
+    }
+
+    public Optional<Duration> getConnectionTimeout() {
+        return config.getOptional(CONNECTION_TIMEOUT);
+    }
+
+    public Optional<Duration> getSocketTimeout() {
+        return config.getOptional(SOCKET_TIMEOUT);
+    }
+
     public List<HttpHost> getHosts() {
         return config.get(HOSTS_OPTION).stream()
                 .map(ElasticsearchConfiguration::validateAndParseHostsString)
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
index 5cff4ee..672f0727 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java
@@ -111,6 +111,27 @@ public class ElasticsearchConnectorOptions {
                     .noDefaultValue()
                     .withDescription("Prefix string to be added to every REST 
communication.");
 
+    public static final ConfigOption<Duration> CONNECTION_REQUEST_TIMEOUT =
+            ConfigOptions.key("connection.request-timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The timeout for requesting a connection from the 
connection manager.");
+
+    public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
+            ConfigOptions.key("connection.timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription("The timeout for establishing a 
connection.");
+
+    public static final ConfigOption<Duration> SOCKET_TIMEOUT =
+            ConfigOptions.key("socket.timeout")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The socket timeout (SO_TIMEOUT) for waiting for 
data or, put differently,"
+                                    + "a maximum period inactivity between two 
consecutive data packets.");
+
     public static final ConfigOption<String> FORMAT_OPTION =
             ConfigOptions.key("format")
                     .stringType()
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
index e5f94f1..0938e02 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java
@@ -144,6 +144,19 @@ class ElasticsearchDynamicSink implements DynamicTableSink 
{
             builder.setConnectionPathPrefix(config.getPathPrefix().get());
         }
 
+        if (config.getConnectionRequestTimeout().isPresent()) {
+            builder.setConnectionRequestTimeout(
+                    (int) 
config.getConnectionRequestTimeout().get().getSeconds());
+        }
+
+        if (config.getConnectionTimeout().isPresent()) {
+            builder.setConnectionTimeout((int) 
config.getConnectionTimeout().get().getSeconds());
+        }
+
+        if (config.getSocketTimeout().isPresent()) {
+            builder.setSocketTimeout((int) 
config.getSocketTimeout().get().getSeconds());
+        }
+
         return SinkProvider.of(builder.build());
     }
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
index e124ad6..8028284 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java
@@ -54,12 +54,15 @@ import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
+import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
 import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.elasticsearch.common.Strings.capitalize;
@@ -209,6 +212,9 @@ abstract class ElasticsearchDynamicSinkFactoryBase 
implements DynamicTableSinkFa
                         BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION,
                         BULK_FLUSH_BACKOFF_DELAY_OPTION,
                         CONNECTION_PATH_PREFIX_OPTION,
+                        CONNECTION_REQUEST_TIMEOUT,
+                        CONNECTION_TIMEOUT,
+                        SOCKET_TIMEOUT,
                         FORMAT_OPTION,
                         DELIVERY_GUARANTEE_OPTION,
                         PASSWORD_OPTION,
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
index e65d844..83f7871 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java
@@ -77,6 +77,19 @@ abstract class ElasticsearchSinkBuilderBaseTest<B extends 
ElasticsearchSinkBuild
                 () -> createEmptyBuilder().setHosts(new 
HttpHost("localhost:3000")).build());
     }
 
+    @Test
+    void testThrowIfSetInvalidTimeouts() {
+        assertThrows(
+                IllegalStateException.class,
+                () -> 
createEmptyBuilder().setConnectionRequestTimeout(-1).build());
+        assertThrows(
+                IllegalStateException.class,
+                () -> createEmptyBuilder().setConnectionTimeout(-1).build());
+        assertThrows(
+                IllegalStateException.class,
+                () -> createEmptyBuilder().setSocketTimeout(-1).build());
+    }
+
     abstract B createEmptyBuilder();
 
     abstract B createMinimalBuilder();
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
index 241ef6a..222f8b3 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java
@@ -248,7 +248,7 @@ class ElasticsearchWriterITCase {
                 flushOnCheckpoint,
                 bulkProcessorConfig,
                 new TestBulkProcessorBuilderFactory(),
-                new NetworkClientConfig(null, null, null),
+                new NetworkClientConfig(null, null, null, null, null, null),
                 metricGroup,
                 new TestMailbox());
     }

Reply via email to