RocMarshal commented on code in PR #200:
URL:
https://github.com/apache/flink-connector-jdbc/pull/200#discussion_r3460184017
##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
splitReader.close();
}
+
+ @Test
+ void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws
Exception {
+ // The provider hands back an already-closed connection the first
time, so the reader's
+ // first use of it fails (mimicking the connection being torn down,
e.g. on source
+ // cancellation, before the reader finishes opening the split). The
reader must
+ // re-establish the connection and read the whole split.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 1);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split);
+ try {
Review Comment:
Could the lines be
```
try(JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split)) {
....
}
// deleted
// xxx.close()
```
##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
splitReader.close();
}
+
+ @Test
+ void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws
Exception {
+ // The provider hands back an already-closed connection the first
time, so the reader's
+ // first use of it fails (mimicking the connection being torn down,
e.g. on source
+ // cancellation, before the reader finishes opening the split). The
reader must
+ // re-establish the connection and read the whole split.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 1);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split);
+ try {
+ RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched =
splitReader.fetch();
+ assertThat(fetched.nextSplit()).isEqualTo("1");
+ List<TestEntry> records = new ArrayList<>();
+ RecordAndOffset<TestEntry> recordAndOffset =
fetched.nextRecordFromSplit();
+ while (recordAndOffset != null) {
+ records.add(recordAndOffset.record);
+ recordAndOffset = fetched.nextRecordFromSplit();
+ }
+ assertThat(records).hasSize(TEST_DATA.length);
+ // The first (closed) connection plus exactly one re-established
one: a reconnect ran.
+ assertThat(provider.establishCount).isEqualTo(2);
+ } finally {
+ splitReader.close();
+ }
+ }
+
+ @Test
+ void testFetchRethrowsImmediatelyWhenConnectionStaysOpen() throws
Exception {
+ // A query error on a healthy (open) connection must be rethrown
immediately, with no
+ // reconnect attempt.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 0);
+ JdbcSourceSplit invalidSplit =
+ new JdbcSourceSplit("1", "select * from NON_EXISTENT_TABLE",
null, null);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
invalidSplit);
+ try {
Review Comment:
if so, ditto
##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
splitReader.close();
}
+
+ @Test
+ void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws
Exception {
+ // The provider hands back an already-closed connection the first
time, so the reader's
+ // first use of it fails (mimicking the connection being torn down,
e.g. on source
+ // cancellation, before the reader finishes opening the split). The
reader must
+ // re-establish the connection and read the whole split.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 1);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split);
+ try {
+ RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched =
splitReader.fetch();
+ assertThat(fetched.nextSplit()).isEqualTo("1");
+ List<TestEntry> records = new ArrayList<>();
+ RecordAndOffset<TestEntry> recordAndOffset =
fetched.nextRecordFromSplit();
+ while (recordAndOffset != null) {
+ records.add(recordAndOffset.record);
+ recordAndOffset = fetched.nextRecordFromSplit();
+ }
+ assertThat(records).hasSize(TEST_DATA.length);
+ // The first (closed) connection plus exactly one re-established
one: a reconnect ran.
+ assertThat(provider.establishCount).isEqualTo(2);
+ } finally {
+ splitReader.close();
+ }
+ }
+
+ @Test
+ void testFetchRethrowsImmediatelyWhenConnectionStaysOpen() throws
Exception {
+ // A query error on a healthy (open) connection must be rethrown
immediately, with no
+ // reconnect attempt.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 0);
+ JdbcSourceSplit invalidSplit =
+ new JdbcSourceSplit("1", "select * from NON_EXISTENT_TABLE",
null, null);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
invalidSplit);
+ try {
+
assertThatThrownBy(splitReader::fetch).isInstanceOf(RuntimeException.class);
+ assertThat(provider.establishCount).isEqualTo(1);
+ } finally {
+ splitReader.close();
+ }
+ }
+
+ @Test
+ void testFetchFailsAfterExhaustingReconnectRetries() throws Exception {
+ // If the connection keeps being closed, the reader gives up after the
retry budget
+ // instead of looping forever.
+ CountingConnectionProvider provider =
+ new CountingConnectionProvider(connectionProvider,
Integer.MAX_VALUE);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split);
+ try {
Review Comment:
if so, ditto
##########
flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReaderTest.java:
##########
@@ -144,4 +146,120 @@ void testFetch() throws Exception {
assertThat(fetchedRecordsWithSplitIds.nextSplit()).isNull();
splitReader.close();
}
+
+ @Test
+ void testFetchReconnectsWhenConnectionClosedWhileOpeningSplit() throws
Exception {
+ // The provider hands back an already-closed connection the first
time, so the reader's
+ // first use of it fails (mimicking the connection being torn down,
e.g. on source
+ // cancellation, before the reader finishes opening the split). The
reader must
+ // re-establish the connection and read the whole split.
+ CountingConnectionProvider provider = new
CountingConnectionProvider(connectionProvider, 1);
+ JdbcSourceSplitReader<TestEntry> splitReader = newReader(provider,
split);
+ try {
+ RecordsWithSplitIds<RecordAndOffset<TestEntry>> fetched =
splitReader.fetch();
+ assertThat(fetched.nextSplit()).isEqualTo("1");
+ List<TestEntry> records = new ArrayList<>();
+ RecordAndOffset<TestEntry> recordAndOffset =
fetched.nextRecordFromSplit();
+ while (recordAndOffset != null) {
+ records.add(recordAndOffset.record);
+ recordAndOffset = fetched.nextRecordFromSplit();
+ }
+ assertThat(records).hasSize(TEST_DATA.length);
+ // The first (closed) connection plus exactly one re-established
one: a reconnect ran.
+ assertThat(provider.establishCount).isEqualTo(2);
+ } finally {
+ splitReader.close();
Review Comment:
if so, deleted
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]