This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 190238a05bebe9a092e9cec84627127781d4d859
Author: Roc Marshal <flin...@126.com>
AuthorDate: Fri May 10 21:18:11 2024 +0800

    [FLINK-33461][Connector/JDBC] Support streaming related semantics for the 
new JDBC source
---
 .../connections/SimpleJdbcConnectionProvider.java  |   1 +
 .../flink/connector/jdbc/source/JdbcSource.java    |  40 +--
 .../connector/jdbc/source/JdbcSourceBuilder.java   |  87 ++++-
 .../source/enumerator/JdbcSourceEnumerator.java    |  88 ++++-
 .../enumerator/JdbcSqlSplitEnumeratorBase.java     |   6 +-
 .../enumerator/SqlTemplateSplitEnumerator.java     |  36 +-
 .../jdbc/source/reader/JdbcSourceSplitReader.java  | 186 +++++++---
 .../jdbc/source/split/JdbcSourceSplit.java         |  28 +-
 .../source/split/JdbcSourceSplitSerializer.java    |   6 +-
 .../jdbc/source/split/JdbcSourceSplitState.java    |   2 +-
 .../jdbc/split/JdbcParameterValuesProvider.java    |   8 +
 .../split/JdbcSlideTimingParameterProvider.java    |  94 ++++++
 .../jdbc/utils/ContinuousUnBoundingSettings.java   |  86 +++++
 .../jdbc/source/JdbcSourceBuilderTest.java         |  43 ++-
 .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 +++++++++++++++++++++
 .../JdbcSourceEnumStateSerializerTest.java         |   5 +-
 .../enumerator/JdbcSourceEnumeratorTest.java       |   8 +-
 .../jdbc/source/reader/JdbcSourceReaderTest.java   |   2 +-
 .../source/reader/JdbcSourceSplitReaderTest.java   |   2 +-
 .../split/JdbcSourceSplitSerializerTest.java       |   3 +-
 20 files changed, 971 insertions(+), 134 deletions(-)

diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
index 811210af..4c48f799 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java
@@ -79,6 +79,7 @@ public class SimpleJdbcConnectionProvider implements 
JdbcConnectionProvider, Ser
     @Override
     public boolean isConnectionValid() throws SQLException {
         return connection != null
+                && !connection.isClosed()
                 && 
connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
     }
 
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
index fc145feb..08e4a777 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java
@@ -40,9 +40,12 @@ import 
org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
 import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Objects;
@@ -55,6 +58,7 @@ public class JdbcSource<OUT>
 
     private final Boundedness boundedness;
     private final TypeInformation<OUT> typeInformation;
+    private final @Nullable ContinuousUnBoundingSettings 
continuousUnBoundingSettings;
 
     private final Configuration configuration;
     private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> 
sqlSplitEnumeratorProvider;
@@ -69,29 +73,20 @@ public class JdbcSource<OUT>
             JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> 
sqlSplitEnumeratorProvider,
             ResultExtractor<OUT> resultExtractor,
             TypeInformation<OUT> typeInformation,
-            DeliveryGuarantee deliveryGuarantee) {
+            @Nullable DeliveryGuarantee deliveryGuarantee,
+            @Nullable ContinuousUnBoundingSettings 
continuousUnBoundingSettings) {
         this.configuration = Preconditions.checkNotNull(configuration);
         this.connectionProvider = 
Preconditions.checkNotNull(connectionProvider);
         this.sqlSplitEnumeratorProvider = 
Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
         this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
-        this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
+        this.deliveryGuarantee =
+                Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : 
deliveryGuarantee;
         this.typeInformation = Preconditions.checkNotNull(typeInformation);
-        this.boundedness = Boundedness.BOUNDED;
-    }
-
-    JdbcSource(
-            Configuration configuration,
-            JdbcConnectionProvider connectionProvider,
-            JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> 
sqlSplitEnumeratorProvider,
-            ResultExtractor<OUT> resultExtractor,
-            TypeInformation<OUT> typeInformation) {
-        this(
-                configuration,
-                connectionProvider,
-                sqlSplitEnumeratorProvider,
-                resultExtractor,
-                typeInformation,
-                DeliveryGuarantee.NONE);
+        this.continuousUnBoundingSettings = continuousUnBoundingSettings;
+        this.boundedness =
+                Objects.isNull(continuousUnBoundingSettings)
+                        ? Boundedness.BOUNDED
+                        : Boundedness.CONTINUOUS_UNBOUNDED;
     }
 
     @Override
@@ -119,7 +114,10 @@ public class JdbcSource<OUT>
     public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> 
createEnumerator(
             SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws 
Exception {
         return new JdbcSourceEnumerator(
-                enumContext, sqlSplitEnumeratorProvider.create(), new 
ArrayList<>());
+                enumContext,
+                sqlSplitEnumeratorProvider.create(),
+                continuousUnBoundingSettings,
+                new ArrayList<>());
     }
 
     @Override
@@ -132,6 +130,7 @@ public class JdbcSource<OUT>
         return new JdbcSourceEnumerator(
                 enumContext,
                 
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
+                continuousUnBoundingSettings,
                 checkpoint.getRemainingSplits());
     }
 
@@ -193,6 +192,7 @@ public class JdbcSource<OUT>
                 && Objects.equals(sqlSplitEnumeratorProvider, 
that.sqlSplitEnumeratorProvider)
                 && Objects.equals(connectionProvider, that.connectionProvider)
                 && Objects.equals(resultExtractor, that.resultExtractor)
-                && deliveryGuarantee == that.deliveryGuarantee;
+                && deliveryGuarantee == that.deliveryGuarantee
+                && Objects.equals(continuousUnBoundingSettings, 
that.continuousUnBoundingSettings);
     }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
index 17e3eb27..8d52f7ac 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti
 import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
 import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -36,9 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Objects;
 
 import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
 import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
