[ https://issues.apache.org/jira/browse/FLINK-39975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuepeng Pan resolved FLINK-39975.
---------------------------------
      Assignee: Tomoyuki NAKAMURA
    Resolution: Fixed

> Flaky DerbyDynamicTableSourceITCase.testLimit: JdbcSourceSplitReader does not recover from a connection closed during source cancellation
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39975
>                 URL: https://issues.apache.org/jira/browse/FLINK-39975
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 2.0.1, 2.1.1
>            Reporter: Tomoyuki NAKAMURA
>            Assignee: Tomoyuki NAKAMURA
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: jdbc-3.4.0, jdbc-5.0.0, jdbc-6.0.0
>
>
> h3. Symptom
> {{DerbyDynamicTableSourceITCase.testLimit}} (and occasionally 
> {{testProject}}) fails intermittently in CI with:
> {code}
> java.lang.RuntimeException: java.sql.SQLNonTransientConnectionException: No current connection.
> Caused by: java.sql.SQLNonTransientConnectionException: No current connection.   (Derby ERROR 08003)
>     at org.apache.derby.impl.jdbc.EmbedConnection.checkIfClosed(...)
>     at org.apache.derby.impl.jdbc.EmbedConnection.setupContextStack(...)
>     at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(...)
>     at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeQuery(...)
> {code}
> The fetcher thread reports the exception, and because the job runs with
> {{NoRestartBackoffTimeStrategy}}, the whole job fails. The failure occurs on
> the master branch and is not specific to any one connector PR.
> h3. Root cause
> {{testLimit}} runs {{SELECT * FROM t LIMIT 1}} over a source partitioned into
> 2 splits ({{scan.partition.num=2}}). With {{LIMIT 1}}, the job completes after
> the first row and cancels the source while the split-fetcher thread is still
> opening the *second* split. Cancellation tears down the JDBC connection, so the
> in-flight {{JdbcSourceSplitReader.openResultSetForSplitWhenAtLeastOnce()}} ->
> {{prepareStatement()}}/{{executeQuery()}} call runs against an already-closed
> connection, and Derby raises {{08003: No current connection}}.
> There is a validate-then-use window: the connection that
> {{SimpleJdbcConnectionProvider.getOrEstablishConnection()}} validated has
> already been closed by the time the reader prepares and executes the statement.
> The reader does not recover from this: the {{SQLException}} is rethrown as a
> fatal {{RuntimeException}} and fails the job. The error is connection-level
> ({{08003}}); the embedded {{memory:}} database itself is still up (it is shut
> down only in {{@AfterAll}}), so the connection can simply be re-established.
> Note: the failing stack trace bottoms out at the split-open call 
> ({{prepareStatement}}/{{executeQuery}}), not at {{resultSet.next()}}.
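> The window can be replayed deterministically with a dependency-free model.
> {{FakeConnection}} and {{replayRace()}} are hypothetical names, not the Derby,
> JDBC or Flink API; the sketch only fixes the order "provider validates,
> cancellation closes, reader uses" and shows that the reader then sees SQLState
> 08003.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;

// Hypothetical, dependency-free model of the validate-then-use window.
// FakeConnection is an illustrative stand-in, not the JDBC, Derby or Flink API.
public class ValidateThenUseWindow {

    static final class FakeConnection {
        private volatile boolean closed;

        boolean isValid() { return !closed; }

        void close() { closed = true; }

        // Mimics Derby rejecting work on a closed connection with SQLState 08003.
        String executeQuery(String sql) throws SQLException {
            if (closed) {
                throw new SQLNonTransientConnectionException("No current connection.", "08003");
            }
            return "rows for: " + sql;
        }
    }

    // Replays the racy interleaving in a fixed order and returns the escaping SQLState.
    static String replayRace() {
        FakeConnection conn = new FakeConnection();
        // 1. The provider validates the connection before handing it out.
        if (!conn.isValid()) {
            throw new IllegalStateException("connection should be valid here");
        }
        // 2. LIMIT 1 is satisfied, the source is cancelled and teardown closes the connection.
        conn.close();
        // 3. The reader opens the second split on the connection it was handed in step 1.
        try {
            return conn.executeQuery("SELECT * FROM t /* split 2 */");
        } catch (SQLException e) {
            return e.getSQLState();
        }
    }

    public static void main(String[] args) {
        System.out.println(replayRace()); // prints 08003
    }
}
```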
> h3. How to reproduce
> {code}
> mvn test -Dtest=DerbyDynamicTableSourceITCase#testLimit -pl flink-connector-jdbc-core
> {code}
> It is timing-dependent, so several repeated runs may be needed to surface it.
> Observed on master ({{af994651}}) and against both Flink 2.0.1 and 2.1.1.
> h3. Fix
> Make {{JdbcSourceSplitReader}} recover from a connection that was closed 
> while opening a split, instead of failing the job:
> # Wrap the split-open call ({{openResultSetForSplit}}) in a bounded retry 
> ({{openResultSetForSplitWithReconnect}}).
> # On a {{SQLException}}, retry *only* when the connection is actually closed 
> ({{connection.isClosed()}}). A genuine query error on a healthy connection is 
> rethrown immediately.
> # Before retrying, drop the dead connection and the statement/result set left 
> on it (so the provider re-establishes a fresh connection and the close 
> helpers are not run against the closed connection), then re-open the split.
> # Bound the retry by {{MAX_CONNECTION_RETRIES}} (3): if the database is
> genuinely unreachable, re-establishing keeps failing and the error propagates
> once the budget is exhausted.
> This is non-masking: real failures (a query error on a healthy connection, or 
> an unreachable database after the retry budget) still fail the job. 
> {{wakeUp()}} and {{fetch()}} are left unchanged.
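> The four steps above can be sketched as a small, dependency-free Java model.
> {{Conn}}, {{Provider}}, {{discardConnection()}}, {{FakeConn}} and
> {{ScriptedProvider}} are hypothetical stand-ins (the real reader works on a
> {{java.sql.Connection}} obtained from the connection provider), and reading
> {{MAX_CONNECTION_RETRIES}} as "3 retries after the first attempt" is an
> assumption.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch of a bounded "reconnect and re-open the split" retry.
// Conn, Provider, FakeConn and ScriptedProvider are stand-ins, not the JDBC or Flink API.
public class ReconnectOnSplitOpen {

    // Assumption: read as 3 retries after the first attempt (4 attempts in total).
    static final int MAX_CONNECTION_RETRIES = 3;

    interface Conn {
        boolean isClosed();
        String openSplit(String split) throws SQLException; // prepareStatement + executeQuery
    }

    interface Provider {
        Conn getOrEstablishConnection() throws SQLException;
        void discardConnection(); // forget the dead connection instead of closing it again
    }

    static String openResultSetForSplitWithReconnect(Provider provider, String split)
            throws SQLException {
        int retries = 0;
        while (true) {
            // If re-establishing itself throws, that error propagates directly in this sketch.
            Conn conn = provider.getOrEstablishConnection();
            try {
                return conn.openSplit(split);
            } catch (SQLException e) {
                // Healthy connection: genuine query error. Budget spent: give up.
                if (!conn.isClosed() || retries >= MAX_CONNECTION_RETRIES) {
                    throw e;
                }
                retries++;
                provider.discardConnection(); // next iteration gets a fresh connection
            }
        }
    }

    static final class FakeConn implements Conn {
        final boolean closed;
        final boolean queryFails;

        FakeConn(boolean closed, boolean queryFails) {
            this.closed = closed;
            this.queryFails = queryFails;
        }

        public boolean isClosed() { return closed; }

        public String openSplit(String split) throws SQLException {
            if (closed) {
                throw new SQLNonTransientConnectionException("No current connection.", "08003");
            }
            if (queryFails) {
                throw new SQLException("query error on a healthy connection", "42000");
            }
            return "rows of " + split;
        }
    }

    // Hands out the scripted connections in order, one per (re-)establish.
    static final class ScriptedProvider implements Provider {
        private final Deque<Conn> script;
        private Conn current;
        int established;

        ScriptedProvider(Conn... conns) { script = new ArrayDeque<>(Arrays.asList(conns)); }

        public Conn getOrEstablishConnection() {
            if (current == null) {
                current = script.removeFirst();
                established++;
            }
            return current;
        }

        public void discardConnection() { current = null; }
    }

    // Returns the opened split, or the SQLState of the error that escaped.
    static String run(ScriptedProvider provider) {
        try {
            return openResultSetForSplitWithReconnect(provider, "split-2");
        } catch (SQLException e) {
            return "failed:" + e.getSQLState();
        }
    }

    public static void main(String[] args) {
        FakeConn dead = new FakeConn(true, false);
        System.out.println(run(new ScriptedProvider(dead, new FakeConn(false, false)))); // recovers
        System.out.println(run(new ScriptedProvider(new FakeConn(false, true))));      // rethrown at once
        System.out.println(run(new ScriptedProvider(dead, dead, dead, dead)));          // budget exhausted
    }
}
```

> The {{main}} method exercises the three behaviours described above: recovery
> after a closed connection, an immediate rethrow for a query error on a healthy
> connection, and giving up once the retry budget is spent.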
> h3. Verification
> A deterministic regression test injects an already-closed connection and
> reproduces the exact {{08003}} error (the test fails without the fix and passes
> with it), asserting that the reader re-establishes the connection and reads the
> whole split. Two more tests cover the immediate-rethrow branch (a query error
> on a healthy connection) and the retry-exhaustion branch (the reader gives up
> after the budget instead of looping forever). {{DerbyDynamicTableSourceITCase}}
> passes across repeated runs.
> h3. Note
> An earlier cooperative-cancellation attempt (a {{wakeUp}} flag plus treating a
> shutdown-time {{SQLException}} as a graceful end-of-split in {{fetch()}}) was
> discarded. It could mask a genuine mid-read connection drop as a silently
> finished split, and measurement showed that the {{wakeUp()}} flag and the
> thread interrupt are *not* set when the race fires; only
> {{connection.isClosed()}} is reliable. ({{wakeUp()}} is also invoked by the
> fetcher for normal {{addSplits}}, so it is not a shutdown signal.)
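> A hypothetical model of the discarded approach shows the masking problem:
> because {{wakeUp()}} also fires for a routine {{addSplits}}, the flag can
> already be set when a genuine mid-read drop happens, and the reader then
> reports a truncated split as finished. {{DiscardedReader}}, {{FetchResult}} and
> the 08006 SQLState are illustrative assumptions, not Flink code.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the DISCARDED cooperative-cancellation idea; not Flink code.
public class WakeUpFlagMasking {

    static final class FetchResult {
        final List<Integer> rows;
        final boolean splitFinished;

        FetchResult(List<Integer> rows, boolean splitFinished) {
            this.rows = rows;
            this.splitFinished = splitFinished;
        }
    }

    static final class DiscardedReader {
        private volatile boolean wokenUp;

        // The fetcher also calls wakeUp() for ordinary addSplits, not only on shutdown.
        void wakeUp() { wokenUp = true; }

        // Reads rows 1..totalRows; the simulated connection drops after dropAfter rows.
        FetchResult fetch(int totalRows, int dropAfter) throws SQLException {
            List<Integer> rows = new ArrayList<>();
            try {
                for (int i = 1; i <= totalRows; i++) {
                    if (i > dropAfter) {
                        throw new SQLException("connection dropped mid-read", "08006");
                    }
                    rows.add(i);
                }
                return new FetchResult(rows, true);
            } catch (SQLException e) {
                if (wokenUp) {
                    // Discarded heuristic: a "shutdown-time" error is treated as end-of-split.
                    return new FetchResult(rows, true);
                }
                throw e;
            }
        }
    }

    // A routine addSplits wake-up followed by a genuine drop after 2 of 5 rows.
    static FetchResult dropAfterRoutineWakeUp() {
        DiscardedReader reader = new DiscardedReader();
        reader.wakeUp();
        try {
            return reader.fetch(5, 2);
        } catch (SQLException e) {
            throw new IllegalStateException("the error would at least have surfaced", e);
        }
    }

    public static void main(String[] args) {
        FetchResult r = dropAfterRoutineWakeUp();
        // Prints [1, 2] finished=true: 3 rows are silently lost.
        System.out.println(r.rows + " finished=" + r.splitFinished);
    }
}
```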
> h3. Related
> A previous attempt (closed without merge) guarded only 
> {{resultSet.isClosed()}} before {{next()}} / {{extract()}} inside the record 
> loop, which does not cover the split-open call where this actually fails:
> https://github.com/apache/flink-connector-jdbc/pull/191



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
