This is an automated email from the ASF dual-hosted git repository.
davidradl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-http.git
The following commit(s) were added to refs/heads/main by this push:
new 813fd5d [FLINK-39364][connector-http] Add request timeout config option (#31)
813fd5d is described below
commit 813fd5d196ed6e2541f960a1f669486698a6ee82
Author: Feat Zhang <[email protected]>
AuthorDate: Wed Apr 29 21:51:34 2026 +0800
[FLINK-39364][connector-http] Add request timeout config option (#31)
* [connector-http] Add request timeout ConfigOption for sink and lookup source
Add typed ConfigOption<Duration> definitions for the HTTP request timeout
configuration keys that were previously only available as raw string
constants in HttpConnectorConfigConstants:
- Add HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT
(key: http.source.lookup.request.timeout, default: 30s)
- Add HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT
(key: http.sink.request.timeout, default: 30s)
- Register both options in their respective DynamicTableFactory
optionalOptions() methods so they are properly validated by Flink
- Remove the TODO comment in HttpLookupTableSourceFactory
Add unit tests verifying that both options are accepted by their factories
without a ValidationException.
* [connector-http] Update docs to reflect Duration type for request timeout options
Update documentation for http.source.lookup.request.timeout and
http.sink.request.timeout to accurately describe the Duration type
introduced in PR2 (aaef46e):
- Change description from 'in seconds' to 'as a Duration'
- Add examples: '30s', '1min'
- Update default value notation from '30 seconds' to '30s'
- Apply changes to both EN and ZH docs
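The option semantics described above (a Duration-valued key with a `30s` default when the key is absent) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the connector's code: the class `RequestTimeoutSketch`, the `parseDuration` helper, and its reduced unit set (`ms`, `s`, `min`, `h`) are hypothetical names invented for the example. The connector itself relies on Flink's `ConfigOption<Duration>` handling, which accepts more formats than this sketch does.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;

/** Illustrative only: not the connector's or Flink's implementation. */
public class RequestTimeoutSketch {

    // The two keys and the 30s default are taken from the commit message.
    static final String LOOKUP_KEY = "http.source.lookup.request.timeout";
    static final String SINK_KEY = "http.sink.request.timeout";
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

    /** Hypothetical, simplified parser: an integer followed by ms, s, min, or h. */
    static Duration parseDuration(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        int i = 0;
        while (i < t.length() && Character.isDigit(t.charAt(i))) {
            i++;
        }
        if (i == 0) {
            throw new IllegalArgumentException("No leading number in: " + text);
        }
        long value = Long.parseLong(t.substring(0, i));
        String unit = t.substring(i).trim();
        switch (unit) {
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            case "h":
                return Duration.ofHours(value);
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    /** Returns the configured timeout, or the 30s default when the key is absent. */
    static Duration requestTimeout(Map<String, String> tableOptions, String key) {
        String raw = tableOptions.get(key);
        return raw == null ? DEFAULT_TIMEOUT : parseDuration(raw);
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(SINK_KEY, "1min");
        System.out.println(requestTimeout(options, SINK_KEY));   // PT1M
        System.out.println(requestTimeout(options, LOOKUP_KEY)); // PT30S (default)
    }
}
```

A value such as `'30'` with no unit is rejected by this sketch on purpose, to make the move from "seconds as a number" to "Duration with a unit" visible; how the real option treats unit-less values is not stated in this commit.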
---
docs/content.zh/docs/connectors/table/http.md | 6 +++---
docs/content/docs/connectors/table/http.md | 6 +++---
.../table/lookup/HttpLookupConnectorOptions.java | 16 +++++++++++++++
.../table/lookup/HttpLookupTableSourceFactory.java | 5 +++--
.../sink/HttpDynamicSinkConnectorOptions.java | 17 +++++++++++++++
.../table/sink/HttpDynamicTableSinkFactory.java | 2 ++
.../lookup/HttpLookupTableSourceFactoryTest.java | 22 ++++++++++++++++++++
.../sink/HttpDynamicTableSinkFactoryTest.java | 24 ++++++++++++++++++++++
8 files changed, 90 insertions(+), 8 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index 2ae6abe..8599e46 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -185,7 +185,7 @@ Note the options with the prefix _http_ are the HTTP connector specific options,
| http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued [...]
| http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. [...]
| http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. [...]
-| http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. [...]
+| http.source.lookup.request.timeout | optional | Sets HTTP request timeout for the lookup source as a Duration (e.g. `'30s'`, `'1min'`). Controls how long the HTTP client waits for a response before timing out a single request. If not specified, the default value of `30s` will be used. [...]
| http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. [...]
| http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. [...]
| http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. [...]
@@ -452,7 +452,7 @@ an existing `Authorization` header specified in configuration.
Lookup Source is guarded by two timeout timers. First one is specified by Flink's AsyncIO operator that executes `AsyncTableFunction`.
The default value of this timer is set to 3 minutes and can be changed via `table.exec.async-lookup.timeout` [option](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-async-lookup-timeout).
-The second one is set per individual HTTP requests by HTTP client. Its default value is set currently to 30 seconds and can be changed via `http.source.lookup.request.timeout` option.
+The second one is set per individual HTTP requests by HTTP client. Its default value is `30s` and can be changed via `http.source.lookup.request.timeout` option (accepts a Duration value, e.g. `'30s'` or `'1min'`).
Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.
@@ -543,7 +543,7 @@ another format name.
| http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
-| http.sink.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
+| http.sink.request.timeout | optional | Sets HTTP request timeout for the HTTP sink as a Duration (e.g. `'30s'`, `'1min'`). If not specified, the default value of `30s` will be used. |
| http.sink.writer.thread-pool.size | optional | Sets the size of pool thread for HTTP Sink request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 1 thread will be used. |
| http.sink.writer.request.mode | optional | Sets the Http Sink request submission mode. Two modes are available: `single` and `batch`. Defaults to `batch` if not specified. |
| http.sink.request.batch.size | optional | Applicable only for `http.sink.writer.request.mode = batch`. Sets number of individual events/requests that will be submitted as one HTTP request by HTTP sink. The default value is 500 which is same as HTTP Sink `maxBatchSize` |
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index 2ae6abe..8599e46 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -185,7 +185,7 @@ Note the options with the prefix _http_ are the HTTP connector specific options,
| http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued [...]
| http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. [...]
| http.source.lookup.continue-on-error | optional | When true, the flow will continue on errors, returning row content. When false (the default) the job ends on errors. [...]
-| http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. [...]
+| http.source.lookup.request.timeout | optional | Sets HTTP request timeout for the lookup source as a Duration (e.g. `'30s'`, `'1min'`). Controls how long the HTTP client waits for a response before timing out a single request. If not specified, the default value of `30s` will be used. [...]
| http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. [...]
| http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. [...]
| http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. [...]
@@ -452,7 +452,7 @@ an existing `Authorization` header specified in configuration.
Lookup Source is guarded by two timeout timers. First one is specified by Flink's AsyncIO operator that executes `AsyncTableFunction`.
The default value of this timer is set to 3 minutes and can be changed via `table.exec.async-lookup.timeout` [option](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-async-lookup-timeout).
-The second one is set per individual HTTP requests by HTTP client. Its default value is set currently to 30 seconds and can be changed via `http.source.lookup.request.timeout` option.
+The second one is set per individual HTTP requests by HTTP client. Its default value is `30s` and can be changed via `http.source.lookup.request.timeout` option (accepts a Duration value, e.g. `'30s'` or `'1min'`).
Flink's current implementation of `AsyncTableFunction` does not allow specifying custom logic for handling Flink AsyncIO timeouts as it is for Java API.
Because of that, if AsyncIO timer passes, Flink will throw TimeoutException which will cause job restart.
@@ -543,7 +543,7 @@ another format name.
| http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
-| http.sink.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
+| http.sink.request.timeout | optional | Sets HTTP request timeout for the HTTP sink as a Duration (e.g. `'30s'`, `'1min'`). If not specified, the default value of `30s` will be used. |
| http.sink.writer.thread-pool.size | optional | Sets the size of pool thread for HTTP Sink request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 1 thread will be used. |
| http.sink.writer.request.mode | optional | Sets the Http Sink request submission mode. Two modes are available: `single` and `batch`. Defaults to `batch` if not specified. |
| http.sink.request.batch.size | optional | Applicable only for `http.sink.writer.request.mode = batch`. Sets number of individual events/requests that will be submitted as one HTTP request by HTTP sink. The default value is 500 which is same as HTTP Sink `maxBatchSize` |
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
index e5e7d58..0a3843c 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupConnectorOptions.java
@@ -26,6 +26,7 @@ import java.time.Duration;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.CONTINUE_ON_ERROR;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
+import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.LOOKUP_HTTP_TIMEOUT_SECONDS;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_ENDPOINT_URL;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.OIDC_AUTH_TOKEN_EXPIRY_REDUCTION;
@@ -131,6 +132,21 @@ public class HttpLookupConnectorOptions {
.noDefaultValue()
.withDescription("Http client connection timeout.");
+ /**
+ * HTTP request timeout for lookup source. Controls how long the HTTP client waits for a
+ * response before timing out a single request. If not set, defaults to 30 seconds (matching the
+ * default behaviour of the connector).
+ */
+ public static final ConfigOption<Duration> SOURCE_LOOKUP_REQUEST_TIMEOUT =
+ ConfigOptions.key(LOOKUP_HTTP_TIMEOUT_SECONDS)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "HTTP request timeout for lookup source. "
+ + "Controls how long the HTTP client waits for a response "
+ + "before timing out a single request. "
+ + "Specified as a Duration, e.g. '30s' or '1min'.");
+
public static final ConfigOption<Boolean> SOURCE_LOOKUP_CONTINUE_ON_ERROR =
ConfigOptions.key(CONTINUE_ON_ERROR)
.booleanType()
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
index 3a1e0e7..45d3b26 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactory.java
@@ -63,6 +63,7 @@ import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOp
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_PASSWORD;
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_PORT;
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_PROXY_USERNAME;
+import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT;
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_INITIAL_BACKOFF;
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MAX_BACKOFF;
import static org.apache.flink.connector.http.table.lookup.HttpLookupConnectorOptions.SOURCE_LOOKUP_RETRY_EXPONENTIAL_DELAY_MULTIPLIER;
@@ -204,8 +205,8 @@ public class HttpLookupTableSourceFactory implements DynamicTableSourceFactory {
SOURCE_LOOKUP_PROXY_PORT,
SOURCE_LOOKUP_PROXY_USERNAME,
SOURCE_LOOKUP_PROXY_PASSWORD,
- SOURCE_LOOKUP_CONNECTION_TIMEOUT // TODO: add request timeout from properties
- );
+ SOURCE_LOOKUP_CONNECTION_TIMEOUT,
+ SOURCE_LOOKUP_REQUEST_TIMEOUT);
}
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicSinkConnectorOptions.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicSinkConnectorOptions.java
index 368447c..ca10cd0 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicSinkConnectorOptions.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicSinkConnectorOptions.java
@@ -20,6 +20,9 @@ package org.apache.flink.connector.http.table.sink;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import java.time.Duration;
+
+import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SINK_HTTP_TIMEOUT_SECONDS;
import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.SINK_REQUEST_CALLBACK_IDENTIFIER;
/** Table API options for {@link HttpDynamicSink}. */
@@ -37,6 +40,20 @@ public class HttpDynamicSinkConnectorOptions {
.defaultValue("POST")
.withDescription("Method used for requests built from SQL's INSERT.");
+ /**
+ * HTTP request timeout for sink. Controls how long the HTTP client waits for a response before
+ * timing out a single request. Defaults to 30 seconds.
+ */
+ public static final ConfigOption<Duration> SINK_REQUEST_TIMEOUT =
+ ConfigOptions.key(SINK_HTTP_TIMEOUT_SECONDS)
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "HTTP request timeout for sink. "
+ + "Controls how long the HTTP client waits for a response "
+ + "before timing out a single request. "
+ + "Specified as a Duration, e.g. '30s' or '1min'.");
+
public static final ConfigOption<String> REQUEST_CALLBACK_IDENTIFIER =
ConfigOptions.key(SINK_REQUEST_CALLBACK_IDENTIFIER)
.stringType()
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactory.java
index d3ef903..28eefae 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactory.java
@@ -33,6 +33,7 @@ import java.util.Set;
import static org.apache.flink.connector.http.table.sink.HttpDynamicSinkConnectorOptions.INSERT_METHOD;
import static org.apache.flink.connector.http.table.sink.HttpDynamicSinkConnectorOptions.REQUEST_CALLBACK_IDENTIFIER;
+import static org.apache.flink.connector.http.table.sink.HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT;
import static org.apache.flink.connector.http.table.sink.HttpDynamicSinkConnectorOptions.URL;
/** Factory for creating {@link HttpDynamicSink}. */
@@ -95,6 +96,7 @@ public class HttpDynamicTableSinkFactory extends AsyncDynamicTableSinkFactory {
public Set<ConfigOption<?>> optionalOptions() {
var options = super.optionalOptions();
options.add(INSERT_METHOD);
+ options.add(SINK_REQUEST_TIMEOUT);
options.add(REQUEST_CALLBACK_IDENTIFIER);
return options;
}
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactoryTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactoryTest.java
index 94b9b4a..a5f3558 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactoryTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceFactoryTest.java
@@ -118,6 +118,28 @@ public class HttpLookupTableSourceFactoryTest {
assertThat(source).isInstanceOf(HttpLookupTableSource.class);
}
+ @Test
+ void shouldAcceptWithRequestTimeout() {
+ Map<String, String> options =
+ getOptions(
+ Map.of(
+ HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT.key(),
+ "60s"));
+ DynamicTableSource source = createTableSource(SCHEMA, options);
+ assertThat(source).isNotNull();
+ assertThat(source).isInstanceOf(HttpLookupTableSource.class);
+ }
+
+ @Test
+ void shouldUseDefaultRequestTimeoutWhenNotConfigured() {
+ // When no request timeout is configured, the source should still be created
+ // successfully and the default of 30s applies at runtime.
+ Map<String, String> options = getMandatoryOptions();
+ DynamicTableSource source = createTableSource(SCHEMA, options);
+ assertThat(source).isNotNull();
+ assertThat(source).isInstanceOf(HttpLookupTableSource.class);
+ }
+
private Map<String, String> getMandatoryOptions() {
return Map.of(
"connector", "http",
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactoryTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactoryTest.java
index 5a835a8..b6569e4 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactoryTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/sink/HttpDynamicTableSinkFactoryTest.java
@@ -106,4 +106,28 @@ public class HttpDynamicTableSinkFactoryTest {
assertThatThrownBy(() -> tEnv.executeSql("INSERT INTO http VALUES (1)").await())
.isInstanceOf(ValidationException.class);
}
+
+ @Test
+ public void requestTimeoutOptionTest() {
+ // Verify that http.sink.request.timeout is a valid recognized option (no
+ // ValidationException)
+ final String withRequestTimeout =
+ String.format(
+ "CREATE TABLE httpTimeout (\n"
+ + " id bigint\n"
+ + ") with (\n"
+ + " 'connector' = '%s',\n"
+ + " 'url' = '%s',\n"
+ + " 'format' = 'json',\n"
+ + " '%s' = '60s'\n"
+ + ")",
+ HttpDynamicTableSinkFactory.IDENTIFIER,
+ "http://localhost/",
+ HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT.key());
+ tEnv.executeSql(withRequestTimeout);
+ // Should not throw ValidationException for the option itself
+ // (a RuntimeException for actual HTTP calls is fine in test)
+ assertThatThrownBy(() -> tEnv.executeSql("INSERT INTO httpTimeout VALUES (1)").await())
+ .isNotInstanceOf(org.apache.flink.table.api.ValidationException.class);
+ }
}