@@ -94,6 +100,11 @@ public class JdbcSourceBuilder<OUT> {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
 
+    public static final String INVALID_CONTINUOUS_SLIDE_TIMING_HINT =
+            "The 'jdbcParameterValuesProvider' must be specified with in type 
of 'JdbcSlideTimingParameterProvider' when using 
'continuousUnBoundingSettings'.";
+    public static final String INVALID_SLIDE_TIMING_CONTINUOUS_HINT =
+            "The 'continuousUnBoundingSettings' must be specified with in type 
of 'continuousUnBoundingSettings' when using 'jdbcParameterValuesProvider' in 
type of 'JdbcSlideTimingParameterProvider'.";
+
     private final Configuration configuration;
 
     private int splitReaderFetchBatchSize;
@@ -103,15 +114,15 @@ public class JdbcSourceBuilder<OUT> {
     // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
     private Boolean autoCommit;
 
-    // TODO It would be used to introduce streaming semantic and tracked in
-    //  https://issues.apache.org/jira/browse/FLINK-33461
     private DeliveryGuarantee deliveryGuarantee;
+    private @Nullable ContinuousUnBoundingSettings 
continuousUnBoundingSettings;
 
     private TypeInformation<OUT> typeInformation;
 
     private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
     private String sql;
     private JdbcParameterValuesProvider jdbcParameterValuesProvider;
+    private @Nullable Serializable optionalSqlSplitEnumeratorState;
     private ResultExtractor<OUT> resultExtractor;
 
     private JdbcConnectionProvider connectionProvider;
@@ -143,9 +154,6 @@ public class JdbcSourceBuilder<OUT> {
     }
 
     public JdbcSourceBuilder<OUT> setUsername(String username) {
-        Preconditions.checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(username),
-                "It's required to set the 'username'.");
         connOptionsBuilder.withUsername(username);
         return this;
     }
@@ -180,19 +188,45 @@ public class JdbcSourceBuilder<OUT> {
 
     // ------ Optional 
------------------------------------------------------------------
 
-    public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String 
propVal) {
-        Preconditions.checkNotNull(propKey, "Connection property key mustn't 
be null");
-        Preconditions.checkNotNull(propVal, "Connection property value mustn't 
be null");
-        connOptionsBuilder.withProperty(propKey, propVal);
+    /**
+     * The continuousUnBoundingSettings to discovery the next available batch 
splits. Note: If the
+     * value was set, the {@link #jdbcParameterValuesProvider} must specified 
with the {@link
+     * org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}.
+     */
+    public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings(
+            ContinuousUnBoundingSettings continuousUnBoundingSettings) {
+        this.continuousUnBoundingSettings = continuousUnBoundingSettings;
         return this;
     }
 
+    /**
+     * If the value was set as an instance of {@link 
JdbcSlideTimingParameterProvider}, it's
+     * required to specify the {@link #continuousUnBoundingSettings}.
+     */
     public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider(
             @Nonnull JdbcParameterValuesProvider parameterValuesProvider) {
         this.jdbcParameterValuesProvider = 
Preconditions.checkNotNull(parameterValuesProvider);
         return this;
     }
 
+    public JdbcSourceBuilder<OUT> setDeliveryGuarantee(DeliveryGuarantee 
deliveryGuarantee) {
+        this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
+        return this;
+    }
+
+    public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds(
+            int connectionCheckTimeoutSeconds) {
+        
connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds);
+        return this;
+    }
+
+    public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String 
propVal) {
+        Preconditions.checkNotNull(propKey, "Connection property key mustn't 
be null");
+        Preconditions.checkNotNull(propVal, "Connection property value mustn't 
be null");
+        connOptionsBuilder.withProperty(propKey, propVal);
+        return this;
+    }
+
     public JdbcSourceBuilder<OUT> setSplitReaderFetchBatchSize(int 
splitReaderFetchBatchSize) {
         Preconditions.checkArgument(
                 splitReaderFetchBatchSize > 0,
@@ -232,11 +266,26 @@ public class JdbcSourceBuilder<OUT> {
         return this;
     }
 
+    public JdbcSourceBuilder<OUT> setOptionalSqlSplitEnumeratorState(
+            Serializable optionalSqlSplitEnumeratorState) {
+        this.optionalSqlSplitEnumeratorState = optionalSqlSplitEnumeratorState;
+        return this;
+    }
+
     public JdbcSource<OUT> build() {
         this.connectionProvider = new 
SimpleJdbcConnectionProvider(connOptionsBuilder.build());
         if (resultSetFetchSize > 0) {
             this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize);
         }
+
+        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            Preconditions.checkArgument(
+                    this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE
+                            || this.resultSetType == 
ResultSet.CONCUR_READ_ONLY,
+                    "The 'resultSetType' must be 
ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s",
+                    DeliveryGuarantee.EXACTLY_ONCE);
+        }
+
         this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency);
         this.configuration.set(RESULTSET_TYPE, resultSetType);
         this.configuration.set(READER_FETCH_BATCH_SIZE, 
splitReaderFetchBatchSize);
@@ -247,15 +296,31 @@ public class JdbcSourceBuilder<OUT> {
         Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't 
be null.");
         Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't 
be null.");
 
+        if (Objects.nonNull(continuousUnBoundingSettings)) {
+            Preconditions.checkArgument(
+                    Objects.nonNull(jdbcParameterValuesProvider)
+                            && jdbcParameterValuesProvider
+                                    instanceof 
JdbcSlideTimingParameterProvider,
+                    INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
+        }
+
+        if (Objects.nonNull(jdbcParameterValuesProvider)
+                && jdbcParameterValuesProvider instanceof 
JdbcSlideTimingParameterProvider) {
+            Preconditions.checkArgument(
+                    Objects.nonNull(continuousUnBoundingSettings),
+                    INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
+        }
+
         return new JdbcSource<>(
                 configuration,
                 connectionProvider,
                 new 
SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider()
-                        .setOptionalSqlSplitEnumeratorState(null)
+                        
.setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState)
                         .setSqlTemplate(sql)
                         
.setParameterValuesProvider(jdbcParameterValuesProvider),
                 resultExtractor,
                 typeInformation,
-                deliveryGuarantee);
+                deliveryGuarantee,
+                continuousUnBoundingSettings);
     }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
index a6079723..a26905fc 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -34,7 +35,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /** JDBC source enumerator. */
@@ -44,26 +48,43 @@ public class JdbcSourceEnumerator
 
     private final SplitEnumeratorContext<JdbcSourceSplit> context;
     private final Boundedness boundedness;
+    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
     private final List<JdbcSourceSplit> unassigned;
     private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> 
sqlSplitEnumerator;
+    private final @Nullable ContinuousUnBoundingSettings 
continuousUnBoundingSettings;
 
     public JdbcSourceEnumerator(
             SplitEnumeratorContext<JdbcSourceSplit> context,
             JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator,
+            ContinuousUnBoundingSettings continuousUnBoundingSettings,
             List<JdbcSourceSplit> unassigned) {
         this.context = Preconditions.checkNotNull(context);
         this.sqlSplitEnumerator = 
Preconditions.checkNotNull(sqlSplitEnumerator);
-        this.boundedness = Boundedness.BOUNDED;
+        this.continuousUnBoundingSettings = continuousUnBoundingSettings;
+        this.boundedness =
+                Objects.isNull(continuousUnBoundingSettings)
+                        ? Boundedness.BOUNDED
+                        : Boundedness.CONTINUOUS_UNBOUNDED;
         this.unassigned = Preconditions.checkNotNull(unassigned);
+        this.readersAwaitingSplit = new LinkedHashMap<>();
     }
 
     @Override
     public void start() {
         sqlSplitEnumerator.open();
-        try {
-            unassigned.addAll(sqlSplitEnumerator.enumerateSplits());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED
+                && Objects.nonNull(continuousUnBoundingSettings)) {
+            context.callAsync(
+                    () -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - 
unassigned.size() > 0),
+                    this::processNewSplits,
+                    
continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(),
+                    
continuousUnBoundingSettings.getDiscoveryInterval().toMillis());
+        } else {
+            try {
+                unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> 
true));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -81,6 +102,9 @@ public class JdbcSourceEnumerator
     public void handleSplitRequest(int subtask, @Nullable String hostname) {
         if (boundedness == Boundedness.BOUNDED) {
             assignSplitsForBounded(subtask, hostname);
+        } else {
+            readersAwaitingSplit.put(subtask, hostname);
+            assignSplitsForUnbounded();
         }
     }
 
@@ -93,6 +117,16 @@ public class JdbcSourceEnumerator
     public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
         LOG.debug("File Source Enumerator adds splits back: {}", splits);
         unassigned.addAll(splits);
+        if (boundedness == Boundedness.BOUNDED) {
+            context.registeredReaders()
+                    .keySet()
+                    .forEach(subTask -> assignSplitsForBounded(subTask, null));
+        } else if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
+            if (context.registeredReaders().containsKey(subtaskId)) {
+                readersAwaitingSplit.put(subtaskId, null);
+            }
+            assignSplitsForUnbounded();
+        }
     }
 
     @Override
@@ -110,12 +144,9 @@ public class JdbcSourceEnumerator
             return Optional.empty();
         }
         Iterator<JdbcSourceSplit> iterator = unassigned.iterator();
-        JdbcSourceSplit next = null;
-        if (iterator.hasNext()) {
-            next = iterator.next();
-            iterator.remove();
-        }
-        return Optional.ofNullable(next);
+        JdbcSourceSplit next = iterator.next();
+        iterator.remove();
+        return Optional.of(next);
     }
 
     private void assignSplitsForBounded(int subtask, @Nullable String 
hostname) {
@@ -137,4 +168,39 @@ public class JdbcSourceEnumerator
             LOG.info("No more splits available for subtask {}", subtask);
         }
     }
