This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit f9167c3da0af3c9890f3e0c9ff49faa1868e9f40 Author: lvyanquan <[email protected]> AuthorDate: Tue Mar 25 17:37:17 2025 +0800 [FLINK-36659] Bump version of flink to 2.0.0. --- .github/workflows/backwards_compatibility.yml | 4 +-- .github/workflows/push_pr.yml | 4 +-- .github/workflows/weekly.yml | 15 ++++---- .../pom.xml | 18 ++++++++-- .../backward/compatibility/DataStreamSinkTest.java | 8 ++--- .../compatibility/DataStreamSourceTest.java | 16 ++++----- .../core/database/catalog/AbstractJdbcCatalog.java | 5 ++- .../jdbc/core/datastream/sink/JdbcSink.java | 41 +++++++++++++--------- .../core/datastream/sink/writer/JdbcWriter.java | 22 ++++++------ .../source/reader/JdbcSourceSplitReader.java | 10 +++--- .../jdbc/core/table/sink/JdbcDynamicTableSink.java | 2 +- .../jdbc/internal/GenericJdbcSinkFunction.java | 14 ++++---- .../apache/flink/connector/jdbc/JdbcITCase.java | 8 ++--- .../apache/flink/connector/jdbc/JdbcTestBase.java | 13 +++++-- .../core/datastream/sink/BaseJdbcSinkTest.java | 6 ++-- .../datastream/sink/writer/BaseJdbcWriterTest.java | 17 ++++++--- .../core/datastream/source/JdbcSourceITCase.java | 12 +++---- .../source/JdbcSourceStreamRelatedITCase.java | 20 ++++++----- .../table/sink/JdbcDynamicTableSinkITCase.java | 6 ++-- .../connector/jdbc/internal/JdbcFullTest.java | 14 ++++---- .../jdbc/internal/JdbcOutputSerializerTest.java | 2 +- pom.xml | 3 +- 22 files changed, 151 insertions(+), 109 deletions(-) diff --git a/.github/workflows/backwards_compatibility.yml b/.github/workflows/backwards_compatibility.yml index 139b3d9..94d27b1 100644 --- a/.github/workflows/backwards_compatibility.yml +++ b/.github/workflows/backwards_compatibility.yml @@ -29,8 +29,8 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT] - jdk: [8, 11, 17] + flink: [2.0-SNAPSHOT, 2.1-SNAPSHOT] + jdk: [17] env: MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 88cb645..daee806 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -28,8 +28,8 @@ jobs: compile_and_test: strategy: matrix: - flink: [1.20.0] - jdk: [ '8, 11, 17, 21' ] + flink: [2.0.0] + jdk: [ '17' ] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 9a27cda..91864b1 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -30,13 +30,16 @@ jobs: strategy: matrix: flink_branches: [{ - flink: 1.19-SNAPSHOT, - jdk: '8, 11, 17, 21', + flink: 2.1-SNAPSHOT, + jdk: '17', branch: main - }, - { - flink: 1.20-SNAPSHOT, - jdk: '8, 11, 17, 21', + }, { + flink: 2.0-SNAPSHOT, + jdk: '17', + branch: main + }, { + flink: 2.0.0, + jdk: '17', branch: main }, { flink: 1.19.1, diff --git a/flink-connector-jdbc-backward-compatibility/pom.xml b/flink-connector-jdbc-backward-compatibility/pom.xml index 2b14ecf..7937aa6 100644 --- a/flink-connector-jdbc-backward-compatibility/pom.xml +++ b/flink-connector-jdbc-backward-compatibility/pom.xml @@ -17,6 +17,7 @@ <packaging>jar</packaging> <properties> + <postgres.version>42.7.3</postgres.version> <surefire.module.config> --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED @@ -58,17 +59,30 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-jdbc</artifactId> + <artifactId>flink-connector-jdbc-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc-postgres</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-jdbc</artifactId> + <artifactId>flink-connector-jdbc-postgres</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${postgres.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.testcontainers</groupId> <artifactId>postgresql</artifactId> diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java index dcf4dd4..9d26b20 100644 --- a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java @@ -18,16 +18,16 @@ package org.apache.flink.connector.jdbc.backward.compatibility; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; -import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink; import org.apache.flink.connector.jdbc.postgres.PostgresTestBase; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.test.junit5.MiniClusterExtension; import org.junit.jupiter.api.Test; @@ -72,7 +72,7 @@ public class DataStreamSinkTest implements PostgresTestBase { @Test public void testAtLeastOnce() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); assertResult(new ArrayList<>()); @@ -98,7 +98,7 @@ public class DataStreamSinkTest implements PostgresTestBase { @Test public void testExactlyOnce() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); assertResult(new ArrayList<>()); diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java index 14a144b..10bd87a 100644 --- a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java @@ -19,18 +19,18 @@ package org.apache.flink.connector.jdbc.backward.compatibility; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource; +import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.postgres.PostgresTestBase; -import org.apache.flink.connector.jdbc.source.JdbcSource; -import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.test.junit5.MiniClusterExtension; import org.junit.jupiter.api.BeforeEach; @@ -101,7 +101,7 @@ public class DataStreamSourceTest implements PostgresTestBase { @Test void testReadWithoutParallelismWithoutParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = JdbcSource.<JdbcTestFixture.TestEntry>builder() @@ -122,7 +122,7 @@ public class DataStreamSourceTest implements PostgresTestBase { @Test void testReadWithoutParallelismWithParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = JdbcSource.<JdbcTestFixture.TestEntry>builder() @@ -146,7 +146,7 @@ public class DataStreamSourceTest implements PostgresTestBase { @Test void testReadWithParallelismWithoutParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(2); JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = JdbcSource.<JdbcTestFixture.TestEntry>builder() @@ -167,7 +167,7 @@ public class DataStreamSourceTest implements PostgresTestBase { @Test void testReadWithParallelismWithParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(2); JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = JdbcSource.<JdbcTestFixture.TestEntry>builder() diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java index 7342459..0eedcf8 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java @@ -305,7 +305,10 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog implements Jdb pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())); Schema tableSchema = schemaBuilder.build(); - return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath)); + return CatalogTable.newBuilder() + .schema(tableSchema) + .options(getOptions(tablePath)) + .build(); } catch (Exception e) { throw new CatalogException( String.format("Failed getting table %s", tablePath.getFullName()), e); diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java index 8c0cf99..5753eb9 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java @@ -20,8 +20,11 @@ package org.apache.flink.connector.jdbc.core.datastream.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.connector.sink2.Committer; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SupportsCommitter; +import org.apache.flink.api.connector.sink2.SupportsWriterState; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; @@ -47,7 +50,9 @@ import java.util.Collections; */ @PublicEvolving public class JdbcSink<IN> - implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> { + implements Sink<IN>, + SupportsWriterState<IN, JdbcWriterState>, + SupportsCommitter<JdbcCommitable> { private final DeliveryGuarantee deliveryGuarantee; private final JdbcConnectionProvider connectionProvider; @@ -74,14 +79,28 @@ public class JdbcSink<IN> @Override @Internal - public JdbcWriter<IN> createWriter(InitContext context) throws IOException { + public JdbcWriter<IN> createWriter(WriterInitContext context) throws IOException { return restoreWriter(context, Collections.emptyList()); } + @Override + @Internal + public Committer<JdbcCommitable> createCommitter(CommitterInitContext committerInitContext) + throws IOException { + return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions); + } + + @Override + @Internal + public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() { + return new JdbcCommitableSerializer(); + } + @Override @Internal public JdbcWriter<IN> restoreWriter( - InitContext context, Collection<JdbcWriterState> recoveredState) throws IOException { + WriterInitContext context, Collection<JdbcWriterState> recoveredState) + throws IOException { JdbcOutputSerializer<IN> outputSerializer = JdbcOutputSerializer.of( context.createInputSerializer(), context.isObjectReuseEnabled()); @@ -96,18 +115,6 @@ public class JdbcSink<IN> context); } - @Override - @Internal - public Committer<JdbcCommitable> createCommitter() throws IOException { - return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions); - } - - @Override - @Internal - public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() { - return new JdbcCommitableSerializer(); - } - @Override @Internal public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() { diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java index be7aa49..c412b92 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java @@ -18,9 +18,9 @@ package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.api.connector.sink2.CommittingSinkWriter; +import org.apache.flink.api.connector.sink2.InitContext; +import org.apache.flink.api.connector.sink2.StatefulSinkWriter; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; @@ -55,8 +55,8 @@ import static org.apache.flink.util.Preconditions.checkState; */ @Internal public class JdbcWriter<IN> - implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>, - TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> { + implements StatefulSinkWriter<IN, JdbcWriterState>, + CommittingSinkWriter<IN, JdbcCommitable> { private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class); @@ -75,7 +75,7 @@ public class JdbcWriter<IN> JdbcOutputSerializer<IN> outputSerializer, DeliveryGuarantee deliveryGuarantee, Collection<JdbcWriterState> recoveredState, - Sink.InitContext initContext) + InitContext initContext) throws IOException { this.deliveryGuarantee = @@ -85,9 +85,7 @@ public class JdbcWriter<IN> pendingRecords = false; this.lastCheckpointId = - initContext - .getRestoredCheckpointId() - .orElse(Sink.InitContext.INITIAL_CHECKPOINT_ID - 1); + initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1); checkNotNull(connectionProvider, "connectionProvider must be defined"); @@ -106,9 +104,9 @@ public class JdbcWriter<IN> TransactionId transactionId = TransactionId.create( - initContext.getJobId().getBytes(), - initContext.getSubtaskId(), - initContext.getNumberOfParallelSubtasks()); + initContext.getJobInfo().getJobId().getBytes(), + initContext.getTaskInfo().getIndexOfThisSubtask(), + initContext.getTaskInfo().getNumberOfParallelSubtasks()); this.jdbcTransaction = new XaTransaction( diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java index 7e1af42..2ffca4b 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java @@ -105,15 +105,15 @@ public class JdbcSourceSplitReader<T> this.config = Preconditions.checkNotNull(config); this.typeInformation = Preconditions.checkNotNull(typeInformation); this.connectionProvider = Preconditions.checkNotNull(connectionProvider); - this.resultSetType = config.getInteger(RESULTSET_TYPE); - this.resultSetConcurrency = config.getInteger(RESULTSET_CONCURRENCY); - this.resultSetFetchSize = config.getInteger(RESULTSET_FETCH_SIZE); - this.autoCommit = config.getBoolean(AUTO_COMMIT); + this.resultSetType = config.get(RESULTSET_TYPE); + this.resultSetConcurrency = config.get(RESULTSET_CONCURRENCY); + this.resultSetFetchSize = config.get(RESULTSET_FETCH_SIZE); + this.autoCommit = config.get(AUTO_COMMIT); this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee); this.splits = new ArrayDeque<>(); this.hasNextRecordCurrentSplit = false; this.currentSplit = null; - int splitReaderFetchBatchSize = config.getInteger(READER_FETCH_BATCH_SIZE); + int splitReaderFetchBatchSize = config.get(READER_FETCH_BATCH_SIZE); Preconditions.checkArgument( splitReaderFetchBatchSize > 0 && splitReaderFetchBatchSize < Integer.MAX_VALUE); this.splitReaderFetchBatchSize = splitReaderFetchBatchSize; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java index 7b2c2f7..bf85091 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java @@ -25,7 +25,7 @@ import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOp import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java index 43e08fe..c031419 100644 --- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java +++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java @@ -20,13 +20,13 @@ package org.apache.flink.connector.jdbc.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction; import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; @@ -45,11 +45,10 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T> } @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + public void open(OpenContext openContext) throws Exception { + super.open(openContext); // Recheck if execution config change - serializer.withObjectReuseEnabled( - getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()); + serializer.withObjectReuseEnabled(getRuntimeContext().isObjectReuseEnabled()); outputFormat.open(serializer); } @@ -76,6 +75,7 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T> public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { this.serializer = JdbcOutputSerializer.of( - ((TypeInformation<T>) type).createSerializer(executionConfig)); + ((TypeInformation<T>) type) + .createSerializer(executionConfig.getSerializerConfig())); } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java index 3b4c60e..ccc4b5f 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.jdbc; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; @@ -25,7 +24,8 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.apache.flink.util.function.FunctionWithException; import org.junit.jupiter.api.Test; @@ -70,7 +70,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { @Test void testInsert() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); env.fromElements(TEST_DATA) .addSink( @@ -92,7 +92,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase { configuration.set(OBJECT_REUSE, true); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); AtomicInteger counter = new AtomicInteger(0); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java index e016ad1..5bb49ae 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java @@ -19,6 +19,10 @@ package org.apache.flink.connector.jdbc; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.JobInfoImpl; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.TaskInfoImpl; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -49,13 +53,16 @@ public abstract class JdbcTestBase implements DerbyTestBase { public static RuntimeContext getRuntimeContext(Boolean reused) { ExecutionConfig config = getExecutionConfig(reused); RuntimeContext context = Mockito.mock(RuntimeContext.class); - doReturn(config).when(context).getExecutionConfig(); + doReturn(config.isObjectReuseEnabled()).when(context).isObjectReuseEnabled(); return context; } public static RuntimeContext getRuntimeContext(JobID jobId) { RuntimeContext context = getRuntimeContext(false); - doReturn(jobId).when(context).getJobId(); + JobInfo jobInfo = new JobInfoImpl(jobId, "test_jobName"); + TaskInfo taskInfo = new TaskInfoImpl("test_taskName", 4, 1, 4, 0); + doReturn(jobInfo).when(context).getJobInfo(); + doReturn(taskInfo).when(context).getTaskInfo(); return context; } @@ -71,6 +78,6 @@ public abstract class JdbcTestBase implements DerbyTestBase { public static <T> TypeSerializer<T> getSerializer( TypeInformation<T> type, Boolean objectReused) { - return type.createSerializer(getExecutionConfig(objectReused)); + return type.createSerializer(getExecutionConfig(objectReused).getSerializerConfig()); } } diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java index 219b374..48fe94f 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java @@ -17,11 +17,11 @@ package org.apache.flink.connector.jdbc.core.datastream.sink; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.connector.jdbc.derby.DerbyTestBase; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.junit.jupiter.api.Test; @@ -62,7 +62,7 @@ public abstract class BaseJdbcSinkTest implements DerbyTestBase { @Test public void testInsert() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); assertResult(new ArrayList<>()); @@ -82,7 +82,7 @@ public abstract class BaseJdbcSinkTest implements DerbyTestBase { @Test public void testInsertWithObjectReuse() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); assertResult(new ArrayList<>()); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java index cfcc203..301a64d 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java @@ -1,8 +1,10 @@ package org.apache.flink.connector.jdbc.core.datastream.sink.writer; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; @@ -14,6 +16,7 @@ import org.apache.flink.connector.jdbc.derby.DerbyTestBase; import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer; import org.apache.flink.connector.jdbc.testutils.TableManaged; import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; +import org.apache.flink.connector.testutils.source.TestingTaskInfo; import org.apache.flink.util.StringUtils; import org.junit.jupiter.api.AfterEach; @@ -38,7 +41,7 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase { private static final String JOBID = "6b64d8a9a951e2e8767ae952ad951706"; private static final String GLOBAL_TID = - String.format("%s000000000000000000000000000000000000", JOBID); + String.format("%s000000010000000000000000000000000000", JOBID); protected static final BooksTable TEST_TABLE = new BooksTable("WriterTable"); protected static final List<BooksTable.BookEntry> BOOKS = @@ -68,8 +71,12 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase { @BeforeEach void init() throws Exception { // We have to mock this because we have changes between 1.18 and 1.19 - Sink.InitContext sinkContext = Mockito.mock(Sink.InitContext.class); - doReturn(JobID.fromHexString(JOBID)).when(sinkContext).getJobId(); + WriterInitContext sinkContext = Mockito.mock(WriterInitContext.class); + JobInfo jobInfo = Mockito.mock(JobInfo.class); + doReturn(jobInfo).when(sinkContext).getJobInfo(); + doReturn(JobID.fromHexString(JOBID)).when(jobInfo).getJobId(); + TaskInfo taskInfo = new TestingTaskInfo("test_task", 4, 1, 4, 0, "test_subTask", "id"); + doReturn(taskInfo).when(sinkContext).getTaskInfo(); JdbcOutputSerializer<BooksTable.BookEntry> outputSerializer = JdbcOutputSerializer.of( @@ -97,7 +104,7 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase { } protected String withBranch(long checkpointId) { - return String.format("00000000000000000000000%s00", checkpointId); + return String.format("00000004000000000000000%s00", checkpointId); } protected void checkCommitable(JdbcCommitable actual, String branchExpected) { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java index 2021094..a22e685 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java @@ -19,13 +19,13 @@ package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.JdbcDataTestBase; import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.util.RestartStrategyUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,7 +56,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase { @Test void testReadWithoutParallelismWithoutParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); JdbcSource<TestEntry> jdbcSource = JdbcSource.<TestEntry>builder() @@ -75,7 +75,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase { @Test void testReadWithoutParallelismWithParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(1); JdbcSource<TestEntry> jdbcSource = JdbcSource.<TestEntry>builder() @@ -97,7 +97,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase { @Test void testReadWithParallelismWithoutParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(2); JdbcSource<TestEntry> jdbcSource = JdbcSource.<TestEntry>builder() @@ -116,7 +116,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase { @Test void testReadWithParallelismWithParamsProvider() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + RestartStrategyUtils.configureNoRestartStrategy(env); env.setParallelism(2); JdbcSource<TestEntry> jdbcSource = JdbcSource.<TestEntry>builder() diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java index fffc749..23814ee 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java @@ -20,12 +20,13 @@ package org.apache.flink.connector.jdbc.core.datastream.source; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings; import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor; @@ -34,7 +35,7 @@ import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider; import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.test.junit5.InjectClusterClient; import org.apache.flink.util.Collector; @@ -288,8 +289,10 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase { @Nonnull private static StreamExecutionEnvironment getEnvWithRestartStrategyParallelism() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration()); + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); env.setParallelism(TESTING_PARALLELISM); env.enableCheckpointing(MINIMAL_CHECKPOINT_TIME); @@ -337,8 +340,8 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase { private boolean errorOccurred = false; @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + public void open(OpenContext openContext) throws Exception { + super.open(openContext); listState = getRuntimeContext() .getListState( @@ -352,11 +355,12 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase { KeyedProcessFunction<Long, TestEntry, TestEntry>.Context ctx, Collector<TestEntry> out) throws Exception { - if (value.id == testEntries.size() / 2 && getRuntimeContext().getAttemptNumber() < 1) { + if (value.id == testEntries.size() / 2 + && getRuntimeContext().getTaskInfo().getAttemptNumber() < 1) { throw new RuntimeException(); } listState.add(value); - if (getRuntimeContext().getAttemptNumber() != 0) { + if (getRuntimeContext().getTaskInfo().getAttemptNumber() != 0) { errorOccurred = true; } if (errorOccurred) { diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java index f95fd3c..963d01d 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.jdbc.core.table.sink; +import org.apache.flink.api.common.functions.DefaultOpenContext; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction; import org.apache.flink.connector.jdbc.testutils.DatabaseTest; @@ -37,7 +37,7 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.factories.TestValuesTableFactory; @@ -386,7 +386,7 @@ public abstract class JdbcDynamicTableSinkITCase extends AbstractTestBase implem sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0)); sinkFunction.setInputType( TypeInformation.of(GenericRowData.class), JdbcTestBase.getExecutionConfig(false)); - sinkFunction.open(new Configuration()); + sinkFunction.open(new DefaultOpenContext()); sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1)); sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1)); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java index 43846b0..9e54c44 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java @@ -20,8 +20,6 @@ package org.apache.flink.connector.jdbc.internal; import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.jdbc.JdbcDataTestBase; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; @@ -31,6 +29,8 @@ import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions; import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.junit.jupiter.api.AfterEach; @@ -107,7 +107,7 @@ class JdbcFullTest extends JdbcDataTestBase { } private void runTest(boolean exploitParallelism) throws Exception { - ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); JdbcInputFormat.JdbcInputFormatBuilder inputBuilder = JdbcInputFormat.buildJdbcInputFormat() .setDrivername(getMetadata().getDriverClass()) @@ -127,7 +127,7 @@ class JdbcFullTest extends JdbcDataTestBase { new JdbcNumericBetweenParametersProvider(min, max) .ofBatchSize(fetchSize)); } - DataSet<Row> source = environment.createInput(inputBuilder.finish()); + DataStreamSource<Row> source = env.createInput(inputBuilder.finish()); // NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but // some databases don't null values correctly when no column type was specified @@ -153,8 +153,8 @@ class JdbcFullTest extends JdbcDataTestBase { Types.DOUBLE, Types.INTEGER })); - source.output(new TestOutputFormat(jdbcOutputFormat)); - environment.execute(); + source.forward().writeUsingOutputFormat(new TestOutputFormat(jdbcOutputFormat)); + env.execute(); try (Connection dbConn = DriverManager.getConnection(getMetadata().getJdbcUrl()); PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS); @@ -193,7 +193,7 @@ class JdbcFullTest extends JdbcDataTestBase { public void configure(Configuration configuration) {} @Override - public void open(int i, int i1) throws IOException { + public void open(InitializationContext initializationContext) throws IOException { JdbcOutputSerializer<Row> serializer = JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true)); this.jdbcOutputFormat.open(serializer); diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java index 3360401..1030c29 100644 --- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java +++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java @@ -15,7 +15,7 @@ class JdbcOutputSerializerTest { void testSerializer() { TypeInformation<Row> typeInformation = TypeInformation.of(Row.class); TypeSerializer<Row> typeSerializer = - typeInformation.createSerializer(new ExecutionConfig()); + typeInformation.createSerializer(new ExecutionConfig().getSerializerConfig()); JdbcOutputSerializer<Row> serializer = JdbcOutputSerializer.of(typeSerializer); Row original = Row.of(123); diff --git a/pom.xml b/pom.xml index f1d63f6..e083a2d 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,6 @@ under the License. </scm> <modules> - <module>flink-connector-jdbc</module> <module>flink-connector-jdbc-architecture</module> <module>flink-connector-jdbc-core</module> <module>flink-connector-jdbc-cratedb</module> @@ -57,7 +56,7 @@ under the License. </modules> <properties> - <flink.version>1.20.0</flink.version> + <flink.version>2.0.0</flink.version> <scala.binary.version>2.12</scala.binary.version> <scala-library.version>2.12.7</scala-library.version> <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
