Bharath Reddy Gunapati created FLINK-40072:
----------------------------------------------
Summary: request.timeout is declared as a Duration option but
parsed as integer seconds, crashing at query time on any unit-suffixed value
Key: FLINK-40072
URL: https://issues.apache.org/jira/browse/FLINK-40072
Project: Flink
Issue Type: Bug
Components: Connectors / HTTP
Reporter: Bharath Reddy Gunapati
h2. Problem
The HTTP connector's request timeout is exposed as a typed Duration option for
both the lookup source and the sink:
* {{HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT}}:
{{.durationType()}}, default {{Duration.ofSeconds(30)}}
* {{HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT}}:
{{.durationType()}}, default {{Duration.ofSeconds(30)}}
The documentation added in FLINK-39364 (#31) describes the value as a Flink
Duration (e.g. {{'30s'}}, {{'1min'}}). However, the runtime never reads the
declared Duration option. It reads the raw config string and parses it with
{{Integer.parseInt(...)}}:
* Lookup: {{RequestFactoryBase}} →
{{Integer.parseInt(properties.getProperty(LOOKUP_HTTP_TIMEOUT_SECONDS, "30"))}}
* Sink: {{AbstractRequestSubmitter}} →
{{Integer.parseInt(properties.getProperty(SINK_HTTP_TIMEOUT_SECONDS, "30"))}}
As a result, the moment a user supplies the documented Duration form, the
connector throws {{NumberFormatException: For input string: "30s"}}.
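The failure mode can be reproduced without Flink. The following is a minimal sketch, not the connector's code: the class {{TimeoutParseDemo}} and the helper {{legacyParseSeconds}} are hypothetical, and the property key is assumed from the reproduction in this report (the connector's constant may hold a different string). Only the {{Integer.parseInt(properties.getProperty(...))}} pattern mirrors the read path described above.

```java
import java.util.Properties;

class TimeoutParseDemo {
    // Assumed key, copied from the reproduction; the connector's constant may differ.
    static final String TIMEOUT_KEY = "http.source.lookup.request.timeout";

    // Hypothetical helper mirroring the integer-seconds read path described above.
    static int legacyParseSeconds(Properties properties) {
        return Integer.parseInt(properties.getProperty(TIMEOUT_KEY, "30"));
    }

    public static void main(String[] args) {
        // A bare integer parses without error.
        Properties bare = new Properties();
        bare.setProperty(TIMEOUT_KEY, "30");
        System.out.println(legacyParseSeconds(bare));

        // The documented Duration form is rejected by Integer.parseInt.
        Properties suffixed = new Properties();
        suffixed.setProperty(TIMEOUT_KEY, "30s");
        try {
            legacyParseSeconds(suffixed);
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException: " + e.getMessage());
        }
    }
}
```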
h2. Why it is easy to miss
The {{http.*}} namespace is skipped by {{FactoryUtil#validateExcept(...)}}, so
{{CREATE TABLE}} succeeds and the value is not validated at planning time. The
failure is deferred to query execution, when the first lookup/sink request is
built and the timeout string is parsed.
h2. Steps to reproduce
Create a lookup table with a Duration-typed timeout and run a lookup join:
{code:sql}
CREATE TABLE Customers (
  id STRING, ...
) WITH (
  'connector' = 'http',
  'lookup-method' = 'GET',
  'format' = 'json',
  'url' = 'http://localhost:8080/client',
  'http.source.lookup.request.timeout' = '30s'
);
-- SELECT ... FROM ... JOIN Customers FOR SYSTEM_TIME AS OF ...
{code}
Expected: the join enriches rows using a 30-second request timeout.
Actual: the job fails at query time with
{{java.lang.NumberFormatException: For input string: "30s"}}.
The same defect applies to the sink via
{{'http.sink.request.timeout' = '30s'}}.
h2. Root cause
FLINK-39364 (#31) added the typed {{ConfigOption<Duration>}} declarations, the
factory registrations, and the Duration-based docs, but did not update the
runtime read path (which still parses integer seconds). This left the declared
contract (Duration) and the runtime behavior (integer seconds) out of sync.
h2. Proposed fix
Wire the runtime to read the declared Duration option:
* Lookup: read {{SOURCE_LOOKUP_REQUEST_TIMEOUT}} from {{ReadableConfig}} as a
Duration and apply it directly.
* Sink: parse the value with {{TimeUtils.parseDuration(...)}} (Flink's
standard duration parser).
* Remove the now-unused integer-seconds field/constant; rely on the option's
own {{Duration.ofSeconds(30)}} default.
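As a rough illustration of "apply it directly", the sketch below passes an already-parsed {{Duration}} straight to the request builder, with no integer-seconds conversion in between. It assumes the request is built with the JDK's {{java.net.http.HttpRequest}}, which this report does not state. The class {{DurationTimeoutDemo}} and the helper {{buildRequest}} are hypothetical, and the null fallback only stands in for the option's own default.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class DurationTimeoutDemo {
    // Same default value as the declared option in this report.
    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);

    // Hypothetical helper: applies an already-parsed Duration to the request,
    // falling back to the default when no value was configured.
    static HttpRequest buildRequest(String url, Duration configuredTimeout) {
        Duration timeout = configuredTimeout != null ? configuredTimeout : DEFAULT_TIMEOUT;
        return HttpRequest.newBuilder(URI.create(url))
                .timeout(timeout) // accepts a Duration; must be positive
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request =
                buildRequest("http://localhost:8080/client", Duration.ofMillis(500));
        System.out.println(request.timeout().orElseThrow().toMillis()); // 500
    }
}
```

In the connector itself the {{Duration}} would come from the declared option rather than from a method argument; the point of the sketch is only that no string or integer parsing remains on the request path.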
h2. Compatibility note
The value is now parsed with standard Flink Duration semantics (consistent with
the sibling {{connection.timeout}} option):
* Unit-suffixed values ({{'30s'}}, {{'1min'}}, {{'500ms'}}) now work
(previously crashed).
* A bare number such as {{'30'}} is now interpreted as 30 milliseconds (Flink's
default duration unit), whereas the pre-fix code treated {{'30'}} as 30
seconds.
This affects users who configured a bare integer (the original usage inherited
from the getindata connector): {{'30'}} must be written as {{'30s'}}. Unit-suffixed
values previously threw, so bare integers are the only previously working
configurations whose meaning changes; left unmigrated, they are read as
milliseconds instead of seconds. The migration is a one-character change and is
documented.
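The change in meaning for bare integers can be seen side by side in the sketch below. It is illustrative only: {{simplifiedDurationParse}} is a hypothetical, simplified stand-in for Flink's duration parsing (it handles only {{ms}}, {{s}}, {{min}}, and bare numbers treated as milliseconds) and is not the real parser. {{legacyInterpretation}} models the pre-fix integer-seconds behavior described in this report.

```java
import java.time.Duration;
import java.util.Locale;

class TimeoutMigrationDemo {
    // Pre-fix behavior as described in this report: a bare integer means seconds.
    static Duration legacyInterpretation(String raw) {
        return Duration.ofSeconds(Integer.parseInt(raw));
    }

    // Simplified stand-in for Flink's duration parsing, NOT the real parser.
    // Supports "ms", "s", "min"; a bare number is treated as milliseconds.
    static Duration simplifiedDurationParse(String raw) {
        String text = raw.trim().toLowerCase(Locale.ROOT);
        int split = 0;
        while (split < text.length() && Character.isDigit(text.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("No number in: " + raw);
        }
        long value = Long.parseLong(text.substring(0, split));
        String unit = text.substring(split).trim();
        switch (unit) {
            case "":
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            default:
                throw new IllegalArgumentException("Unsupported unit in: " + raw);
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyInterpretation("30").toMillis());      // 30000
        System.out.println(simplifiedDurationParse("30").toMillis());   // 30
        System.out.println(simplifiedDurationParse("30s").toMillis());  // 30000
    }
}
```

Under these assumptions, an unmigrated {{'30'}} shrinks from 30 seconds to 30 milliseconds, while {{'30s'}} restores the original timeout.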