+
+    private void processNewSplits(List<JdbcSourceSplit> splits, Throwable 
error) {
+        if (error != null) {
+            LOG.error("Failed to enumerate sql splits.", error);
+            return;
+        }
+        this.unassigned.addAll(splits);
+
+        assignSplitsForUnbounded();
+    }
+
+    private void assignSplitsForUnbounded() {
+        final Iterator<Map.Entry<Integer, String>> awaitingReader =
+                readersAwaitingSplit.entrySet().iterator();
+
+        while (awaitingReader.hasNext()) {
+            final Map.Entry<Integer, String> nextAwaiting = 
awaitingReader.next();
+
+            // if the reader that requested another split has failed in the 
meantime, remove
+            // it from the list of waiting readers
+            if 
(!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
+                awaitingReader.remove();
+                continue;
+            }
+
+            final int awaitingSubtask = nextAwaiting.getKey();
+            final Optional<JdbcSourceSplit> nextSplit = getNextSplit();
+            if (nextSplit.isPresent()) {
+                context.assignSplit(nextSplit.get(), awaitingSubtask);
+                awaitingReader.remove();
+            } else {
+                break;
+            }
+        }
+    }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
index 7e0d70d1..25d09a71 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java
@@ -21,11 +21,13 @@ package org.apache.flink.connector.jdbc.source.enumerator;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.function.Supplier;
 
 /**
  * Base class for jdbc sql split enumerator.
@@ -64,10 +66,12 @@ public abstract class JdbcSqlSplitEnumeratorBase<SplitT> 
implements AutoCloseabl
     /**
      * Enumerate the JDBC splits.
      *
+     * @param splitGettable If the next batch splits are gettable.
      * @return The result splits generated by the split enumerator.
      * @throws IOException IO exception.
      */
-    public abstract List<JdbcSourceSplit> enumerateSplits() throws IOException;
+    public abstract List<JdbcSourceSplit> enumerateSplits(@Nonnull 
Supplier<Boolean> splitGettable)
+            throws IOException;
 
     /**
      * A provider to create or restore a JDBC sql splits enumerator.
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
index e4e51455..2099d0e0 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java
@@ -19,22 +19,29 @@
 package org.apache.flink.connector.jdbc.source.enumerator;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
 import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Supplier;
 
 /** A split enumerator based on sql-parameters grains. */
 public final class SqlTemplateSplitEnumerator extends 
JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> {
 
+    public static final Logger LOG = 
LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class);
+
     private final String sqlTemplate;
 
     private final JdbcParameterValuesProvider parameterValuesProvider;
@@ -49,21 +56,40 @@ public final class SqlTemplateSplitEnumerator extends 
JdbcSqlSplitEnumeratorBase
     }
 
     @Override
-    public List<JdbcSourceSplit> enumerateSplits() throws IOException {
+    @Nonnull
+    public List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> 
splitGettable)
+            throws RuntimeException {
+
+        if (!splitGettable.get()) {
+            LOG.info(
+                    "The current split is over max splits capacity of {}.",
+                    JdbcSourceEnumerator.class.getSimpleName());
+            return Collections.emptyList();
+        }
+
         if (parameterValuesProvider == null) {
             return Collections.singletonList(
-                    new JdbcSourceSplit(getNextId(), sqlTemplate, null, 0, 
null));
+                    new JdbcSourceSplit(getNextId(), sqlTemplate, null, new 
CheckpointedOffset()));
+        }
+
+        if (optionalSqlSplitEnumeratorState != null) {
+            
parameterValuesProvider.setOptionalState(optionalSqlSplitEnumeratorState);
         }
         Serializable[][] parameters = 
parameterValuesProvider.getParameterValues();
+
+        // update state
+        optionalSqlSplitEnumeratorState = 
parameterValuesProvider.getLatestOptionalState();
+
         if (parameters == null) {
             return Collections.singletonList(
-                    new JdbcSourceSplit(getNextId(), sqlTemplate, null, 0, 
null));
+                    new JdbcSourceSplit(getNextId(), sqlTemplate, null, new 
CheckpointedOffset()));
         }
 
         List<JdbcSourceSplit> jdbcSourceSplitList = new 
ArrayList<>(parameters.length);
         for (Serializable[] paramArr : parameters) {
             jdbcSourceSplitList.add(
-                    new JdbcSourceSplit(getNextId(), sqlTemplate, paramArr, 0, 
null));
+                    new JdbcSourceSplit(
+                            getNextId(), sqlTemplate, paramArr, new 
CheckpointedOffset()));
         }
         return jdbcSourceSplitList;
     }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
index f31d1644..5011f42b 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java
@@ -48,6 +48,7 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Queue;
 
 import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
@@ -71,25 +72,28 @@ public class JdbcSourceSplitReader<T>
     private final Queue<JdbcSourceSplit> splits;
     private final TypeInformation<T> typeInformation;
     private final JdbcConnectionProvider connectionProvider;
+
     private transient Connection connection;
     private transient PreparedStatement statement;
+    // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+    private final Boolean autoCommit;
     private transient ResultSet resultSet;
+    protected boolean hasNextRecordCurrentSplit;
+    private int currentSplitOffset;
 
     private final ResultExtractor<T> resultExtractor;
