This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c707401 [FLINK-21068][connectors/elasticsearch] Support 'connection.request-timeout','connection.timeout', 'socket.timeout' options for elasticsearch connector. c707401 is described below commit c70740179f4e89139407fabafc808a4350d5c44b Author: jinfeng <jinfeng1...@gmail.com> AuthorDate: Mon Dec 13 19:52:37 2021 +0800 [FLINK-21068][connectors/elasticsearch] Support 'connection.request-timeout','connection.timeout', 'socket.timeout' options for elasticsearch connector. --- .../docs/connectors/table/elasticsearch.md | 22 ++++++++++ .../content/docs/connectors/table/elasticsearch.md | 31 ++++++++++++++ .../sink/ElasticsearchSinkBuilderBase.java | 49 +++++++++++++++++++++- .../elasticsearch/sink/ElasticsearchWriter.java | 20 +++++++++ .../elasticsearch/sink/NetworkClientConfig.java | 26 +++++++++++- .../table/ElasticsearchConfiguration.java | 15 +++++++ .../table/ElasticsearchConnectorOptions.java | 21 ++++++++++ .../table/ElasticsearchDynamicSink.java | 13 ++++++ .../table/ElasticsearchDynamicSinkFactoryBase.java | 6 +++ .../sink/ElasticsearchSinkBuilderBaseTest.java | 13 ++++++ .../sink/ElasticsearchWriterITCase.java | 2 +- 11 files changed, 215 insertions(+), 3 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/elasticsearch.md b/docs/content.zh/docs/connectors/table/elasticsearch.md index 6bafab6..061ca27 100644 --- a/docs/content.zh/docs/connectors/table/elasticsearch.md +++ b/docs/content.zh/docs/connectors/table/elasticsearch.md @@ -218,6 +218,28 @@ CREATE TABLE myUserTable ( <td>添加到每个 REST 通信中的前缀字符串,例如,<code>'/v1'</code>。</td> </tr> <tr> + <td><h5>connection.request-timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td> + </tr> + <tr> + <td><h5>connection.timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + 
<td>建立连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。</td> + </tr> + <tr> + <td><h5>socket.timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。 + </td> + </tr> + <tr> <td><h5>format</h5></td> <td>可选</td> <td style="word-wrap: break-word;">json</td> diff --git a/docs/content/docs/connectors/table/elasticsearch.md b/docs/content/docs/connectors/table/elasticsearch.md index a9c7b83..0879f41 100644 --- a/docs/content/docs/connectors/table/elasticsearch.md +++ b/docs/content/docs/connectors/table/elasticsearch.md @@ -198,6 +198,37 @@ Connector Options <td>Prefix string to be added to every REST communication, e.g., <code>'/v1'</code>.</td> </tr> <tr> + <td><h5>connection.request-timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The timeout for requesting a connection from the connection manager. + The timeout must be larger than or equal to 0. + A timeout value of zero is interpreted as an infinite timeout. + </td> + </tr> + <tr> + <td><h5>connection.timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The timeout for establishing a connection. + The timeout must be larger than or equal to 0. + A timeout value of zero is interpreted as an infinite timeout. + </td> + </tr> + <tr> + <td><h5>socket.timeout</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Duration</td> + <td>The socket timeout (SO_TIMEOUT) for waiting for data or, put differently, + a maximum period of inactivity between two consecutive data packets. + The timeout must be larger than or equal to 0. + A timeout value of zero is interpreted as an infinite timeout. 
+ </td> + </tr> + <tr> <td><h5>format</h5></td> <td>optional</td> <td style="word-wrap: break-word;">json</td> diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java index f187cb6..fe64c94 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase.java @@ -54,6 +54,9 @@ public abstract class ElasticsearchSinkBuilderBase< private String username; private String password; private String connectionPathPrefix; + private Integer connectionTimeout; + private Integer connectionRequestTimeout; + private Integer socketTimeout; protected ElasticsearchSinkBuilderBase() {} @@ -217,6 +220,44 @@ public abstract class ElasticsearchSinkBuilderBase< return self(); } + /** + * Sets the timeout for requesting the connection of the Elasticsearch cluster from the + * connection manager. + * + * @param timeout for the connection request + * @return this builder + */ + public B setConnectionRequestTimeout(int timeout) { + checkState(timeout >= 0, "Connection request timeout must be larger than or equal to 0."); + this.connectionRequestTimeout = timeout; + return self(); + } + + /** + * Sets the timeout for establishing a connection of the Elasticsearch cluster. 
+ * + * @param timeout for the connection + * @return this builder + */ + public B setConnectionTimeout(int timeout) { + checkState(timeout >= 0, "Connection timeout must be larger than or equal to 0."); + this.connectionTimeout = timeout; + return self(); + } + + /** + * Sets the timeout for waiting for data or, put differently, a maximum period inactivity + * between two consecutive data packets. + * + * @param timeout for the socket + * @return this builder + */ + public B setSocketTimeout(int timeout) { + checkState(timeout >= 0, "Socket timeout must be larger than or equal to 0."); + this.socketTimeout = timeout; + return self(); + } + protected abstract BulkProcessorBuilderFactory getBulkProcessorBuilderFactory(); /** @@ -247,7 +288,13 @@ public abstract class ElasticsearchSinkBuilderBase< private NetworkClientConfig buildNetworkClientConfig() { checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); - return new NetworkClientConfig(username, password, connectionPathPrefix); + return new NetworkClientConfig( + username, + password, + connectionPathPrefix, + connectionRequestTimeout, + connectionTimeout, + socketTimeout); } private BulkProcessorConfig buildBulkProcessorConfig() { diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java index f984312..53723bc 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriter.java @@ -169,6 +169,26 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN, Void, Void> { httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } + 
if (networkClientConfig.getConnectionRequestTimeout() != null + || networkClientConfig.getConnectionTimeout() != null + || networkClientConfig.getSocketTimeout() != null) { + builder.setRequestConfigCallback( + requestConfigBuilder -> { + if (networkClientConfig.getConnectionRequestTimeout() != null) { + requestConfigBuilder.setConnectionRequestTimeout( + networkClientConfig.getConnectionRequestTimeout()); + } + if (networkClientConfig.getConnectionTimeout() != null) { + requestConfigBuilder.setConnectTimeout( + networkClientConfig.getConnectionTimeout()); + } + if (networkClientConfig.getSocketTimeout() != null) { + requestConfigBuilder.setSocketTimeout( + networkClientConfig.getSocketTimeout()); + } + return requestConfigBuilder; + }); + } return builder; } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java index e093008..5ae0510 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkClientConfig.java @@ -27,14 +27,23 @@ class NetworkClientConfig implements Serializable { @Nullable private final String username; @Nullable private final String password; @Nullable private final String connectionPathPrefix; + @Nullable private final Integer connectionRequestTimeout; + @Nullable private final Integer connectionTimeout; + @Nullable private final Integer socketTimeout; NetworkClientConfig( @Nullable String username, @Nullable String password, - @Nullable String connectionPathPrefix) { + @Nullable String connectionPathPrefix, + @Nullable Integer connectionRequestTimeout, + @Nullable Integer connectionTimeout, + @Nullable 
Integer socketTimeout) { this.username = username; this.password = password; this.connectionPathPrefix = connectionPathPrefix; + this.connectionRequestTimeout = connectionRequestTimeout; + this.connectionTimeout = connectionTimeout; + this.socketTimeout = socketTimeout; } @Nullable @@ -48,6 +57,21 @@ class NetworkClientConfig implements Serializable { } @Nullable + public Integer getConnectionRequestTimeout() { + return connectionRequestTimeout; + } + + @Nullable + public Integer getConnectionTimeout() { + return connectionTimeout; + } + + @Nullable + public Integer getSocketTimeout() { + return socketTimeout; + } + + @Nullable public String getConnectionPathPrefix() { return connectionPathPrefix; } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java index ee6488e..e684b01 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConfiguration.java @@ -39,11 +39,14 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; +import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,6 +107,18 @@ class ElasticsearchConfiguration { return config.getOptional(CONNECTION_PATH_PREFIX_OPTION); } + public Optional<Duration> getConnectionRequestTimeout() { + return config.getOptional(CONNECTION_REQUEST_TIMEOUT); + } + + public Optional<Duration> getConnectionTimeout() { + return config.getOptional(CONNECTION_TIMEOUT); + } + + public Optional<Duration> getSocketTimeout() { + return config.getOptional(SOCKET_TIMEOUT); + } + public List<HttpHost> getHosts() { return config.get(HOSTS_OPTION).stream() .map(ElasticsearchConfiguration::validateAndParseHostsString) diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java index 5cff4ee..672f0727 100644 --- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchConnectorOptions.java @@ -111,6 +111,27 @@ public class ElasticsearchConnectorOptions { .noDefaultValue() .withDescription("Prefix string to be added to every REST communication."); + public static final ConfigOption<Duration> CONNECTION_REQUEST_TIMEOUT = + ConfigOptions.key("connection.request-timeout") + .durationType() + .noDefaultValue() + .withDescription( + "The timeout for requesting a connection from the connection manager."); + + public static final ConfigOption<Duration> CONNECTION_TIMEOUT = + ConfigOptions.key("connection.timeout") + .durationType() + .noDefaultValue() + .withDescription("The timeout for establishing a connection."); + + public static final ConfigOption<Duration> SOCKET_TIMEOUT = + ConfigOptions.key("socket.timeout") + .durationType() + .noDefaultValue() + .withDescription( + "The socket timeout (SO_TIMEOUT) for waiting for data or, put differently," + + "a maximum period inactivity between two consecutive data packets."); + public static final ConfigOption<String> FORMAT_OPTION = ConfigOptions.key("format") .stringType() diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java index e5f94f1..0938e02 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java @@ -144,6 +144,19 @@ 
class ElasticsearchDynamicSink implements DynamicTableSink { builder.setConnectionPathPrefix(config.getPathPrefix().get()); } + if (config.getConnectionRequestTimeout().isPresent()) { + builder.setConnectionRequestTimeout( + (int) config.getConnectionRequestTimeout().get().getSeconds()); + } + + if (config.getConnectionTimeout().isPresent()) { + builder.setConnectionTimeout((int) config.getConnectionTimeout().get().getSeconds()); + } + + if (config.getSocketTimeout().isPresent()) { + builder.setSocketTimeout((int) config.getSocketTimeout().get().getSeconds()); + } + return SinkProvider.of(builder.build()); } diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java index e124ad6..8028284 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java @@ -54,12 +54,15 @@ import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnec import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_ACTIONS_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.BULK_FLUSH_MAX_SIZE_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_REQUEST_TIMEOUT; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.CONNECTION_TIMEOUT; import static 
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.DELIVERY_GUARANTEE_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION; +import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT; import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.elasticsearch.common.Strings.capitalize; @@ -209,6 +212,9 @@ abstract class ElasticsearchDynamicSinkFactoryBase implements DynamicTableSinkFa BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION, BULK_FLUSH_BACKOFF_DELAY_OPTION, CONNECTION_PATH_PREFIX_OPTION, + CONNECTION_REQUEST_TIMEOUT, + CONNECTION_TIMEOUT, + SOCKET_TIMEOUT, FORMAT_OPTION, DELIVERY_GUARANTEE_OPTION, PASSWORD_OPTION, diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java index e65d844..83f7871 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBaseTest.java @@ -77,6 +77,19 @@ abstract 
class ElasticsearchSinkBuilderBaseTest<B extends ElasticsearchSinkBuild () -> createEmptyBuilder().setHosts(new HttpHost("localhost:3000")).build()); } + @Test + void testThrowIfSetInvalidTimeouts() { + assertThrows( + IllegalStateException.class, + () -> createEmptyBuilder().setConnectionRequestTimeout(-1).build()); + assertThrows( + IllegalStateException.class, + () -> createEmptyBuilder().setConnectionTimeout(-1).build()); + assertThrows( + IllegalStateException.class, + () -> createEmptyBuilder().setSocketTimeout(-1).build()); + } + abstract B createEmptyBuilder(); abstract B createMinimalBuilder(); diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java index 241ef6a..222f8b3 100644 --- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java +++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java @@ -248,7 +248,7 @@ class ElasticsearchWriterITCase { flushOnCheckpoint, bulkProcessorConfig, new TestBulkProcessorBuilderFactory(), - new NetworkClientConfig(null, null, null), + new NetworkClientConfig(null, null, null, null, null, null), metricGroup, new TestMailbox()); }