[ 
https://issues.apache.org/jira/browse/FLINK-40072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bharath Reddy Gunapati updated FLINK-40072:
-------------------------------------------
    Description: 
h2. Problem

The HTTP connector's request timeout is exposed as a typed Duration option for
both the lookup source and the sink:

{code:java}
// lookup
HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT
        .durationType()
        .defaultValue(Duration.ofSeconds(30))

// sink
HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT
        .durationType()
        .defaultValue(Duration.ofSeconds(30))
{code}

The documentation added in FLINK-39364 (#31) describes the value as a Flink
Duration (e.g. '30s', '1min'). However, the runtime never reads the declared
Duration option. It reads the raw config string and parses it as an integer:

{code:java}
// lookup - RequestFactoryBase
Integer.parseInt(properties.getProperty(LOOKUP_HTTP_TIMEOUT_SECONDS, "30"));

// sink - AbstractRequestSubmitter
Integer.parseInt(properties.getProperty(SINK_HTTP_TIMEOUT_SECONDS, "30"));
{code}

As a result, the moment a user supplies the documented Duration form, the
connector throws NumberFormatException: For input string: "30s".

h2. Why it is easy to miss

The http.* namespace is skipped by FactoryUtil.validateExcept(...), so
CREATE TABLE succeeds and the value is not validated at planning time. The
failure is deferred to query execution, when the first lookup/sink request is
built and the timeout string is parsed.

h2. Steps to reproduce

Create a lookup table with a Duration-typed timeout and run a lookup join:

{code:sql}
CREATE TABLE Customers (
  id STRING, ...
) WITH (
  'connector' = 'http',
  'lookup-method' = 'GET',
  'format' = 'json',
  'url' = 'http://localhost:8080/client',
  'http.source.lookup.request.timeout' = '30s'
);
-- SELECT ... FROM ... JOIN Customers FOR SYSTEM_TIME AS OF ...
{code}

Expected: the join enriches rows using a 30-second request timeout.
Actual: the job fails at query time with
java.lang.NumberFormatException: For input string: "30s".

The same defect applies to the sink via 'http.sink.request.timeout' = '30s'.

h2. Root cause

FLINK-39364 (#31) added the typed ConfigOption<Duration> declarations, the
factory registrations, and the Duration-based docs, but did not update the
runtime read path (which still parses integer seconds). This left the declared
contract (Duration) and the runtime behavior (integer seconds) out of sync.

h2. Proposed fix

Wire the runtime to read the declared Duration option:

* Lookup: read SOURCE_LOOKUP_REQUEST_TIMEOUT from ReadableConfig as a Duration 
and apply it directly.
* Sink: parse the value with TimeUtils.parseDuration(...) (Flink's standard 
duration parser).
* Remove the now-unused integer-seconds field/constant; rely on the option's 
own Duration.ofSeconds(30) default.

h2. Compatibility note

The value is now parsed with standard Flink Duration semantics (consistent with
the sibling connection.timeout option):

* Unit-suffixed values ('30s', '1min', '500ms') now work (previously crashed).
* A bare number such as '30' is now interpreted as 30 milliseconds (Flink's 
default duration unit), whereas the pre-fix code treated '30' as 30 seconds.

This affects users who configured a bare integer (the original usage inherited
from the getindata connector): '30' must be written as '30s'. Any unit-suffixed
value was previously non-functional (it threw), so no working configuration
silently changes meaning - bare integers are the only overlap, and the migration
is a one-character change. This is documented.

  was:
h2. Problem

The HTTP connector's request timeout is exposed as a typed Duration option for
both the lookup source and the sink:

* \{{HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT}} — 
\{{.durationType()}}, default \{{Duration.ofSeconds(30)}}
* \{{HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT}} — 
\{{.durationType()}}, default \{{Duration.ofSeconds(30)}}

The documentation added in FLINK-39364 (#31) describes the value as a Flink
Duration (e.g. \{{'30s'}}, \{{'1min'}}). However, the runtime never reads the
declared Duration option. It reads the raw config string and parses it with
{\{Integer.parseInt(...)}}:

* Lookup: \{{RequestFactoryBase}} → 
\{{Integer.parseInt(properties.getProperty(LOOKUP_HTTP_TIMEOUT_SECONDS, "30"))}}
* Sink: \{{AbstractRequestSubmitter}} → 
\{{Integer.parseInt(properties.getProperty(SINK_HTTP_TIMEOUT_SECONDS, "30"))}}

As a result, the moment a user supplies the documented Duration form, the
connector throws \{{NumberFormatException: For input string: "30s"}}.

h2. Why it is easy to miss

The \{{http.*}} namespace is skipped by \{{FactoryUtil#validateExcept(...)}}, so
{\{CREATE TABLE}} succeeds and the value is not validated at planning time. The
failure is deferred to query execution, when the first lookup/sink request is
built and the timeout string is parsed.

h2. Steps to reproduce

Create a lookup table with a Duration-typed timeout and run a lookup join:

{code:sql}
CREATE TABLE Customers (
  id STRING, ...
) WITH (
  'connector' = 'http',
  'lookup-method' = 'GET',
  'format' = 'json',
  'url' = 'http://localhost:8080/client',
  'http.source.lookup.request.timeout' = '30s'
);
-- SELECT ... FROM ... JOIN Customers FOR SYSTEM_TIME AS OF ...
{code}

Expected: the join enriches rows using a 30-second request timeout.
Actual: the job fails at query time with
{\{java.lang.NumberFormatException: For input string: "30s"}}.

The same defect applies to the sink via \{{'http.sink.request.timeout' = 
'30s'}}.

h2. Root cause

FLINK-39364 (#31) added the typed \{{ConfigOption<Duration>}} declarations, the
factory registrations, and the Duration-based docs, but did not update the
runtime read path (which still parses integer seconds). This left the declared
contract (Duration) and the runtime behavior (integer seconds) out of sync.

h2. Proposed fix

Wire the runtime to read the declared Duration option:

* Lookup: read \{{SOURCE_LOOKUP_REQUEST_TIMEOUT}} from \{{ReadableConfig}} as a 
Duration and apply it directly.
* Sink: parse the value with \{{TimeUtils.parseDuration(...)}} (Flink's 
standard duration parser).
* Remove the now-unused integer-seconds field/constant; rely on the option's 
own \{{Duration.ofSeconds(30)}} default.

h2. Compatibility note

The value is now parsed with standard Flink Duration semantics (consistent with
the sibling \{{connection.timeout}} option):

* Unit-suffixed values (\{{'30s'}}, \{{'1min'}}, \{{'500ms'}}) now work 
(previously crashed).
* A bare number such as \{{'30'}} is now interpreted as 30 milliseconds (Flink's
  default duration unit), whereas the pre-fix code treated \{{'30'}} as 30 
seconds.

This affects users who configured a bare integer (the original usage inherited
from the getindata connector): \{{'30'}} must be written as \{{'30s'}}. Any
unit-suffixed value was previously non-functional (it threw), so no working
configuration silently changes meaning — bare integers are the only overlap, and
the migration is a one-character change. This is documented.


> request.timeout is declared as a Duration option but parsed as integer 
> seconds, crashing at query time on any unit-suffixed value
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-40072
>                 URL: https://issues.apache.org/jira/browse/FLINK-40072
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HTTP
>            Reporter: Bharath Reddy Gunapati
>            Priority: Major
>
> h2. Problem
> The HTTP connector's request timeout is exposed as a typed Duration option for
> both the lookup source and the sink:
> {code:java}
> // lookup
> HttpLookupConnectorOptions.SOURCE_LOOKUP_REQUEST_TIMEOUT
>         .durationType()
>         .defaultValue(Duration.ofSeconds(30))
> // sink
> HttpDynamicSinkConnectorOptions.SINK_REQUEST_TIMEOUT
>         .durationType()
>         .defaultValue(Duration.ofSeconds(30))
> {code}
> The documentation added in FLINK-39364 (#31) describes the value as a Flink
> Duration (e.g. '30s', '1min'). However, the runtime never reads the declared
> Duration option. It reads the raw config string and parses it as an integer:
> {code:java}
> // lookup - RequestFactoryBase
> Integer.parseInt(properties.getProperty(LOOKUP_HTTP_TIMEOUT_SECONDS, "30"));
> // sink - AbstractRequestSubmitter
> Integer.parseInt(properties.getProperty(SINK_HTTP_TIMEOUT_SECONDS, "30"));
> {code}
> As a result, the moment a user supplies the documented Duration form, the
> connector throws NumberFormatException: For input string: "30s".
> h2. Why it is easy to miss
> The http.* namespace is skipped by FactoryUtil.validateExcept(...), so
> CREATE TABLE succeeds and the value is not validated at planning time. The
> failure is deferred to query execution, when the first lookup/sink request is
> built and the timeout string is parsed.
> h2. Steps to reproduce
> Create a lookup table with a Duration-typed timeout and run a lookup join:
> {code:sql}
> CREATE TABLE Customers (
>   id STRING, ...
> ) WITH (
>   'connector' = 'http',
>   'lookup-method' = 'GET',
>   'format' = 'json',
>   'url' = 'http://localhost:8080/client',
>   'http.source.lookup.request.timeout' = '30s'
> );
> -- SELECT ... FROM ... JOIN Customers FOR SYSTEM_TIME AS OF ...
> {code}
> Expected: the join enriches rows using a 30-second request timeout.
> Actual: the job fails at query time with
> java.lang.NumberFormatException: For input string: "30s".
> The same defect applies to the sink via 'http.sink.request.timeout' = '30s'.
> h2. Root cause
> FLINK-39364 (#31) added the typed ConfigOption<Duration> declarations, the
> factory registrations, and the Duration-based docs, but did not update the
> runtime read path (which still parses integer seconds). This left the declared
> contract (Duration) and the runtime behavior (integer seconds) out of sync.
> h2. Proposed fix
> Wire the runtime to read the declared Duration option:
> * Lookup: read SOURCE_LOOKUP_REQUEST_TIMEOUT from ReadableConfig as a 
> Duration and apply it directly.
> * Sink: parse the value with TimeUtils.parseDuration(...) (Flink's standard 
> duration parser).
> * Remove the now-unused integer-seconds field/constant; rely on the option's 
> own Duration.ofSeconds(30) default.
> h2. Compatibility note
> The value is now parsed with standard Flink Duration semantics (consistent 
> with
> the sibling connection.timeout option):
> * Unit-suffixed values ('30s', '1min', '500ms') now work (previously crashed).
> * A bare number such as '30' is now interpreted as 30 milliseconds (Flink's 
> default duration unit), whereas the pre-fix code treated '30' as 30 seconds.
> This affects users who configured a bare integer (the original usage inherited
> from the getindata connector): '30' must be written as '30s'. Any 
> unit-suffixed
> value was previously non-functional (it threw), so no working configuration
> silently changes meaning - bare integers are the only overlap, and the 
> migration
> is a one-character change. This is documented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to