-    protected boolean hasNextRecordCurrentSplit;
+
     private final DeliveryGuarantee deliveryGuarantee;
 
     private final int splitReaderFetchBatchSize;
-
     private final int resultSetType;
     private final int resultSetConcurrency;
     private final int resultSetFetchSize;
-    // Boolean to distinguish between default value and explicitly set 
autoCommit mode.
-    private final Boolean autoCommit;
-    private int currentSplitOffset;
 
     private final SourceReaderContext context;
 
+    private @Nullable JdbcSourceSplit skippedSplit;
+
     public JdbcSourceSplitReader(
             SourceReaderContext context,
             Configuration config,
@@ -120,7 +124,10 @@ public class JdbcSourceSplitReader<T>
     @Override
     public RecordsWithSplitIds<RecordAndOffset<T>> fetch() throws IOException {
 
-        checkSplitOrStartNext();
+        boolean couldFetch = checkSplitOrStartNext();
+        if (!couldFetch) {
+            return new RecordsBySplits.Builder<RecordAndOffset<T>>().build();
+        }
 
         if (!hasNextRecordCurrentSplit) {
             return finishSplit();
@@ -154,21 +161,35 @@ public class JdbcSourceSplitReader<T>
         closeResultSetAndStatement();
 
         RecordsBySplits.Builder<RecordAndOffset<T>> builder = new 
RecordsBySplits.Builder<>();
-        Preconditions.checkState(currentSplit != null, "currentSplit");
-        builder.addFinishedSplit(currentSplit.splitId());
+        JdbcSourceSplit splitToFinish = Objects.nonNull(currentSplit) ? 
currentSplit : skippedSplit;
+        Preconditions.checkState(splitToFinish != null, "Split to finish 
mustn't be null.");
+        builder.addFinishedSplit(splitToFinish.splitId());
         currentSplit = null;
+        skippedSplit = null;
         return builder.build();
     }
 
     private void closeResultSetAndStatement() {
+        closeResultSetIfNeeded();
+        closeStatementIfNeeded();
+    }
+
+    private void closeResultSetIfNeeded() {
         try {
             if (resultSet != null && !resultSet.isClosed()) {
                 resultSet.close();
             }
+            resultSet = null;
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void closeStatementIfNeeded() {
+        try {
             if (statement != null && !statement.isClosed()) {
                 statement.close();
             }
-            resultSet = null;
             statement = null;
         } catch (SQLException e) {
             throw new RuntimeException(e);
@@ -206,40 +227,47 @@ public class JdbcSourceSplitReader<T>
         return typeInformation;
     }
 
-    private void checkSplitOrStartNext() {
+    @VisibleForTesting
+    public List<JdbcSourceSplit> getSplits() {
+        return Collections.unmodifiableList(Arrays.asList(splits.toArray(new 
JdbcSourceSplit[0])));
+    }
+
+    @VisibleForTesting
+    public Connection getConnection() {
+        return connection;
+    }
+
+    @VisibleForTesting
+    public PreparedStatement getStatement() {
+        return statement;
+    }
+
+    @VisibleForTesting
+    public ResultSet getResultSet() {
+        return resultSet;
+    }
+
+    // ---------------- Private methods --------------------------------
+
+    private boolean checkSplitOrStartNext() {
 
         try {
             if (hasNextRecordCurrentSplit && resultSet != null) {
-                return;
+                return true;
             }
 
             final JdbcSourceSplit nextSplit = splits.poll();
-            if (nextSplit == null) {
-                throw new IOException("Cannot fetch from another split - no 
split remaining");
+            if (nextSplit != null) {
+                currentSplit = nextSplit;
+                openResultSetForSplit(currentSplit);
+                return true;
             }
-            currentSplit = nextSplit;
-            openResultSetForSplit(currentSplit);
-        } catch (SQLException | ClassNotFoundException | IOException e) {
+            return false;
+        } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }
 
-    private void discardSplit(JdbcSourceSplit split) throws SQLException {
-        if (split.getOffset() != 0) {
-            hasNextRecordCurrentSplit = false;
-            currentSplitOffset = 0;
-            if (resultSet != null && !resultSet.isClosed()) {
-                resultSet.close();
-            }
-            if (statement != null && !statement.isClosed()) {
-                statement.close();
-            }
-            resultSet = null;
-            statement = null;
-            currentSplit = null;
-        }
-    }
-
     private void getOrEstablishConnection() throws SQLException, 
ClassNotFoundException {
         connection = connectionProvider.getOrEstablishConnection();
         if (autoCommit == null) {
@@ -252,7 +280,79 @@ public class JdbcSourceSplitReader<T>
 
     private void openResultSetForSplit(JdbcSourceSplit split)
             throws SQLException, ClassNotFoundException {
+        switch (deliveryGuarantee) {
+            case EXACTLY_ONCE:
+                openResultSetForSplitWhenExactlyOnce(split);
+                break;
+            case AT_LEAST_ONCE:
+                openResultSetForSplitWhenAtLeastOnce(split);
+                break;
+            case NONE:
+            default:
+                openResultSetForSplitWhenAtMostOnce(split);
+                break;
+        }
+    }
+
+    private void openResultSetForSplitWhenAtMostOnce(JdbcSourceSplit split)
+            throws SQLException, ClassNotFoundException {
+        if (split.getReaderPosition() != 0) {
+            skippedSplit = currentSplit;
+            currentSplit = null;
+            hasNextRecordCurrentSplit = false;
+            currentSplitOffset = 0;
+            closeResultSetAndStatement();
+        } else {
+            openResultSetForSplitWhenAtLeastOnce(split);
+        }
+    }
+
+    private void openResultSetForSplitWhenExactlyOnce(JdbcSourceSplit split)
+            throws SQLException, ClassNotFoundException {
         getOrEstablishConnection();
+        closeResultSetIfNeeded();
+        prepareStatement(split);
+        resultSet = statement.executeQuery();
+        currentSplitOffset = 0;
+        hasNextRecordCurrentSplit = resultSet.next();
+        if (hasNextRecordCurrentSplit) {
+            moveResultSetCursorByOffset();
+        }
+    }
+
+    private void moveResultSetCursorByOffset() throws SQLException {
+        int resultSetOffset = currentSplit.getReaderPosition();
+        if (resultSetOffset == 0) {
+            return;
+        }
+        resultSet.last();
+        int last = resultSet.getRow();
+        resultSet.absolute(1);
+        if (resultSetOffset < last) {
+            currentSplitOffset = resultSetOffset;
+            resultSet.absolute(resultSetOffset + 1);
+        } else {
+            hasNextRecordCurrentSplit = false;
+            LOG.warn(
+                    "The offset will not be set from splitState, because the 
last cursor is {}, the expected cursor is {}.",
+                    last,
+                    resultSetOffset + 1);
+        }
+    }
+
+    private void openResultSetForSplitWhenAtLeastOnce(JdbcSourceSplit split)
+            throws SQLException, ClassNotFoundException {
+        getOrEstablishConnection();
+        closeResultSetIfNeeded();
+        prepareStatement(split);
+        resultSet = statement.executeQuery();
+        // AT_LEAST_ONCE
+        hasNextRecordCurrentSplit = resultSet.next();
+        currentSplitOffset = 0;
+    }
+
+    private void prepareStatement(JdbcSourceSplit split) throws SQLException {
+        closeStatementIfNeeded();
         statement =
                 connection.prepareStatement(
                         split.getSqlTemplate(), resultSetType, 
resultSetConcurrency);
@@ -263,27 +363,5 @@ public class JdbcSourceSplitReader<T>
             }
         }
         statement.setFetchSize(resultSetFetchSize);
-        resultSet = statement.executeQuery();
-        hasNextRecordCurrentSplit = resultSet.next();
-    }
-
-    @VisibleForTesting
-    public List<JdbcSourceSplit> getSplits() {
-        return Collections.unmodifiableList(Arrays.asList(splits.toArray(new 
JdbcSourceSplit[0])));
-    }
-
-    @VisibleForTesting
-    public Connection getConnection() {
-        return connection;
-    }
-
-    @VisibleForTesting
-    public PreparedStatement getStatement() {
-        return statement;
-    }
-
-    @VisibleForTesting
-    public ResultSet getResultSet() {
-        return resultSet;
     }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
index af71a58a..7a690ca9 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.source.split;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -38,27 +39,19 @@ public class JdbcSourceSplit implements SourceSplit, 
Serializable {
 
     private final @Nullable Serializable[] parameters;
 
-    private final int offset;
-
     private final @Nullable CheckpointedOffset checkpointedOffset;
 
     public JdbcSourceSplit(
             String id,
             String sqlTemplate,
             @Nullable Serializable[] parameters,
-            int offset,
             @Nullable CheckpointedOffset checkpointedOffset) {
         this.id = id;
         this.sqlTemplate = sqlTemplate;
         this.parameters = parameters;
-        this.offset = offset;
         this.checkpointedOffset = checkpointedOffset;
     }
 
-    public int getOffset() {
-        return offset;
-    }
-
     @Nullable
     public CheckpointedOffset getCheckpointedOffset() {
         return checkpointedOffset;
@@ -66,13 +59,21 @@ public class JdbcSourceSplit implements SourceSplit, 
Serializable {
 
     public JdbcSourceSplit updateWithCheckpointedPosition(
             @Nullable CheckpointedOffset checkpointedOffset) {
-        return new JdbcSourceSplit(id, sqlTemplate, parameters, offset, 
checkpointedOffset);
+        return new JdbcSourceSplit(id, sqlTemplate, parameters, 
checkpointedOffset);
     }
 
-    public Optional<CheckpointedOffset> getReaderPosition() {
+    public Optional<CheckpointedOffset> getReaderPositionOptional() {
         return Optional.ofNullable(checkpointedOffset);
     }
 
+    public int getReaderPosition() {
+        if (Objects.nonNull(checkpointedOffset)) {
+            Preconditions.checkState(checkpointedOffset.getOffset() <= 
Integer.MAX_VALUE);
+            return (int) checkpointedOffset.getOffset();
+        }
+        return 0;
+    }
+
     public String getSqlTemplate() {
         return sqlTemplate;
     }
@@ -96,8 +97,7 @@ public class JdbcSourceSplit implements SourceSplit, 
Serializable {
             return false;
         }
         JdbcSourceSplit that = (JdbcSourceSplit) o;
-        return offset == that.offset
-                && Objects.equals(id, that.id)
+        return Objects.equals(id, that.id)
                 && Objects.equals(sqlTemplate, that.sqlTemplate)
                 && Arrays.equals(parameters, that.parameters)
                 && Objects.equals(checkpointedOffset, that.checkpointedOffset);
@@ -105,7 +105,7 @@ public class JdbcSourceSplit implements SourceSplit, 
Serializable {
 
     @Override
     public int hashCode() {
-        int result = Objects.hash(id, sqlTemplate, offset, checkpointedOffset);
+        int result = Objects.hash(id, sqlTemplate, checkpointedOffset);
         result = 31 * result + Arrays.hashCode(parameters);
         return result;
     }
@@ -121,8 +121,6 @@ public class JdbcSourceSplit implements SourceSplit, 
Serializable {
                 + '\''
                 + ", parameters="
                 + Arrays.toString(parameters)
-                + ", offset="
-                + offset
                 + ", checkpointedOffset="
                 + checkpointedOffset
                 + '}';
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
index 4aadfef1..69a23406 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java
@@ -81,8 +81,6 @@ public class JdbcSourceSplitSerializer implements 
SimpleVersionedSerializer<Jdbc
         out.writeInt(paramsBytes.length);
         out.write(paramsBytes);
 
-        out.writeInt(sourceSplit.getOffset());
-
         CheckpointedOffset checkpointedOffset = 
sourceSplit.getCheckpointedOffset();
         byte[] chkOffset = 
InstantiationUtil.serializeObject(checkpointedOffset);
         out.writeInt(chkOffset.length);
@@ -100,14 +98,12 @@ public class JdbcSourceSplitSerializer implements 
SimpleVersionedSerializer<Jdbc
                 InstantiationUtil.deserializeObject(
                         parametersBytes, in.getClass().getClassLoader());
 
-        int offset = in.readInt();
-
         int chkOffsetBytesLen = in.readInt();
         byte[] chkOffsetBytes = new byte[chkOffsetBytesLen];
         in.read(chkOffsetBytes);
         CheckpointedOffset chkOffset =
                 InstantiationUtil.deserializeObject(chkOffsetBytes, 
in.getClass().getClassLoader());
 
-        return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
+        return new JdbcSourceSplit(id, sqlTemplate, params, chkOffset);
     }
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
index f74c381b..2e3b2c3d 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java
@@ -37,7 +37,7 @@ public class JdbcSourceSplitState<SplitT extends 
JdbcSourceSplit> implements Ser
     public JdbcSourceSplitState(SplitT split) {
         this.split = checkNotNull(split);
 
-        final Optional<CheckpointedOffset> readerPosition = 
split.getReaderPosition();
+        final Optional<CheckpointedOffset> readerPosition = 
split.getReaderPositionOptional();
         if (readerPosition.isPresent()) {
             this.offset = readerPosition.get().getOffset();
             this.recordsToSkipAfterOffset = 
readerPosition.get().getRecordsAfterOffset();
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java
index 8dcceb28..3f1c2aed 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java
@@ -33,4 +33,12 @@ public interface JdbcParameterValuesProvider extends 
Serializable {
 
     /** Returns the necessary parameters array to use for query in parallel a 
table. */
     Serializable[][] getParameterValues();
+
+    /** Get the latest optional state data. */
+    default Serializable getLatestOptionalState() {
+        return null;
+    }
+
+    /** Set the optional state data. */
+    default void setOptionalState(Serializable optionalState) {}
 }
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java
new file mode 100644
index 00000000..eae5891e
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.split;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** The parameters provider that generates parameters using a sliding timing window strategy. */
+@PublicEvolving
+public class JdbcSlideTimingParameterProvider implements 
JdbcParameterValuesProvider {
+
+    private final long slideStepMills;
+    private final long slideSpanMills;
+    private final long splitGenerateDelayMillis;
+
+    private @Nonnull Long startMills;
+
+    public JdbcSlideTimingParameterProvider(
+            Long startMills,
+            long slideSpanMills,
+            long slideStepMills,
+            long splitGenerateDelayMillis) {
+        this.startMills = Preconditions.checkNotNull(startMills);
+        Preconditions.checkArgument(
+                startMills > 0L,
+                "'startMillis' of JdbcSlideTimingParameterProvider must be 
greater than 0. ");
+        Preconditions.checkArgument(
+                slideSpanMills > 0 || slideStepMills > 0,
+                "JdbcSlideTimingParameterProvider parameters must satisfy "
+                        + "slideSpanMills > 0 and slideStepMills > 0");
+        Preconditions.checkArgument(
+                splitGenerateDelayMillis >= 0L,
+                "JdbcSlideTimingParameterProvider parameters must satisfy "
+                        + "splitGenerateDelayMillis >= 0");
+        this.slideStepMills = slideStepMills;
+        this.slideSpanMills = slideSpanMills;
+        this.splitGenerateDelayMillis = splitGenerateDelayMillis;
+    }
+
+    private boolean nextSplitAvailable(Long nextSpanStartMillis) {
+        final long delayedNextSpanStartMillis = nextSpanStartMillis + 
splitGenerateDelayMillis;
+        final long currentAvailableMillis = currentAvailableMillis();
+        return currentAvailableMillis >= delayedNextSpanStartMillis
+                && (currentAvailableMillis - delayedNextSpanStartMillis >= 
slideSpanMills);
+    }
+
+    @Override
+    public Long getLatestOptionalState() {
+        return startMills;
+    }
+
+    @Override
+    public void setOptionalState(Serializable optionalState) {
+        Preconditions.checkArgument((Long) optionalState > 0L);
+        this.startMills = (Long) optionalState;
+    }
+
+    public Long currentAvailableMillis() {
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public Serializable[][] getParameterValues() {
+        List<Serializable[]> tmpList = new ArrayList<>();
+        while (nextSplitAvailable(startMills)) {
+            Serializable[] params = new Serializable[] {startMills, startMills 
+ slideSpanMills};
+            tmpList.add(params);
+            startMills += slideStepMills;
+        }
+        return tmpList.toArray(new Serializable[0][]);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java
new file mode 100644
index 00000000..db951ab6
--- /dev/null
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings describing how to do continuous split discovery and enumeration for the JDBC
+ * source's continuous unbounded streaming mode.
+ */
+@PublicEvolving
+public final class ContinuousUnBoundingSettings implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Duration initialDiscoveryDelay;
+    private final Duration discoveryInterval;
+
+    public ContinuousUnBoundingSettings(
+            Duration initialDiscoveryDelay, Duration discoveryInterval) {
+        this.initialDiscoveryDelay = initialDiscoveryDelay;
+        this.discoveryInterval = checkNotNull(discoveryInterval);
+    }
+
+    public Duration getDiscoveryInterval() {
+        return discoveryInterval;
+    }
+
+    public Duration getInitialDiscoveryDelay() {
+        return initialDiscoveryDelay;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        return "ContinuousUnBoundingSettings{"
+                + "initialDiscoveryDelay="
+                + initialDiscoveryDelay
+                + ", discoveryInterval="
+                + discoveryInterval
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object) {
+            return true;
+        }
+
+        if (object == null || getClass() != object.getClass()) {
+            return false;
+        }
+
+        ContinuousUnBoundingSettings that = (ContinuousUnBoundingSettings) 
object;
+        return Objects.equals(initialDiscoveryDelay, 
that.initialDiscoveryDelay)
+                && Objects.equals(discoveryInterval, that.discoveryInterval);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(initialDiscoveryDelay, discoveryInterval);
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
index e9b15736..a760749a 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
 import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import 
org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.Serializable;
+import java.time.Duration;
+
 import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
 import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -116,8 +122,6 @@ class JdbcSourceBuilderTest {
     void testSetConnectionInfo() {
         assertThatThrownBy(() -> JdbcSource.builder().setDriverName(""))
                 .isInstanceOf(IllegalArgumentException.class);
-        assertThatThrownBy(() -> JdbcSource.builder().setUsername(""))
-                .isInstanceOf(IllegalArgumentException.class);
         assertThatThrownBy(() -> JdbcSource.builder().setDBUrl(""))
                 .isInstanceOf(IllegalArgumentException.class);
     }
@@ -142,4 +146,39 @@ class JdbcSourceBuilderTest {
                                         .build())
                 .isInstanceOf(NullPointerException.class);
     }
+
+    @Test
+    void testSetStreamingSemantic() {
+        assertThatThrownBy(
+                        () ->
+                                sourceBuilder
+                                        .setContinuousUnBoundingSettings(
+                                                new 
ContinuousUnBoundingSettings(
+                                                        Duration.ofMillis(1L),
+                                                        Duration.ofMillis(1L)))
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                
.hasMessage(JdbcSourceBuilder.INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
+
+        assertThatThrownBy(
+                        () ->
+                                sourceBuilder
+                                        .setJdbcParameterValuesProvider(
+                                                new 
JdbcGenericParameterValuesProvider(
+                                                        new Serializable[][] 
{}))
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                
.hasMessage(JdbcSourceBuilder.INVALID_SLIDE_TIMING_CONTINUOUS_HINT);
+
+        sourceBuilder.setContinuousUnBoundingSettings(null);
+        assertThatThrownBy(
+                        () ->
+                                sourceBuilder
+                                        .setJdbcParameterValuesProvider(
+                                                new 
JdbcSlideTimingParameterProvider(
+                                                        1L, 1L, 1L, 1L))
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                
.hasMessage(JdbcSourceBuilder.INVALID_CONTINUOUS_SLIDE_TIMING_HINT);
+    }
 }
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
new file mode 100644
index 00000000..8e9f61cf
--- /dev/null
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.util.Collector;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for streaming semantic related cases of {@link JdbcSource}. */
+class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
+
+    private static final long ONE_SECOND = Duration.ofSeconds(1L).toMillis();
+    private static final int TESTING_PARALLELISM = 2;
+    private static final long INTERVAL_OF_GENERATING = 23L;
+    private static final int TESTING_ENTRIES_SIZE = 200;
+    private static final int DATA_NUM_PER_SECOND_SPAN_SPLIT =
+            (int) (ONE_SECOND / INTERVAL_OF_GENERATING + 1);
+    private static final String testingTable = "t_testing";
+    private static final String CREATE_SQL =
+            "CREATE TABLE "
+                    + testingTable
+                    + " ("
+                    + "id bigint NOT NULL, "
+                    + "ts bigint NOT NULL, "
+                    + "PRIMARY KEY (id))";
+    private static final ContinuousUnBoundingSettings CONTINUOUS_SETTINGS =
+            new ContinuousUnBoundingSettings(Duration.ofMillis(10L), 
Duration.ofSeconds(1L));
+    private static final ResultExtractor<TestEntry> EXTRACTOR =
+            resultSet -> new TestEntry(resultSet.getLong("id"), 
resultSet.getLong("ts"));
+    private static final List<TestEntry> testEntries = new 
ArrayList<>(TESTING_ENTRIES_SIZE);
+
+    private static Queue<TestEntry> collectedRecords;
+    private static long globalStartMillis;
+    private static long globalDataEndMillis;
+
+    private JdbcSourceBuilder<TestEntry> jdbcSourceBuilder;
+
+    @BeforeEach
+    void initData() {
+        testEntries.clear();
+        quickExecutionSQL(CREATE_SQL);
+        generateTestEntries();
+        String insertSQL = generateInsertSQL();
+        quickExecutionSQL(insertSQL);
+
+        JdbcSlideTimingParameterProvider slideTimingParamsProvider =
+                new JdbcSlideTimingParameterProvider(
+                        globalStartMillis, ONE_SECOND, ONE_SECOND, 100L);
+        jdbcSourceBuilder =
+                JdbcSource.<TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(TestEntry.class))
+                        .setSql("select * from " + testingTable + " where ts 
>= ? and ts < ?")
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setUsername(getMetadata().getUsername())
+                        .setPassword(getMetadata().getPassword())
+                        .setContinuousUnBoundingSettings(CONTINUOUS_SETTINGS)
+                        
.setJdbcParameterValuesProvider(slideTimingParamsProvider)
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(EXTRACTOR);
+
+        collectedRecords = new ConcurrentLinkedDeque<>();
+    }
+
+    @AfterEach
+    void clearData() {
+        quickExecutionSQL("delete from " + testingTable);
+        quickExecutionSQL("DROP TABLE " + testingTable);
+    }
+
+    @ParameterizedTest
+    @EnumSource(DeliveryGuarantee.class)
+    void testForNormalCaseWithoutFailure(
+            DeliveryGuarantee guarantee, @InjectClusterClient ClusterClient<?> 
client)
+            throws Exception {
+        // Test continuous + unbounded splits
+        StreamExecutionEnvironment env = 
getEnvWithRestartStrategyParallelism();
+
+        jdbcSourceBuilder.setDeliveryGuarantee(guarantee);
+        if (DeliveryGuarantee.EXACTLY_ONCE == guarantee) {
+            
jdbcSourceBuilder.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE);
+        }
+        env.fromSource(jdbcSourceBuilder.build(), 
WatermarkStrategy.noWatermarks(), "TestSource")
+                .addSink(new TestingSinkFunction());
+        waitExpectation(client, env, () -> collectedRecords.size() >= 
TESTING_ENTRIES_SIZE);
+
+        
assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(testEntries);
+    }
+
+    @Test
+    void testExactlyOnceWithFailure(@InjectClusterClient ClusterClient<?> 
client) throws Exception {
+        // Test continuous + unbounded splits
+        StreamExecutionEnvironment env = 
getEnvWithRestartStrategyParallelism();
+        jdbcSourceBuilder
+                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+                .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE);
+
+        env.fromSource(jdbcSourceBuilder.build(), 
WatermarkStrategy.noWatermarks(), "TestSource")
+                .keyBy(testEntry -> 0L)
+                .process(new TestingKeyProcessFunction())
+                .setParallelism(1);
+
+        waitExpectation(client, env, () -> collectedRecords.size() >= 
TESTING_ENTRIES_SIZE);
+
+        
assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(testEntries);
+    }
+
+    @Test
+    void testAtLeastOnceWithFailure(@InjectClusterClient ClusterClient<?> 
client) throws Exception {
+        // Test continuous + unbounded splits
+        StreamExecutionEnvironment env = 
getEnvWithRestartStrategyParallelism();
+        
jdbcSourceBuilder.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
+
+        env.fromSource(jdbcSourceBuilder.build(), 
WatermarkStrategy.noWatermarks(), "TestSource")
+                .keyBy(testEntry -> 0L)
+                .process(new TestingKeyProcessFunction())
+                .setParallelism(1);
+
+        waitExpectation(
+                client, env, () -> new HashSet<>(collectedRecords).size() >= 
TESTING_ENTRIES_SIZE);
+
+        assertThat(collectedRecords)
+                .hasSizeGreaterThanOrEqualTo(testEntries.size())
+                .containsAll(testEntries);
+    }
+
+    @Test
+    void testAtMostOnceWithFailure(@InjectClusterClient ClusterClient<?> 
client) throws Exception {
+        // Test continuous + unbounded splits
+        StreamExecutionEnvironment env = 
getEnvWithRestartStrategyParallelism();
+
+        env.fromSource(jdbcSourceBuilder.build(), 
WatermarkStrategy.noWatermarks(), "TestSource")
+                .keyBy(testEntry -> 0L)
+                .process(new TestingKeyProcessFunction())
+                .setParallelism(1);
+
+        waitExpectation(
+                client,
+                env,
+                () ->
+                        Math.abs(collectedRecords.size() - testEntries.size())
+                                <= DATA_NUM_PER_SECOND_SPAN_SPLIT * 
TESTING_PARALLELISM);
+
+        assertThat(testEntries)
+                .hasSizeGreaterThanOrEqualTo(collectedRecords.size())
+                .containsAll(collectedRecords);
+    }
+
+    private static void sleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void waitExpectation(
+            ClusterClient<?> client, StreamExecutionEnvironment env, 
Supplier<Boolean> condition)
+            throws Exception {
+        JobID jobID = env.executeAsync().getJobID();
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> {
+                            while (true) {
+                                if (condition.get()) {
+                                    client.cancel(jobID);
+                                    break;
+                                }
+                                sleep(50);
+                            }
+                        });
+        future.get();
+    }
+
+    private void generateTestEntries() {
+        // The data is distributed in 1 min.
+        long millisAnchor = System.currentTimeMillis();
+        globalStartMillis = millisAnchor - INTERVAL_OF_GENERATING * 
TESTING_ENTRIES_SIZE / 2;
+        globalDataEndMillis =
+                globalDataEndMillis + millisAnchor + INTERVAL_OF_GENERATING * 
TESTING_ENTRIES_SIZE;
+        long startMillis = globalStartMillis;
+        for (int i = 0; i < TESTING_ENTRIES_SIZE; i++) {
+            testEntries.add(new TestEntry(i + 1, startMillis));
+            startMillis += INTERVAL_OF_GENERATING;
+        }
+    }
+
+    @Nonnull
+    private static String generateInsertSQL() {
+        StringBuilder sqlQueryBuilder =
+                new StringBuilder("INSERT INTO " + testingTable + " (id, ts) 
VALUES ");
+        for (int i = 0; i < testEntries.size(); i++) {
+            sqlQueryBuilder
+                    .append("(")
+                    .append(testEntries.get(i).id)
+                    .append(",")
+                    .append(testEntries.get(i).ts)
+                    .append(")");
+            if (i < testEntries.size() - 1) {
+                sqlQueryBuilder.append(",");
+            }
+        }
+        return sqlQueryBuilder.toString();
+    }
+
+    private void quickExecutionSQL(String testingTable) {
+        try (Connection conn = getConnection();
+                Statement stat = conn.createStatement()) {
+            stat.execute(testingTable);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Connection getConnection() throws SQLException {
+        return DriverManager.getConnection(
+                getMetadata().getJdbcUrl(),
+                getMetadata().getUsername(),
+                getMetadata().getPassword());
+    }
+
+    @Nonnull
+    private static StreamExecutionEnvironment 
getEnvWithRestartStrategyParallelism() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.FallbackRestartStrategyConfiguration());
+
+        env.setParallelism(TESTING_PARALLELISM);
+        env.enableCheckpointing(MINIMAL_CHECKPOINT_TIME);
+        return env;
+    }
+
+    public static class TestEntry implements Serializable {
+        public long id;
+        public long ts;
+
+        public TestEntry(long id, long ts) {
+            this.id = id;
+            this.ts = ts;
+        }
+
+        @Override
+        public boolean equals(Object object) {
+            if (this == object) {
+                return true;
+            }
+
+            if (object == null || getClass() != object.getClass()) {
+                return false;
+            }
+
+            TestEntry testEntry = (TestEntry) object;
+            return id == testEntry.id && ts == testEntry.ts;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, ts);
+        }
+
+        @Override
+        public String toString() {
+            return "TestEntry{" + "id=" + id + ", ts=" + ts + '}';
+        }
+    }
+
+    /** A process function for testing. */
+    static class TestingKeyProcessFunction
+            extends KeyedProcessFunction<Long, TestEntry, TestEntry> {
+        private transient ListState<TestEntry> listState;
+        private boolean errorOccurred = false;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            listState =
+                    getRuntimeContext()
+                            .getListState(
+                                    new ListStateDescriptor<>(
+                                            "collectedElements", 
TestEntry.class));
+        }
+
+        @Override
+        public void processElement(
+                TestEntry value,
+                KeyedProcessFunction<Long, TestEntry, TestEntry>.Context ctx,
+                Collector<TestEntry> out)
+                throws Exception {
+            if (value.id == testEntries.size() / 2 && 
getRuntimeContext().getAttemptNumber() < 1) {
+                throw new RuntimeException();
+            }
+            listState.add(value);
+            if (getRuntimeContext().getAttemptNumber() != 0) {
+                errorOccurred = true;
+            }
+            if (errorOccurred) {
+                collectedRecords.clear();
+                listState.get().forEach(collectedRecords::add);
+                errorOccurred = false;
+            } else {
+                collectedRecords.add(value);
+            }
+            if (value.id % 17 == 0) {
+                sleep(MINIMAL_CHECKPOINT_TIME * 2);
+            }
+        }
+    }
+
+    /** A sink function to collect the records. */
+    static class TestingSinkFunction implements SinkFunction<TestEntry> {
+
+        @Override
+        public void invoke(TestEntry value, Context context) throws Exception {
+            collectedRecords.add(value);
+        }
+    }
+}
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
index fe25ebbb..ce7548d2 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java
@@ -37,15 +37,14 @@ class JdbcSourceEnumStateSerializerTest {
 
     private final JdbcSourceEnumeratorState state =
             new JdbcSourceEnumeratorState(
-                    Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 
0, null)),
+                    Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 
null)),
                     Arrays.asList(
                             new JdbcSourceSplit(
                                     "1",
                                     "select 1",
                                     new Serializable[] {new Integer(0)},
-                                    10,
                                     new CheckpointedOffset(0, 10))),
-                    Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 
0, null)),
+                    Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 
null)),
                     null);
     private final JdbcSourceEnumeratorState mockedState = new 
MockedJdbcSourceEnumState(state);
     private final JdbcSourceEnumStateSerializer serializer =
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
index 97e31f47..443fb05b 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java
@@ -26,11 +26,14 @@ import 
org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +96,6 @@ class JdbcSourceEnumeratorTest {
                 String.valueOf(splitId++),
                 "select 1",
                 new Serializable[] {0},
-                0,
                 new CheckpointedOffset(0, 0));
     }
 
@@ -105,10 +107,12 @@ class JdbcSourceEnumeratorTest {
                 context,
                 new JdbcSqlSplitEnumeratorBase<JdbcSourceSplit>(null) {
                     @Override
-                    public List<JdbcSourceSplit> enumerateSplits() throws 
IOException {
+                    public @Nonnull List<JdbcSourceSplit> enumerateSplits(
+                            @Nonnull Supplier<Boolean> splitGettable) throws 
IOException {
                         return Collections.emptyList();
                     }
                 },
+                null,
                 Arrays.stream(splits).collect(Collectors.toList()));
     }
 }
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
index 91a75805..f549869f 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java
@@ -51,7 +51,7 @@ class JdbcSourceReaderTest extends JdbcDataTestBase {
         final TestingReaderContext context = new TestingReaderContext();
         final JdbcSourceReader<String> reader = createReader(context);
         reader.addSplits(
-                Collections.singletonList(new JdbcSourceSplit("1", "select 1", 
null, 0, null)));
+                Collections.singletonList(new JdbcSourceSplit("1", "select 1", 
null, null)));
         reader.start();
         reader.close();
         assertThat(context.getNumSplitRequests()).isEqualTo(0);
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
index 2a730b43..7ebe0e12 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java
@@ -49,7 +49,7 @@ class JdbcSourceSplitReaderTest extends JdbcDataTestBase {
 
     private final JdbcSourceSplit split =
             new JdbcSourceSplit(
-                    "1", "select id, title, author, price, qty from " + 
INPUT_TABLE, null, 0, null);
+                    "1", "select id, title, author, price, qty from " + 
INPUT_TABLE, null, null);
     private final JdbcConnectionProvider connectionProvider =
             new SimpleJdbcConnectionProvider(
                     new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
index b7b60afe..9d3f6516 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java
@@ -30,7 +30,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test for {@link JdbcSourceSplitSerializer}. */
 class JdbcSourceSplitSerializerTest {
 
-    private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", 
null, 0, null);
+    private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", 
null, null);
     private final JdbcSourceSplit mockedSplit = new 
MockedJdbcSourceSplit(split);
     private final JdbcSourceSplitSerializer serializer = new 
JdbcSourceSplitSerializer();
     private final JdbcSourceSplitSerializer mockedSerializer =
@@ -73,7 +73,6 @@ class JdbcSourceSplitSerializerTest {
                     jdbcSourceSplit.splitId(),
                     jdbcSourceSplit.getSqlTemplate(),
                     (Serializable[]) jdbcSourceSplit.getParameters(),
-                    jdbcSourceSplit.getOffset(),
                     jdbcSourceSplit.getCheckpointedOffset());
         }
     }

Reply via email to