Bharath Reddy Gunapati created FLINK-40072:
----------------------------------------------

             Summary: request.timeout is declared as a Duration option but 
parsed as integer seconds, crashing at query time on any unit-suffixed value
                 Key: FLINK-40072
                 URL: https://issues.apache.org/jira/browse/FLINK-40072
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HTTP
            Reporter: Bharath Reddy Gunapati


h2. Problem

The HTTP connector's request timeout is exposed as a typed Duration option for
both the lookup source and the sink:

* {{HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT}}: {{.durationType()}}, default {{Duration.ofSeconds(30)}}
* {{HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT}}: {{.durationType()}}, default {{Duration.ofSeconds(30)}}

The documentation added in FLINK-39364 (#31) describes the value as a Flink
Duration (e.g. {{'30s'}}, {{'1min'}}). However, the runtime never reads the
declared Duration option. It reads the raw config string and parses it with
{{Integer.parseInt(...)}}:

* Lookup: {{RequestFactoryBase}} → {{Integer.parseInt(properties.getProperty(LOOKUP_HTTP_TIMEOUT_SECONDS, "30"))}}
* Sink: {{AbstractRequestSubmitter}} → {{Integer.parseInt(properties.getProperty(SINK_HTTP_TIMEOUT_SECONDS, "30"))}}

As a result, any value in the documented Duration form makes the connector
throw {{NumberFormatException: For input string: "30s"}} as soon as the timeout
string is parsed.
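
The failure mode can be reproduced without Flink. The following is a minimal sketch of the legacy read pattern quoted above, not the connector's actual code: the class name, the method name, and the use of the SQL option name as the property key are assumptions made for illustration.

```java
import java.util.Properties;

/** Minimal reproduction of the legacy read pattern (hypothetical names, not connector code). */
final class LegacyTimeoutRead {

    // Assumption: the SQL option name is used as the property key for this sketch.
    static final String TIMEOUT_KEY = "http.source.lookup.request.timeout";

    /** Mirrors the legacy pattern: the raw property string is parsed as integer seconds. */
    static int readTimeoutSeconds(Properties properties) {
        return Integer.parseInt(properties.getProperty(TIMEOUT_KEY, "30"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();

        // A bare integer parses fine under the legacy path.
        props.setProperty(TIMEOUT_KEY, "30");
        System.out.println(readTimeoutSeconds(props));

        // The documented Duration form does not.
        props.setProperty(TIMEOUT_KEY, "30s");
        try {
            readTimeoutSeconds(props);
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException: " + e.getMessage());
        }
    }
}
```

Only values that are valid decimal integers survive this path, so every unit-suffixed value accepted by the declared Duration option is rejected here.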

h2. Why it is easy to miss

The {{http.*}} namespace is skipped by {{FactoryUtil#validateExcept(...)}}, so
{{CREATE TABLE}} succeeds and the value is not validated at planning time. The
failure is deferred to query execution, when the first lookup/sink request is
built and the timeout string is parsed.

h2. Steps to reproduce

Create a lookup table with a Duration-typed timeout and run a lookup join:

{code:sql}
CREATE TABLE Customers (
  id STRING, ...
) WITH (
  'connector' = 'http',
  'lookup-method' = 'GET',
  'format' = 'json',
  'url' = 'http://localhost:8080/client',
  'http.source.lookup.request.timeout' = '30s'
);
-- SELECT ... FROM ... JOIN Customers FOR SYSTEM_TIME AS OF ...
{code}

Expected: the join enriches rows using a 30-second request timeout.
Actual: the job fails at query time with
{{java.lang.NumberFormatException: For input string: "30s"}}.

The same defect applies to the sink via
{{'http.sink.request.timeout' = '30s'}}.

h2. Root cause

FLINK-39364 (#31) added the typed {{ConfigOption<Duration>}} declarations, the
factory registrations, and the Duration-based docs, but did not update the
runtime read path (which still parses integer seconds). This left the declared
contract (Duration) and the runtime behavior (integer seconds) out of sync.

h2. Proposed fix

Wire the runtime to read the declared Duration option:

* Lookup: read {{SOURCE_LOOKUP_REQUEST_TIMEOUT}} from {{ReadableConfig}} as a Duration and apply it directly.
* Sink: parse the value with {{TimeUtils.parseDuration(...)}} (Flink's standard duration parser).
* Remove the now-unused integer-seconds field/constant; rely on the option's own {{Duration.ofSeconds(30)}} default.

h2. Compatibility note

The value is now parsed with standard Flink Duration semantics (consistent with
the sibling {{connection.timeout}} option):

* Unit-suffixed values ({{'30s'}}, {{'1min'}}, {{'500ms'}}) now work (previously crashed).
* A bare number such as {{'30'}} is now interpreted as 30 milliseconds (Flink's default duration unit), whereas the pre-fix code treated {{'30'}} as 30 seconds.

This affects users who configured a bare integer (the original usage inherited
from the getindata connector): {{'30'}} must be written as {{'30s'}}. Any
unit-suffixed value was previously non-functional (it threw), so no working
configuration silently changes meaning. Bare integers are the only overlap, and
the migration is a one-character change. This behavior change is documented.
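
To make the before/after semantics concrete, here is a small self-contained sketch. It is a simplified stand-in that handles only the {{ms}}, {{s}}, and {{min}} units and treats a bare number as milliseconds; it is not Flink's {{TimeUtils.parseDuration}} implementation, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Locale;

/** Simplified stand-in for the old and new parsing semantics (not Flink's TimeUtils). */
final class TimeoutCompat {

    /** Pre-fix behavior: a bare integer is read as seconds; anything else throws. */
    static Duration legacy(String value) {
        return Duration.ofSeconds(Integer.parseInt(value));
    }

    /** Post-fix semantics for a subset of units; a bare number means milliseconds. */
    static Duration durationStyle(String value) {
        String text = value.trim().toLowerCase(Locale.ROOT);
        int split = 0;
        while (split < text.length() && text.charAt(split) >= '0' && text.charAt(split) <= '9') {
            split++;
        }
        if (split == 0) {
            throw new NumberFormatException("text does not start with a number: " + value);
        }
        long amount = Long.parseLong(text.substring(0, split));
        String unit = text.substring(split).trim();
        switch (unit) {
            case "":   // no unit given: default to milliseconds
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            default:
                throw new IllegalArgumentException("unit not covered by this sketch: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(legacy("30").toMillis());           // 30000
        System.out.println(durationStyle("30").toMillis());    // 30
        System.out.println(durationStyle("30s").toMillis());   // 30000
        System.out.println(durationStyle("1min").toMillis());  // 60000
        System.out.println(durationStyle("500ms").toMillis()); // 500
    }
}
```

The first two lines of output show the only overlap: {{'30'}} shrinks from 30 seconds to 30 milliseconds, which is why existing bare-integer configurations need the explicit {{s}} suffix.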



--
This message was sent by Atlassian Jira
(v8.20.10#820010)