This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 190238a05bebe9a092e9cec84627127781d4d859 Author: Roc Marshal <flin...@126.com> AuthorDate: Fri May 10 21:18:11 2024 +0800 [FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source --- .../connections/SimpleJdbcConnectionProvider.java | 1 + .../flink/connector/jdbc/source/JdbcSource.java | 40 +-- .../connector/jdbc/source/JdbcSourceBuilder.java | 87 ++++- .../source/enumerator/JdbcSourceEnumerator.java | 88 ++++- .../enumerator/JdbcSqlSplitEnumeratorBase.java | 6 +- .../enumerator/SqlTemplateSplitEnumerator.java | 36 +- .../jdbc/source/reader/JdbcSourceSplitReader.java | 186 +++++++--- .../jdbc/source/split/JdbcSourceSplit.java | 28 +- .../source/split/JdbcSourceSplitSerializer.java | 6 +- .../jdbc/source/split/JdbcSourceSplitState.java | 2 +- .../jdbc/split/JdbcParameterValuesProvider.java | 8 + .../split/JdbcSlideTimingParameterProvider.java | 94 ++++++ .../jdbc/utils/ContinuousUnBoundingSettings.java | 86 +++++ .../jdbc/source/JdbcSourceBuilderTest.java | 43 ++- .../jdbc/source/JdbcSourceStreamRelatedITCase.java | 374 +++++++++++++++++++++ .../JdbcSourceEnumStateSerializerTest.java | 5 +- .../enumerator/JdbcSourceEnumeratorTest.java | 8 +- .../jdbc/source/reader/JdbcSourceReaderTest.java | 2 +- .../source/reader/JdbcSourceSplitReaderTest.java | 2 +- .../split/JdbcSourceSplitSerializerTest.java | 3 +- 20 files changed, 971 insertions(+), 134 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java index 811210af..4c48f799 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/datasource/connections/SimpleJdbcConnectionProvider.java @@ -79,6 +79,7 @@ public 
class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser @Override public boolean isConnectionValid() throws SQLException { return connection != null + && !connection.isClosed() && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds()); } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java index fc145feb..08e4a777 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java @@ -40,9 +40,12 @@ import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + import java.io.Serializable; import java.util.ArrayList; import java.util.Objects; @@ -55,6 +58,7 @@ public class JdbcSource<OUT> private final Boundedness boundedness; private final TypeInformation<OUT> typeInformation; + private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; private final Configuration configuration; private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider; @@ -69,29 +73,20 @@ public class JdbcSource<OUT> JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider, ResultExtractor<OUT> resultExtractor, TypeInformation<OUT> typeInformation, - DeliveryGuarantee deliveryGuarantee) { + @Nullable DeliveryGuarantee deliveryGuarantee, + @Nullable 
ContinuousUnBoundingSettings continuousUnBoundingSettings) { this.configuration = Preconditions.checkNotNull(configuration); this.connectionProvider = Preconditions.checkNotNull(connectionProvider); this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider); this.resultExtractor = Preconditions.checkNotNull(resultExtractor); - this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee); + this.deliveryGuarantee = + Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee; this.typeInformation = Preconditions.checkNotNull(typeInformation); - this.boundedness = Boundedness.BOUNDED; - } - - JdbcSource( - Configuration configuration, - JdbcConnectionProvider connectionProvider, - JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider, - ResultExtractor<OUT> resultExtractor, - TypeInformation<OUT> typeInformation) { - this( - configuration, - connectionProvider, - sqlSplitEnumeratorProvider, - resultExtractor, - typeInformation, - DeliveryGuarantee.NONE); + this.continuousUnBoundingSettings = continuousUnBoundingSettings; + this.boundedness = + Objects.isNull(continuousUnBoundingSettings) + ? 
Boundedness.BOUNDED + : Boundedness.CONTINUOUS_UNBOUNDED; } @Override @@ -119,7 +114,10 @@ public class JdbcSource<OUT> public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumerator( SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception { return new JdbcSourceEnumerator( - enumContext, sqlSplitEnumeratorProvider.create(), new ArrayList<>()); + enumContext, + sqlSplitEnumeratorProvider.create(), + continuousUnBoundingSettings, + new ArrayList<>()); } @Override @@ -132,6 +130,7 @@ public class JdbcSource<OUT> return new JdbcSourceEnumerator( enumContext, sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState), + continuousUnBoundingSettings, checkpoint.getRemainingSplits()); } @@ -193,6 +192,7 @@ public class JdbcSource<OUT> && Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider) && Objects.equals(connectionProvider, that.connectionProvider) && Objects.equals(resultExtractor, that.resultExtractor) - && deliveryGuarantee == that.deliveryGuarantee; + && deliveryGuarantee == that.deliveryGuarantee + && Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings); } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java index 17e3eb27..8d52f7ac 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java @@ -28,6 +28,8 @@ import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import 
org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -36,9 +38,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.Serializable; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Objects; import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; @@ -94,6 +100,11 @@ public class JdbcSourceBuilder<OUT> { public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class); + public static final String INVALID_CONTINUOUS_SLIDE_TIMING_HINT = + "The 'jdbcParameterValuesProvider' must be specified with in type of 'JdbcSlideTimingParameterProvider' when using 'continuousUnBoundingSettings'."; + public static final String INVALID_SLIDE_TIMING_CONTINUOUS_HINT = + "The 'continuousUnBoundingSettings' must be specified with in type of 'continuousUnBoundingSettings' when using 'jdbcParameterValuesProvider' in type of 'JdbcSlideTimingParameterProvider'."; + private final Configuration configuration; private int splitReaderFetchBatchSize; @@ -103,15 +114,15 @@ public class JdbcSourceBuilder<OUT> { // Boolean to distinguish between default value and explicitly set autoCommit mode. 
private Boolean autoCommit; - // TODO It would be used to introduce streaming semantic and tracked in - // https://issues.apache.org/jira/browse/FLINK-33461 private DeliveryGuarantee deliveryGuarantee; + private @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; private TypeInformation<OUT> typeInformation; private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; private String sql; private JdbcParameterValuesProvider jdbcParameterValuesProvider; + private @Nullable Serializable optionalSqlSplitEnumeratorState; private ResultExtractor<OUT> resultExtractor; private JdbcConnectionProvider connectionProvider; @@ -143,9 +154,6 @@ public class JdbcSourceBuilder<OUT> { } public JdbcSourceBuilder<OUT> setUsername(String username) { - Preconditions.checkArgument( - !StringUtils.isNullOrWhitespaceOnly(username), - "It's required to set the 'username'."); connOptionsBuilder.withUsername(username); return this; } @@ -180,19 +188,45 @@ public class JdbcSourceBuilder<OUT> { // ------ Optional ------------------------------------------------------------------ - public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) { - Preconditions.checkNotNull(propKey, "Connection property key mustn't be null"); - Preconditions.checkNotNull(propVal, "Connection property value mustn't be null"); - connOptionsBuilder.withProperty(propKey, propVal); + /** + * The continuousUnBoundingSettings to discovery the next available batch splits. Note: If the + * value was set, the {@link #jdbcParameterValuesProvider} must specified with the {@link + * org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider}. 
+ */ + public JdbcSourceBuilder<OUT> setContinuousUnBoundingSettings( + ContinuousUnBoundingSettings continuousUnBoundingSettings) { + this.continuousUnBoundingSettings = continuousUnBoundingSettings; return this; } + /** + * If the value was set as an instance of {@link JdbcSlideTimingParameterProvider}, it's + * required to specify the {@link #continuousUnBoundingSettings}. + */ public JdbcSourceBuilder<OUT> setJdbcParameterValuesProvider( @Nonnull JdbcParameterValuesProvider parameterValuesProvider) { this.jdbcParameterValuesProvider = Preconditions.checkNotNull(parameterValuesProvider); return this; } + public JdbcSourceBuilder<OUT> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee); + return this; + } + + public JdbcSourceBuilder<OUT> setConnectionCheckTimeoutSeconds( + int connectionCheckTimeoutSeconds) { + connOptionsBuilder.withConnectionCheckTimeoutSeconds(connectionCheckTimeoutSeconds); + return this; + } + + public JdbcSourceBuilder<OUT> setConnectionProperty(String propKey, String propVal) { + Preconditions.checkNotNull(propKey, "Connection property key mustn't be null"); + Preconditions.checkNotNull(propVal, "Connection property value mustn't be null"); + connOptionsBuilder.withProperty(propKey, propVal); + return this; + } + public JdbcSourceBuilder<OUT> setSplitReaderFetchBatchSize(int splitReaderFetchBatchSize) { Preconditions.checkArgument( splitReaderFetchBatchSize > 0, @@ -232,11 +266,26 @@ public class JdbcSourceBuilder<OUT> { return this; } + public JdbcSourceBuilder<OUT> setOptionalSqlSplitEnumeratorState( + Serializable optionalSqlSplitEnumeratorState) { + this.optionalSqlSplitEnumeratorState = optionalSqlSplitEnumeratorState; + return this; + } + public JdbcSource<OUT> build() { this.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build()); if (resultSetFetchSize > 0) { this.configuration.set(RESULTSET_FETCH_SIZE, resultSetFetchSize); } 
+ + if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) { + Preconditions.checkArgument( + this.resultSetType == ResultSet.TYPE_SCROLL_INSENSITIVE + || this.resultSetType == ResultSet.CONCUR_READ_ONLY, + "The 'resultSetType' must be ResultSet.TYPE_SCROLL_INSENSITIVE or ResultSet.CONCUR_READ_ONLY when using %s", + DeliveryGuarantee.EXACTLY_ONCE); + } + this.configuration.set(RESULTSET_CONCURRENCY, resultSetConcurrency); this.configuration.set(RESULTSET_TYPE, resultSetType); this.configuration.set(READER_FETCH_BATCH_SIZE, splitReaderFetchBatchSize); @@ -247,15 +296,31 @@ public class JdbcSourceBuilder<OUT> { Preconditions.checkNotNull(resultExtractor, "'resultExtractor' mustn't be null."); Preconditions.checkNotNull(typeInformation, "'typeInformation' mustn't be null."); + if (Objects.nonNull(continuousUnBoundingSettings)) { + Preconditions.checkArgument( + Objects.nonNull(jdbcParameterValuesProvider) + && jdbcParameterValuesProvider + instanceof JdbcSlideTimingParameterProvider, + INVALID_SLIDE_TIMING_CONTINUOUS_HINT); + } + + if (Objects.nonNull(jdbcParameterValuesProvider) + && jdbcParameterValuesProvider instanceof JdbcSlideTimingParameterProvider) { + Preconditions.checkArgument( + Objects.nonNull(continuousUnBoundingSettings), + INVALID_CONTINUOUS_SLIDE_TIMING_HINT); + } + return new JdbcSource<>( configuration, connectionProvider, new SqlTemplateSplitEnumerator.TemplateSqlSplitEnumeratorProvider() - .setOptionalSqlSplitEnumeratorState(null) + .setOptionalSqlSplitEnumeratorState(optionalSqlSplitEnumeratorState) .setSqlTemplate(sql) .setParameterValuesProvider(jdbcParameterValuesProvider), resultExtractor, typeInformation, - deliveryGuarantee); + deliveryGuarantee, + continuousUnBoundingSettings); } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java index 
a6079723..a26905fc 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumerator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -34,7 +35,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; /** JDBC source enumerator. */ @@ -44,26 +48,43 @@ public class JdbcSourceEnumerator private final SplitEnumeratorContext<JdbcSourceSplit> context; private final Boundedness boundedness; + private final LinkedHashMap<Integer, String> readersAwaitingSplit; private final List<JdbcSourceSplit> unassigned; private final JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator; + private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings; public JdbcSourceEnumerator( SplitEnumeratorContext<JdbcSourceSplit> context, JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> sqlSplitEnumerator, + ContinuousUnBoundingSettings continuousUnBoundingSettings, List<JdbcSourceSplit> unassigned) { this.context = Preconditions.checkNotNull(context); this.sqlSplitEnumerator = Preconditions.checkNotNull(sqlSplitEnumerator); - this.boundedness = Boundedness.BOUNDED; + this.continuousUnBoundingSettings = continuousUnBoundingSettings; + this.boundedness = + Objects.isNull(continuousUnBoundingSettings) + ? 
Boundedness.BOUNDED + : Boundedness.CONTINUOUS_UNBOUNDED; this.unassigned = Preconditions.checkNotNull(unassigned); + this.readersAwaitingSplit = new LinkedHashMap<>(); } @Override public void start() { sqlSplitEnumerator.open(); - try { - unassigned.addAll(sqlSplitEnumerator.enumerateSplits()); - } catch (IOException e) { - throw new RuntimeException(e); + if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED + && Objects.nonNull(continuousUnBoundingSettings)) { + context.callAsync( + () -> sqlSplitEnumerator.enumerateSplits(() -> 1024 - unassigned.size() > 0), + this::processNewSplits, + continuousUnBoundingSettings.getInitialDiscoveryDelay().toMillis(), + continuousUnBoundingSettings.getDiscoveryInterval().toMillis()); + } else { + try { + unassigned.addAll(sqlSplitEnumerator.enumerateSplits(() -> true)); + } catch (IOException e) { + throw new RuntimeException(e); + } } } @@ -81,6 +102,9 @@ public class JdbcSourceEnumerator public void handleSplitRequest(int subtask, @Nullable String hostname) { if (boundedness == Boundedness.BOUNDED) { assignSplitsForBounded(subtask, hostname); + } else { + readersAwaitingSplit.put(subtask, hostname); + assignSplitsForUnbounded(); } } @@ -93,6 +117,16 @@ public class JdbcSourceEnumerator public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) { LOG.debug("File Source Enumerator adds splits back: {}", splits); unassigned.addAll(splits); + if (boundedness == Boundedness.BOUNDED) { + context.registeredReaders() + .keySet() + .forEach(subTask -> assignSplitsForBounded(subTask, null)); + } else if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) { + if (context.registeredReaders().containsKey(subtaskId)) { + readersAwaitingSplit.put(subtaskId, null); + } + assignSplitsForUnbounded(); + } } @Override @@ -110,12 +144,9 @@ public class JdbcSourceEnumerator return Optional.empty(); } Iterator<JdbcSourceSplit> iterator = unassigned.iterator(); - JdbcSourceSplit next = null; - if (iterator.hasNext()) { - next = 
iterator.next(); - iterator.remove(); - } - return Optional.ofNullable(next); + JdbcSourceSplit next = iterator.next(); + iterator.remove(); + return Optional.of(next); } private void assignSplitsForBounded(int subtask, @Nullable String hostname) { @@ -137,4 +168,39 @@ public class JdbcSourceEnumerator LOG.info("No more splits available for subtask {}", subtask); } } + + private void processNewSplits(List<JdbcSourceSplit> splits, Throwable error) { + if (error != null) { + LOG.error("Failed to enumerate sql splits.", error); + return; + } + this.unassigned.addAll(splits); + + assignSplitsForUnbounded(); + } + + private void assignSplitsForUnbounded() { + final Iterator<Map.Entry<Integer, String>> awaitingReader = + readersAwaitingSplit.entrySet().iterator(); + + while (awaitingReader.hasNext()) { + final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next(); + + // if the reader that requested another split has failed in the meantime, remove + // it from the list of waiting readers + if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) { + awaitingReader.remove(); + continue; + } + + final int awaitingSubtask = nextAwaiting.getKey(); + final Optional<JdbcSourceSplit> nextSplit = getNextSplit(); + if (nextSplit.isPresent()) { + context.assignSplit(nextSplit.get(), awaitingSubtask); + awaitingReader.remove(); + } else { + break; + } + } + } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java index 7e0d70d1..25d09a71 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSqlSplitEnumeratorBase.java @@ -21,11 +21,13 @@ package org.apache.flink.connector.jdbc.source.enumerator; 
import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.function.Supplier; /** * Base class for jdbc sql split enumerator. @@ -64,10 +66,12 @@ public abstract class JdbcSqlSplitEnumeratorBase<SplitT> implements AutoCloseabl /** * Enumerate the JDBC splits. * + * @param splitGettable If the next batch splits are gettable. * @return The result splits generated by the split enumerator. * @throws IOException IO exception. */ - public abstract List<JdbcSourceSplit> enumerateSplits() throws IOException; + public abstract List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable) + throws IOException; /** * A provider to create or restore a JDBC sql splits enumerator. diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java index e4e51455..2099d0e0 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/enumerator/SqlTemplateSplitEnumerator.java @@ -19,22 +19,29 @@ package org.apache.flink.connector.jdbc.source.enumerator; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset; import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; import 
javax.annotation.Nullable; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; /** A split enumerator based on sql-parameters grains. */ public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase<JdbcSourceSplit> { + public static final Logger LOG = LoggerFactory.getLogger(SqlTemplateSplitEnumerator.class); + private final String sqlTemplate; private final JdbcParameterValuesProvider parameterValuesProvider; @@ -49,21 +56,40 @@ public final class SqlTemplateSplitEnumerator extends JdbcSqlSplitEnumeratorBase } @Override - public List<JdbcSourceSplit> enumerateSplits() throws IOException { + @Nonnull + public List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable) + throws RuntimeException { + + if (!splitGettable.get()) { + LOG.info( + "The current split is over max splits capacity of {}.", + JdbcSourceEnumerator.class.getSimpleName()); + return Collections.emptyList(); + } + if (parameterValuesProvider == null) { return Collections.singletonList( - new JdbcSourceSplit(getNextId(), sqlTemplate, null, 0, null)); + new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset())); + } + + if (optionalSqlSplitEnumeratorState != null) { + parameterValuesProvider.setOptionalState(optionalSqlSplitEnumeratorState); } Serializable[][] parameters = parameterValuesProvider.getParameterValues(); + + // update state + optionalSqlSplitEnumeratorState = parameterValuesProvider.getLatestOptionalState(); + if (parameters == null) { return Collections.singletonList( - new JdbcSourceSplit(getNextId(), sqlTemplate, null, 0, null)); + new JdbcSourceSplit(getNextId(), sqlTemplate, null, new CheckpointedOffset())); } List<JdbcSourceSplit> jdbcSourceSplitList = new ArrayList<>(parameters.length); for (Serializable[] paramArr : parameters) { jdbcSourceSplitList.add( - new JdbcSourceSplit(getNextId(), 
sqlTemplate, paramArr, 0, null)); + new JdbcSourceSplit( + getNextId(), sqlTemplate, paramArr, new CheckpointedOffset())); } return jdbcSourceSplitList; } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java index f31d1644..5011f42b 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java @@ -48,6 +48,7 @@ import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Queue; import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; @@ -71,25 +72,28 @@ public class JdbcSourceSplitReader<T> private final Queue<JdbcSourceSplit> splits; private final TypeInformation<T> typeInformation; private final JdbcConnectionProvider connectionProvider; + private transient Connection connection; private transient PreparedStatement statement; + // Boolean to distinguish between default value and explicitly set autoCommit mode. + private final Boolean autoCommit; private transient ResultSet resultSet; + protected boolean hasNextRecordCurrentSplit; + private int currentSplitOffset; private final ResultExtractor<T> resultExtractor; - protected boolean hasNextRecordCurrentSplit; + private final DeliveryGuarantee deliveryGuarantee; private final int splitReaderFetchBatchSize; - private final int resultSetType; private final int resultSetConcurrency; private final int resultSetFetchSize; - // Boolean to distinguish between default value and explicitly set autoCommit mode. 
- private final Boolean autoCommit; - private int currentSplitOffset; private final SourceReaderContext context; + private @Nullable JdbcSourceSplit skippedSplit; + public JdbcSourceSplitReader( SourceReaderContext context, Configuration config, @@ -120,7 +124,10 @@ public class JdbcSourceSplitReader<T> @Override public RecordsWithSplitIds<RecordAndOffset<T>> fetch() throws IOException { - checkSplitOrStartNext(); + boolean couldFetch = checkSplitOrStartNext(); + if (!couldFetch) { + return new RecordsBySplits.Builder<RecordAndOffset<T>>().build(); + } if (!hasNextRecordCurrentSplit) { return finishSplit(); @@ -154,21 +161,35 @@ public class JdbcSourceSplitReader<T> closeResultSetAndStatement(); RecordsBySplits.Builder<RecordAndOffset<T>> builder = new RecordsBySplits.Builder<>(); - Preconditions.checkState(currentSplit != null, "currentSplit"); - builder.addFinishedSplit(currentSplit.splitId()); + JdbcSourceSplit splitToFinish = Objects.nonNull(currentSplit) ? currentSplit : skippedSplit; + Preconditions.checkState(splitToFinish != null, "Split to finish mustn't be null."); + builder.addFinishedSplit(splitToFinish.splitId()); currentSplit = null; + skippedSplit = null; return builder.build(); } private void closeResultSetAndStatement() { + closeResultSetIfNeeded(); + closeStatementIfNeeded(); + } + + private void closeResultSetIfNeeded() { try { if (resultSet != null && !resultSet.isClosed()) { resultSet.close(); } + resultSet = null; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void closeStatementIfNeeded() { + try { if (statement != null && !statement.isClosed()) { statement.close(); } - resultSet = null; statement = null; } catch (SQLException e) { throw new RuntimeException(e); @@ -206,40 +227,47 @@ public class JdbcSourceSplitReader<T> return typeInformation; } - private void checkSplitOrStartNext() { + @VisibleForTesting + public List<JdbcSourceSplit> getSplits() { + return 
Collections.unmodifiableList(Arrays.asList(splits.toArray(new JdbcSourceSplit[0]))); + } + + @VisibleForTesting + public Connection getConnection() { + return connection; + } + + @VisibleForTesting + public PreparedStatement getStatement() { + return statement; + } + + @VisibleForTesting + public ResultSet getResultSet() { + return resultSet; + } + + // ---------------- Private methods -------------------------------- + + private boolean checkSplitOrStartNext() { try { if (hasNextRecordCurrentSplit && resultSet != null) { - return; + return true; } final JdbcSourceSplit nextSplit = splits.poll(); - if (nextSplit == null) { - throw new IOException("Cannot fetch from another split - no split remaining"); + if (nextSplit != null) { + currentSplit = nextSplit; + openResultSetForSplit(currentSplit); + return true; } - currentSplit = nextSplit; - openResultSetForSplit(currentSplit); - } catch (SQLException | ClassNotFoundException | IOException e) { + return false; + } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } } - private void discardSplit(JdbcSourceSplit split) throws SQLException { - if (split.getOffset() != 0) { - hasNextRecordCurrentSplit = false; - currentSplitOffset = 0; - if (resultSet != null && !resultSet.isClosed()) { - resultSet.close(); - } - if (statement != null && !statement.isClosed()) { - statement.close(); - } - resultSet = null; - statement = null; - currentSplit = null; - } - } - private void getOrEstablishConnection() throws SQLException, ClassNotFoundException { connection = connectionProvider.getOrEstablishConnection(); if (autoCommit == null) { @@ -252,7 +280,79 @@ public class JdbcSourceSplitReader<T> private void openResultSetForSplit(JdbcSourceSplit split) throws SQLException, ClassNotFoundException { + switch (deliveryGuarantee) { + case EXACTLY_ONCE: + openResultSetForSplitWhenExactlyOnce(split); + break; + case AT_LEAST_ONCE: + openResultSetForSplitWhenAtLeastOnce(split); + break; + case NONE: + 
default: + openResultSetForSplitWhenAtMostOnce(split); + break; + } + } + + private void openResultSetForSplitWhenAtMostOnce(JdbcSourceSplit split) + throws SQLException, ClassNotFoundException { + if (split.getReaderPosition() != 0) { + skippedSplit = currentSplit; + currentSplit = null; + hasNextRecordCurrentSplit = false; + currentSplitOffset = 0; + closeResultSetAndStatement(); + } else { + openResultSetForSplitWhenAtLeastOnce(split); + } + } + + private void openResultSetForSplitWhenExactlyOnce(JdbcSourceSplit split) + throws SQLException, ClassNotFoundException { getOrEstablishConnection(); + closeResultSetIfNeeded(); + prepareStatement(split); + resultSet = statement.executeQuery(); + currentSplitOffset = 0; + hasNextRecordCurrentSplit = resultSet.next(); + if (hasNextRecordCurrentSplit) { + moveResultSetCursorByOffset(); + } + } + + private void moveResultSetCursorByOffset() throws SQLException { + int resultSetOffset = currentSplit.getReaderPosition(); + if (resultSetOffset == 0) { + return; + } + resultSet.last(); + int last = resultSet.getRow(); + resultSet.absolute(1); + if (resultSetOffset < last) { + currentSplitOffset = resultSetOffset; + resultSet.absolute(resultSetOffset + 1); + } else { + hasNextRecordCurrentSplit = false; + LOG.warn( + "The offset will not be set from splitState, because the last cursor is {}, the expected cursor is {}.", + last, + resultSetOffset + 1); + } + } + + private void openResultSetForSplitWhenAtLeastOnce(JdbcSourceSplit split) + throws SQLException, ClassNotFoundException { + getOrEstablishConnection(); + closeResultSetIfNeeded(); + prepareStatement(split); + resultSet = statement.executeQuery(); + // AT_LEAST_ONCE + hasNextRecordCurrentSplit = resultSet.next(); + currentSplitOffset = 0; + } + + private void prepareStatement(JdbcSourceSplit split) throws SQLException { + closeStatementIfNeeded(); statement = connection.prepareStatement( split.getSqlTemplate(), resultSetType, resultSetConcurrency); @@ -263,27 +363,5 @@ 
public class JdbcSourceSplitReader<T> } } statement.setFetchSize(resultSetFetchSize); - resultSet = statement.executeQuery(); - hasNextRecordCurrentSplit = resultSet.next(); - } - - @VisibleForTesting - public List<JdbcSourceSplit> getSplits() { - return Collections.unmodifiableList(Arrays.asList(splits.toArray(new JdbcSourceSplit[0]))); - } - - @VisibleForTesting - public Connection getConnection() { - return connection; - } - - @VisibleForTesting - public PreparedStatement getStatement() { - return statement; - } - - @VisibleForTesting - public ResultSet getResultSet() { - return resultSet; } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java index af71a58a..7a690ca9 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplit.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.jdbc.source.split; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -38,27 +39,19 @@ public class JdbcSourceSplit implements SourceSplit, Serializable { private final @Nullable Serializable[] parameters; - private final int offset; - private final @Nullable CheckpointedOffset checkpointedOffset; public JdbcSourceSplit( String id, String sqlTemplate, @Nullable Serializable[] parameters, - int offset, @Nullable CheckpointedOffset checkpointedOffset) { this.id = id; this.sqlTemplate = sqlTemplate; this.parameters = parameters; - this.offset = offset; this.checkpointedOffset = checkpointedOffset; } - public int getOffset() { - return offset; - } - @Nullable public CheckpointedOffset getCheckpointedOffset() { return 
checkpointedOffset; @@ -66,13 +59,21 @@ public class JdbcSourceSplit implements SourceSplit, Serializable { public JdbcSourceSplit updateWithCheckpointedPosition( @Nullable CheckpointedOffset checkpointedOffset) { - return new JdbcSourceSplit(id, sqlTemplate, parameters, offset, checkpointedOffset); + return new JdbcSourceSplit(id, sqlTemplate, parameters, checkpointedOffset); } - public Optional<CheckpointedOffset> getReaderPosition() { + public Optional<CheckpointedOffset> getReaderPositionOptional() { return Optional.ofNullable(checkpointedOffset); } + public int getReaderPosition() { + if (Objects.nonNull(checkpointedOffset)) { + Preconditions.checkState(checkpointedOffset.getOffset() <= Integer.MAX_VALUE); + return (int) checkpointedOffset.getOffset(); + } + return 0; + } + public String getSqlTemplate() { return sqlTemplate; } @@ -96,8 +97,7 @@ public class JdbcSourceSplit implements SourceSplit, Serializable { return false; } JdbcSourceSplit that = (JdbcSourceSplit) o; - return offset == that.offset - && Objects.equals(id, that.id) + return Objects.equals(id, that.id) && Objects.equals(sqlTemplate, that.sqlTemplate) && Arrays.equals(parameters, that.parameters) && Objects.equals(checkpointedOffset, that.checkpointedOffset); @@ -105,7 +105,7 @@ public class JdbcSourceSplit implements SourceSplit, Serializable { @Override public int hashCode() { - int result = Objects.hash(id, sqlTemplate, offset, checkpointedOffset); + int result = Objects.hash(id, sqlTemplate, checkpointedOffset); result = 31 * result + Arrays.hashCode(parameters); return result; } @@ -121,8 +121,6 @@ public class JdbcSourceSplit implements SourceSplit, Serializable { + '\'' + ", parameters=" + Arrays.toString(parameters) - + ", offset=" - + offset + ", checkpointedOffset=" + checkpointedOffset + '}'; diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java index 4aadfef1..69a23406 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializer.java @@ -81,8 +81,6 @@ public class JdbcSourceSplitSerializer implements SimpleVersionedSerializer<Jdbc out.writeInt(paramsBytes.length); out.write(paramsBytes); - out.writeInt(sourceSplit.getOffset()); - CheckpointedOffset checkpointedOffset = sourceSplit.getCheckpointedOffset(); byte[] chkOffset = InstantiationUtil.serializeObject(checkpointedOffset); out.writeInt(chkOffset.length); @@ -100,14 +98,12 @@ public class JdbcSourceSplitSerializer implements SimpleVersionedSerializer<Jdbc InstantiationUtil.deserializeObject( parametersBytes, in.getClass().getClassLoader()); - int offset = in.readInt(); - int chkOffsetBytesLen = in.readInt(); byte[] chkOffsetBytes = new byte[chkOffsetBytesLen]; in.read(chkOffsetBytes); CheckpointedOffset chkOffset = InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader()); - return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset); + return new JdbcSourceSplit(id, sqlTemplate, params, chkOffset); } } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java index f74c381b..2e3b2c3d 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitState.java @@ -37,7 +37,7 @@ public class JdbcSourceSplitState<SplitT extends JdbcSourceSplit> implements Ser public JdbcSourceSplitState(SplitT split) { this.split = 
checkNotNull(split); - final Optional<CheckpointedOffset> readerPosition = split.getReaderPosition(); + final Optional<CheckpointedOffset> readerPosition = split.getReaderPositionOptional(); if (readerPosition.isPresent()) { this.offset = readerPosition.get().getOffset(); this.recordsToSkipAfterOffset = readerPosition.get().getRecordsAfterOffset(); diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java index 8dcceb28..3f1c2aed 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.java @@ -33,4 +33,12 @@ public interface JdbcParameterValuesProvider extends Serializable { /** Returns the necessary parameters array to use for query in parallel a table. */ Serializable[][] getParameterValues(); + + /** Get the latest optional state data. */ + default Serializable getLatestOptionalState() { + return null; + } + + /** Set the optional state data. */ + default void setOptionalState(Serializable optionalState) {} } diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java new file mode 100644 index 00000000..eae5891e --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/split/JdbcSlideTimingParameterProvider.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.split; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** A parameters provider that generates parameters by a sliding time window strategy. */ +@PublicEvolving +public class JdbcSlideTimingParameterProvider implements JdbcParameterValuesProvider { + + private final long slideStepMills; + private final long slideSpanMills; + private final long splitGenerateDelayMillis; + + private @Nonnull Long startMills; + + public JdbcSlideTimingParameterProvider( + Long startMills, + long slideSpanMills, + long slideStepMills, + long splitGenerateDelayMillis) { + this.startMills = Preconditions.checkNotNull(startMills); + Preconditions.checkArgument( + startMills > 0L, + "'startMillis' of JdbcSlideTimingParameterProvider must be greater than 0. 
"); + Preconditions.checkArgument( + slideSpanMills > 0 && slideStepMills > 0, + "JdbcSlideTimingParameterProvider parameters must satisfy " + + "slideSpanMills > 0 and slideStepMills > 0"); + Preconditions.checkArgument( + splitGenerateDelayMillis >= 0L, + "JdbcSlideTimingParameterProvider parameters must satisfy " + + "splitGenerateDelayMillis >= 0"); + this.slideStepMills = slideStepMills; + this.slideSpanMills = slideSpanMills; + this.splitGenerateDelayMillis = splitGenerateDelayMillis; + } + + private boolean nextSplitAvailable(Long nextSpanStartMillis) { + final long delayedNextSpanStartMillis = nextSpanStartMillis + splitGenerateDelayMillis; + final long currentAvailableMillis = currentAvailableMillis(); + return currentAvailableMillis >= delayedNextSpanStartMillis + && (currentAvailableMillis - delayedNextSpanStartMillis >= slideSpanMills); + } + + @Override + public Long getLatestOptionalState() { + return startMills; + } + + @Override + public void setOptionalState(Serializable optionalState) { + Preconditions.checkArgument((Long) optionalState > 0L); + this.startMills = (Long) optionalState; + } + + public Long currentAvailableMillis() { + return System.currentTimeMillis(); + } + + @Override + public Serializable[][] getParameterValues() { + List<Serializable[]> tmpList = new ArrayList<>(); + while (nextSplitAvailable(startMills)) { + Serializable[] params = new Serializable[] {startMills, startMills + slideSpanMills}; + tmpList.add(params); + startMills += slideStepMills; + } + return tmpList.toArray(new Serializable[0][]); + } +} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java new file mode 100644 index 00000000..db951ab6 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.java @@ -0,0 +1,86 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.utils; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.time.Duration; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Settings describing how to do continuous split discovery and enumeration for the JDBC source's + * continuous discovery and streaming mode. 
+ */ +@PublicEvolving +public final class ContinuousUnBoundingSettings implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Duration initialDiscoveryDelay; + private final Duration discoveryInterval; + + public ContinuousUnBoundingSettings( + Duration initialDiscoveryDelay, Duration discoveryInterval) { + this.initialDiscoveryDelay = initialDiscoveryDelay; + this.discoveryInterval = checkNotNull(discoveryInterval); + } + + public Duration getDiscoveryInterval() { + return discoveryInterval; + } + + public Duration getInitialDiscoveryDelay() { + return initialDiscoveryDelay; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "ContinuousUnBoundingSettings{" + + "initialDiscoveryDelay=" + + initialDiscoveryDelay + + ", discoveryInterval=" + + discoveryInterval + + '}'; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + ContinuousUnBoundingSettings that = (ContinuousUnBoundingSettings) object; + return Objects.equals(initialDiscoveryDelay, that.initialDiscoveryDelay) + && Objects.equals(discoveryInterval, that.discoveryInterval); + } + + @Override + public int hashCode() { + return Objects.hash(initialDiscoveryDelay, discoveryInterval); + } +} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java index e9b15736..a760749a 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilderTest.java @@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeHint; import 
org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; +import java.io.Serializable; +import java.time.Duration; + import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; import static org.assertj.core.api.Assertions.assertThat; @@ -116,8 +122,6 @@ class JdbcSourceBuilderTest { void testSetConnectionInfo() { assertThatThrownBy(() -> JdbcSource.builder().setDriverName("")) .isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> JdbcSource.builder().setUsername("")) - .isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> JdbcSource.builder().setDBUrl("")) .isInstanceOf(IllegalArgumentException.class); } @@ -142,4 +146,39 @@ class JdbcSourceBuilderTest { .build()) .isInstanceOf(NullPointerException.class); } + + @Test + void testSetStreamingSemantic() { + assertThatThrownBy( + () -> + sourceBuilder + .setContinuousUnBoundingSettings( + new ContinuousUnBoundingSettings( + Duration.ofMillis(1L), + Duration.ofMillis(1L))) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(JdbcSourceBuilder.INVALID_SLIDE_TIMING_CONTINUOUS_HINT); + + assertThatThrownBy( + () -> + sourceBuilder + .setJdbcParameterValuesProvider( + new JdbcGenericParameterValuesProvider( + new 
Serializable[][] {})) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(JdbcSourceBuilder.INVALID_SLIDE_TIMING_CONTINUOUS_HINT); + + sourceBuilder.setContinuousUnBoundingSettings(null); + assertThatThrownBy( + () -> + sourceBuilder + .setJdbcParameterValuesProvider( + new JdbcSlideTimingParameterProvider( + 1L, 1L, 1L, 1L)) + .build()) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage(JdbcSourceBuilder.INVALID_CONTINUOUS_SLIDE_TIMING_HINT); + } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java new file mode 100644 index 00000000..8e9f61cf --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/JdbcSourceStreamRelatedITCase.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; +import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; +import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.util.Collector; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; +import 
java.util.function.Supplier; + +import static org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for streaming semantic related cases of {@link JdbcSource}. */ +class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase { + + private static final long ONE_SECOND = Duration.ofSeconds(1L).toMillis(); + private static final int TESTING_PARALLELISM = 2; + private static final long INTERVAL_OF_GENERATING = 23L; + private static final int TESTING_ENTRIES_SIZE = 200; + private static final int DATA_NUM_PER_SECOND_SPAN_SPLIT = + (int) (ONE_SECOND / INTERVAL_OF_GENERATING + 1); + private static final String testingTable = "t_testing"; + private static final String CREATE_SQL = + "CREATE TABLE " + + testingTable + + " (" + + "id bigint NOT NULL, " + + "ts bigint NOT NULL, " + + "PRIMARY KEY (id))"; + private static final ContinuousUnBoundingSettings CONTINUOUS_SETTINGS = + new ContinuousUnBoundingSettings(Duration.ofMillis(10L), Duration.ofSeconds(1L)); + private static final ResultExtractor<TestEntry> EXTRACTOR = + resultSet -> new TestEntry(resultSet.getLong("id"), resultSet.getLong("ts")); + private static final List<TestEntry> testEntries = new ArrayList<>(TESTING_ENTRIES_SIZE); + + private static Queue<TestEntry> collectedRecords; + private static long globalStartMillis; + private static long globalDataEndMillis; + + private JdbcSourceBuilder<TestEntry> jdbcSourceBuilder; + + @BeforeEach + void initData() { + testEntries.clear(); + quickExecutionSQL(CREATE_SQL); + generateTestEntries(); + String insertSQL = generateInsertSQL(); + quickExecutionSQL(insertSQL); + + JdbcSlideTimingParameterProvider slideTimingParamsProvider = + new JdbcSlideTimingParameterProvider( + globalStartMillis, ONE_SECOND, ONE_SECOND, 100L); + jdbcSourceBuilder = + JdbcSource.<TestEntry>builder() + .setTypeInformation(TypeInformation.of(TestEntry.class)) + 
.setSql("select * from " + testingTable + " where ts >= ? and ts < ?") + .setDBUrl(getMetadata().getJdbcUrl()) + .setUsername(getMetadata().getUsername()) + .setPassword(getMetadata().getPassword()) + .setContinuousUnBoundingSettings(CONTINUOUS_SETTINGS) + .setJdbcParameterValuesProvider(slideTimingParamsProvider) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(EXTRACTOR); + + collectedRecords = new ConcurrentLinkedDeque<>(); + } + + @AfterEach + void clearData() { + quickExecutionSQL("delete from " + testingTable); + quickExecutionSQL("DROP TABLE " + testingTable); + } + + @ParameterizedTest + @EnumSource(DeliveryGuarantee.class) + void testForNormalCaseWithoutFailure( + DeliveryGuarantee guarantee, @InjectClusterClient ClusterClient<?> client) + throws Exception { + // Test continuous + unbounded splits + StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism(); + + jdbcSourceBuilder.setDeliveryGuarantee(guarantee); + if (DeliveryGuarantee.EXACTLY_ONCE == guarantee) { + jdbcSourceBuilder.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE); + } + env.fromSource(jdbcSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "TestSource") + .addSink(new TestingSinkFunction()); + waitExpectation(client, env, () -> collectedRecords.size() >= TESTING_ENTRIES_SIZE); + + assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(testEntries); + } + + @Test + void testExactlyOnceWithFailure(@InjectClusterClient ClusterClient<?> client) throws Exception { + // Test continuous + unbounded splits + StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism(); + jdbcSourceBuilder + .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE); + + env.fromSource(jdbcSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "TestSource") + .keyBy(testEntry -> 0L) + .process(new TestingKeyProcessFunction()) + .setParallelism(1); + + waitExpectation(client, env, () -> 
collectedRecords.size() >= TESTING_ENTRIES_SIZE); + + assertThat(collectedRecords).containsExactlyInAnyOrderElementsOf(testEntries); + } + + @Test + void testAtLeastOnceWithFailure(@InjectClusterClient ClusterClient<?> client) throws Exception { + // Test continuous + unbounded splits + StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism(); + jdbcSourceBuilder.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE); + + env.fromSource(jdbcSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "TestSource") + .keyBy(testEntry -> 0L) + .process(new TestingKeyProcessFunction()) + .setParallelism(1); + + waitExpectation( + client, env, () -> new HashSet<>(collectedRecords).size() >= TESTING_ENTRIES_SIZE); + + assertThat(collectedRecords) + .hasSizeGreaterThanOrEqualTo(testEntries.size()) + .containsAll(testEntries); + } + + @Test + void testAtMostOnceWithFailure(@InjectClusterClient ClusterClient<?> client) throws Exception { + // Test continuous + unbounded splits + StreamExecutionEnvironment env = getEnvWithRestartStrategyParallelism(); + + env.fromSource(jdbcSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "TestSource") + .keyBy(testEntry -> 0L) + .process(new TestingKeyProcessFunction()) + .setParallelism(1); + + waitExpectation( + client, + env, + () -> + Math.abs(collectedRecords.size() - testEntries.size()) + <= DATA_NUM_PER_SECOND_SPAN_SPLIT * TESTING_PARALLELISM); + + assertThat(testEntries) + .hasSizeGreaterThanOrEqualTo(collectedRecords.size()) + .containsAll(collectedRecords); + } + + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static void waitExpectation( + ClusterClient<?> client, StreamExecutionEnvironment env, Supplier<Boolean> condition) + throws Exception { + JobID jobID = env.executeAsync().getJobID(); + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> { + while (true) { + if 
(condition.get()) { + client.cancel(jobID); + break; + } + sleep(50); + } + }); + future.get(); + } + + private void generateTestEntries() { + // The data is distributed in 1 min. + long millisAnchor = System.currentTimeMillis(); + globalStartMillis = millisAnchor - INTERVAL_OF_GENERATING * TESTING_ENTRIES_SIZE / 2; + globalDataEndMillis = + globalDataEndMillis + millisAnchor + INTERVAL_OF_GENERATING * TESTING_ENTRIES_SIZE; + long startMillis = globalStartMillis; + for (int i = 0; i < TESTING_ENTRIES_SIZE; i++) { + testEntries.add(new TestEntry(i + 1, startMillis)); + startMillis += INTERVAL_OF_GENERATING; + } + } + + @Nonnull + private static String generateInsertSQL() { + StringBuilder sqlQueryBuilder = + new StringBuilder("INSERT INTO " + testingTable + " (id, ts) VALUES "); + for (int i = 0; i < testEntries.size(); i++) { + sqlQueryBuilder + .append("(") + .append(testEntries.get(i).id) + .append(",") + .append(testEntries.get(i).ts) + .append(")"); + if (i < testEntries.size() - 1) { + sqlQueryBuilder.append(","); + } + } + return sqlQueryBuilder.toString(); + } + + private void quickExecutionSQL(String testingTable) { + try (Connection conn = getConnection(); + Statement stat = conn.createStatement()) { + stat.execute(testingTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private Connection getConnection() throws SQLException { + return DriverManager.getConnection( + getMetadata().getJdbcUrl(), + getMetadata().getUsername(), + getMetadata().getPassword()); + } + + @Nonnull + private static StreamExecutionEnvironment getEnvWithRestartStrategyParallelism() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration()); + + env.setParallelism(TESTING_PARALLELISM); + env.enableCheckpointing(MINIMAL_CHECKPOINT_TIME); + return env; + } + + public static class TestEntry implements Serializable { + public long id; + 
public long ts; + + public TestEntry(long id, long ts) { + this.id = id; + this.ts = ts; + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + TestEntry testEntry = (TestEntry) object; + return id == testEntry.id && ts == testEntry.ts; + } + + @Override + public int hashCode() { + return Objects.hash(id, ts); + } + + @Override + public String toString() { + return "TestEntry{" + "id=" + id + ", ts=" + ts + '}'; + } + } + + /** A process function for testing. */ + static class TestingKeyProcessFunction + extends KeyedProcessFunction<Long, TestEntry, TestEntry> { + private transient ListState<TestEntry> listState; + private boolean errorOccurred = false; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + listState = + getRuntimeContext() + .getListState( + new ListStateDescriptor<>( + "collectedElements", TestEntry.class)); + } + + @Override + public void processElement( + TestEntry value, + KeyedProcessFunction<Long, TestEntry, TestEntry>.Context ctx, + Collector<TestEntry> out) + throws Exception { + if (value.id == testEntries.size() / 2 && getRuntimeContext().getAttemptNumber() < 1) { + throw new RuntimeException(); + } + listState.add(value); + if (getRuntimeContext().getAttemptNumber() != 0) { + errorOccurred = true; + } + if (errorOccurred) { + collectedRecords.clear(); + listState.get().forEach(collectedRecords::add); + errorOccurred = false; + } else { + collectedRecords.add(value); + } + if (value.id % 17 == 0) { + sleep(MINIMAL_CHECKPOINT_TIME * 2); + } + } + } + + /** A sink function to collect the records. 
*/ + static class TestingSinkFunction implements SinkFunction<TestEntry> { + + @Override + public void invoke(TestEntry value, Context context) throws Exception { + collectedRecords.add(value); + } + } +} diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java index fe25ebbb..ce7548d2 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumStateSerializerTest.java @@ -37,15 +37,14 @@ class JdbcSourceEnumStateSerializerTest { private final JdbcSourceEnumeratorState state = new JdbcSourceEnumeratorState( - Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 0, null)), + Arrays.asList(new JdbcSourceSplit("1", "select 1", null, null)), Arrays.asList( new JdbcSourceSplit( "1", "select 1", new Serializable[] {new Integer(0)}, - 10, new CheckpointedOffset(0, 10))), - Arrays.asList(new JdbcSourceSplit("1", "select 1", null, 0, null)), + Arrays.asList(new JdbcSourceSplit("1", "select 1", null, null)), null); private final JdbcSourceEnumeratorState mockedState = new MockedJdbcSourceEnumState(state); private final JdbcSourceEnumStateSerializer serializer = diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java index 97e31f47..443fb05b 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/enumerator/JdbcSourceEnumeratorTest.java @@ -26,11 +26,14 @@ 
import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumerator import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import javax.annotation.Nonnull; + import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -93,7 +96,6 @@ class JdbcSourceEnumeratorTest { String.valueOf(splitId++), "select 1", new Serializable[] {0}, - 0, new CheckpointedOffset(0, 0)); } @@ -105,10 +107,12 @@ class JdbcSourceEnumeratorTest { context, new JdbcSqlSplitEnumeratorBase<JdbcSourceSplit>(null) { @Override - public List<JdbcSourceSplit> enumerateSplits() throws IOException { + public @Nonnull List<JdbcSourceSplit> enumerateSplits( + @Nonnull Supplier<Boolean> splitGettable) throws IOException { return Collections.emptyList(); } }, + null, Arrays.stream(splits).collect(Collectors.toList())); } } diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java index 91a75805..f549869f 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceReaderTest.java @@ -51,7 +51,7 @@ class JdbcSourceReaderTest extends JdbcDataTestBase { final TestingReaderContext context = new TestingReaderContext(); final JdbcSourceReader<String> reader = createReader(context); reader.addSplits( - Collections.singletonList(new JdbcSourceSplit("1", "select 1", null, 0, null))); + Collections.singletonList(new JdbcSourceSplit("1", "select 1", null, null))); reader.start(); reader.close(); assertThat(context.getNumSplitRequests()).isEqualTo(0); diff 
--git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java index 2a730b43..7ebe0e12 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReaderTest.java @@ -49,7 +49,7 @@ class JdbcSourceSplitReaderTest extends JdbcDataTestBase { private final JdbcSourceSplit split = new JdbcSourceSplit( - "1", "select id, title, author, price, qty from " + INPUT_TABLE, null, 0, null); + "1", "select id, title, author, price, qty from " + INPUT_TABLE, null, null); private final JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider( new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java index b7b60afe..9d3f6516 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/source/split/JdbcSourceSplitSerializerTest.java @@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link JdbcSourceSplitSerializer}. 
*/ class JdbcSourceSplitSerializerTest { - private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", null, 0, null); + private final JdbcSourceSplit split = new JdbcSourceSplit("1", "select 1", null, null); private final JdbcSourceSplit mockedSplit = new MockedJdbcSourceSplit(split); private final JdbcSourceSplitSerializer serializer = new JdbcSourceSplitSerializer(); private final JdbcSourceSplitSerializer mockedSerializer = @@ -73,7 +73,6 @@ class JdbcSourceSplitSerializerTest { jdbcSourceSplit.splitId(), jdbcSourceSplit.getSqlTemplate(), (Serializable[]) jdbcSourceSplit.getParameters(), - jdbcSourceSplit.getOffset(), jdbcSourceSplit.getCheckpointedOffset()); } }