This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit f9167c3da0af3c9890f3e0c9ff49faa1868e9f40
Author: lvyanquan <[email protected]>
AuthorDate: Tue Mar 25 17:37:17 2025 +0800

    [FLINK-36659] Bump version of flink to 2.0.0.
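    
    The headline change, visible throughout the diff below, is moving the
    connector onto the Flink 2.0 APIs: JdbcSink drops the
    StatefulSink/TwoPhaseCommittingSink interfaces in favour of the Sink plus
    SupportsWriterState plus SupportsCommitter composition, the tests replace
    the legacy RestartStrategies configuration with RestartStrategyUtils, and
    CI now builds against JDK 17 only. A minimal sketch of the JdbcSink
    interface change, excerpted from the diff below rather than new code:
    
        // Before (Flink 1.x Sink V2):
        public class JdbcSink<IN>
                implements StatefulSink<IN, JdbcWriterState>,
                        TwoPhaseCommittingSink<IN, JdbcCommitable> { ... }
    
        // After (Flink 2.0 Sink V2):
        public class JdbcSink<IN>
                implements Sink<IN>,
                        SupportsWriterState<IN, JdbcWriterState>,
                        SupportsCommitter<JdbcCommitable> { ... }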
---
 .github/workflows/backwards_compatibility.yml      |  4 +--
 .github/workflows/push_pr.yml                      |  4 +--
 .github/workflows/weekly.yml                       | 15 ++++----
 .../pom.xml                                        | 18 ++++++++--
 .../backward/compatibility/DataStreamSinkTest.java |  8 ++---
 .../compatibility/DataStreamSourceTest.java        | 16 ++++-----
 .../core/database/catalog/AbstractJdbcCatalog.java |  5 ++-
 .../jdbc/core/datastream/sink/JdbcSink.java        | 41 +++++++++++++---------
 .../core/datastream/sink/writer/JdbcWriter.java    | 22 ++++++------
 .../source/reader/JdbcSourceSplitReader.java       | 10 +++---
 .../jdbc/core/table/sink/JdbcDynamicTableSink.java |  2 +-
 .../jdbc/internal/GenericJdbcSinkFunction.java     | 14 ++++----
 .../apache/flink/connector/jdbc/JdbcITCase.java    |  8 ++---
 .../apache/flink/connector/jdbc/JdbcTestBase.java  | 13 +++++--
 .../core/datastream/sink/BaseJdbcSinkTest.java     |  6 ++--
 .../datastream/sink/writer/BaseJdbcWriterTest.java | 17 ++++++---
 .../core/datastream/source/JdbcSourceITCase.java   | 12 +++----
 .../source/JdbcSourceStreamRelatedITCase.java      | 20 ++++++-----
 .../table/sink/JdbcDynamicTableSinkITCase.java     |  6 ++--
 .../connector/jdbc/internal/JdbcFullTest.java      | 14 ++++----
 .../jdbc/internal/JdbcOutputSerializerTest.java    |  2 +-
 pom.xml                                            |  3 +-
 22 files changed, 151 insertions(+), 109 deletions(-)

diff --git a/.github/workflows/backwards_compatibility.yml b/.github/workflows/backwards_compatibility.yml
index 139b3d9..94d27b1 100644
--- a/.github/workflows/backwards_compatibility.yml
+++ b/.github/workflows/backwards_compatibility.yml
@@ -29,8 +29,8 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
-        jdk: [8, 11, 17]
+        flink: [2.0-SNAPSHOT, 2.1-SNAPSHOT]
+        jdk: [17]
 
     env:
       MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 88cb645..daee806 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -28,8 +28,8 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [1.20.0]
-        jdk: [ '8, 11, 17, 21' ]
+        flink: [2.0.0]
+        jdk: [ '17' ]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index 9a27cda..91864b1 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -30,13 +30,16 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.19-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
+          flink: 2.1-SNAPSHOT,
+          jdk: '17',
           branch: main
-        },
-        {
-          flink: 1.20-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
+        }, {
+          flink: 2.0-SNAPSHOT,
+          jdk: '17',
+          branch: main
+        }, {
+          flink: 2.0.0,
+          jdk: '17',
           branch: main
         }, {
           flink: 1.19.1,
diff --git a/flink-connector-jdbc-backward-compatibility/pom.xml b/flink-connector-jdbc-backward-compatibility/pom.xml
index 2b14ecf..7937aa6 100644
--- a/flink-connector-jdbc-backward-compatibility/pom.xml
+++ b/flink-connector-jdbc-backward-compatibility/pom.xml
@@ -17,6 +17,7 @@
     <packaging>jar</packaging>
 
     <properties>
+        <postgres.version>42.7.3</postgres.version>
         <surefire.module.config>
             --add-opens=java.base/java.util=ALL-UNNAMED
             --add-opens=java.base/java.lang=ALL-UNNAMED
@@ -58,17 +59,30 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc</artifactId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-postgres</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc</artifactId>
+            <artifactId>flink-connector-jdbc-postgres</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${postgres.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>postgresql</artifactId>
diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
index dcf4dd4..9d26b20 100644
--- a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
+++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.connector.jdbc.backward.compatibility;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.Test;
@@ -72,7 +72,7 @@ public class DataStreamSinkTest implements PostgresTestBase {
     @Test
     public void testAtLeastOnce() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
@@ -98,7 +98,7 @@ public class DataStreamSinkTest implements PostgresTestBase {
     @Test
     public void testExactlyOnce() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
index 14a144b..10bd87a 100644
--- a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
+++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
@@ -19,18 +19,18 @@
 package org.apache.flink.connector.jdbc.backward.compatibility;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.core.datastream.source.JdbcSource;
+import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.postgres.PostgresTestBase;
-import org.apache.flink.connector.jdbc.source.JdbcSource;
-import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
 import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -101,7 +101,7 @@ public class DataStreamSourceTest implements PostgresTestBase {
     @Test
     void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -122,7 +122,7 @@ public class DataStreamSourceTest implements PostgresTestBase {
     @Test
     void testReadWithoutParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -146,7 +146,7 @@ public class DataStreamSourceTest implements PostgresTestBase {
     @Test
     void testReadWithParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(2);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
@@ -167,7 +167,7 @@ public class DataStreamSourceTest implements PostgresTestBase {
     @Test
     void testReadWithParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(2);
         JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
                 JdbcSource.<JdbcTestFixture.TestEntry>builder()
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
index 7342459..0eedcf8 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java
@@ -305,7 +305,10 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog implements Jdb
                     pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
             Schema tableSchema = schemaBuilder.build();
 
-            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), getOptions(tablePath));
+            return CatalogTable.newBuilder()
+                    .schema(tableSchema)
+                    .options(getOptions(tablePath))
+                    .build();
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed getting table %s", 
tablePath.getFullName()), e);
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
index 8c0cf99..5753eb9 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/JdbcSink.java
@@ -20,8 +20,11 @@ package org.apache.flink.connector.jdbc.core.datastream.sink;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SupportsCommitter;
+import org.apache.flink.api.connector.sink2.SupportsWriterState;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -47,7 +50,9 @@ import java.util.Collections;
  */
 @PublicEvolving
 public class JdbcSink<IN>
-        implements StatefulSink<IN, JdbcWriterState>, TwoPhaseCommittingSink<IN, JdbcCommitable> {
+        implements Sink<IN>,
+                SupportsWriterState<IN, JdbcWriterState>,
+                SupportsCommitter<JdbcCommitable> {
 
     private final DeliveryGuarantee deliveryGuarantee;
     private final JdbcConnectionProvider connectionProvider;
@@ -74,14 +79,28 @@ public class JdbcSink<IN>
 
     @Override
     @Internal
-    public JdbcWriter<IN> createWriter(InitContext context) throws IOException {
+    public JdbcWriter<IN> createWriter(WriterInitContext context) throws IOException {
         return restoreWriter(context, Collections.emptyList());
     }
 
+    @Override
+    @Internal
+    public Committer<JdbcCommitable> createCommitter(CommitterInitContext committerInitContext)
+            throws IOException {
+        return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
+    }
+
+    @Override
+    @Internal
+    public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
+        return new JdbcCommitableSerializer();
+    }
+
     @Override
     @Internal
     public JdbcWriter<IN> restoreWriter(
-            InitContext context, Collection<JdbcWriterState> recoveredState) throws IOException {
+            WriterInitContext context, Collection<JdbcWriterState> recoveredState)
+            throws IOException {
         JdbcOutputSerializer<IN> outputSerializer =
                 JdbcOutputSerializer.of(
                         context.createInputSerializer(), context.isObjectReuseEnabled());
@@ -96,18 +115,6 @@ public class JdbcSink<IN>
                 context);
     }
 
-    @Override
-    @Internal
-    public Committer<JdbcCommitable> createCommitter() throws IOException {
-        return new JdbcCommitter(deliveryGuarantee, connectionProvider, exactlyOnceOptions);
-    }
-
-    @Override
-    @Internal
-    public SimpleVersionedSerializer<JdbcCommitable> getCommittableSerializer() {
-        return new JdbcCommitableSerializer();
-    }
-
     @Override
     @Internal
     public SimpleVersionedSerializer<JdbcWriterState> getWriterStateSerializer() {
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
index be7aa49..c412b92 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/JdbcWriter.java
@@ -18,9 +18,9 @@
 package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
+import org.apache.flink.api.connector.sink2.InitContext;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -55,8 +55,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class JdbcWriter<IN>
-        implements StatefulSink.StatefulSinkWriter<IN, JdbcWriterState>,
-                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, JdbcCommitable> {
+        implements StatefulSinkWriter<IN, JdbcWriterState>,
+                CommittingSinkWriter<IN, JdbcCommitable> {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcWriter.class);
 
@@ -75,7 +75,7 @@ public class JdbcWriter<IN>
             JdbcOutputSerializer<IN> outputSerializer,
             DeliveryGuarantee deliveryGuarantee,
             Collection<JdbcWriterState> recoveredState,
-            Sink.InitContext initContext)
+            InitContext initContext)
             throws IOException {
 
         this.deliveryGuarantee =
@@ -85,9 +85,7 @@ public class JdbcWriter<IN>
 
         pendingRecords = false;
         this.lastCheckpointId =
-                initContext
-                        .getRestoredCheckpointId()
-                        .orElse(Sink.InitContext.INITIAL_CHECKPOINT_ID - 1);
+                initContext.getRestoredCheckpointId().orElse(InitContext.INITIAL_CHECKPOINT_ID - 1);
 
         checkNotNull(connectionProvider, "connectionProvider must be defined");
 
@@ -106,9 +104,9 @@ public class JdbcWriter<IN>
 
             TransactionId transactionId =
                     TransactionId.create(
-                            initContext.getJobId().getBytes(),
-                            initContext.getSubtaskId(),
-                            initContext.getNumberOfParallelSubtasks());
+                            initContext.getJobInfo().getJobId().getBytes(),
+                            initContext.getTaskInfo().getIndexOfThisSubtask(),
+                            initContext.getTaskInfo().getNumberOfParallelSubtasks());
 
             this.jdbcTransaction =
                     new XaTransaction(
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
index 7e1af42..2ffca4b 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/reader/JdbcSourceSplitReader.java
@@ -105,15 +105,15 @@ public class JdbcSourceSplitReader<T>
         this.config = Preconditions.checkNotNull(config);
         this.typeInformation = Preconditions.checkNotNull(typeInformation);
         this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
-        this.resultSetType = config.getInteger(RESULTSET_TYPE);
-        this.resultSetConcurrency = config.getInteger(RESULTSET_CONCURRENCY);
-        this.resultSetFetchSize = config.getInteger(RESULTSET_FETCH_SIZE);
-        this.autoCommit = config.getBoolean(AUTO_COMMIT);
+        this.resultSetType = config.get(RESULTSET_TYPE);
+        this.resultSetConcurrency = config.get(RESULTSET_CONCURRENCY);
+        this.resultSetFetchSize = config.get(RESULTSET_FETCH_SIZE);
+        this.autoCommit = config.get(AUTO_COMMIT);
         this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
         this.splits = new ArrayDeque<>();
         this.hasNextRecordCurrentSplit = false;
         this.currentSplit = null;
-        int splitReaderFetchBatchSize = config.getInteger(READER_FETCH_BATCH_SIZE);
+        int splitReaderFetchBatchSize = config.get(READER_FETCH_BATCH_SIZE);
         Preconditions.checkArgument(
                 splitReaderFetchBatchSize > 0 && splitReaderFetchBatchSize < Integer.MAX_VALUE);
         this.splitReaderFetchBatchSize = splitReaderFetchBatchSize;
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
index 7b2c2f7..bf85091 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOp
 import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 
diff --git a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
index 43e08fe..c031419 100644
--- a/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
+++ b/flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/internal/GenericJdbcSinkFunction.java
@@ -20,13 +20,13 @@ package org.apache.flink.connector.jdbc.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -45,11 +45,10 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void open(OpenContext openContext) throws Exception {
+        super.open(openContext);
         // Recheck if execution config change
-        serializer.withObjectReuseEnabled(
-                getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
+        serializer.withObjectReuseEnabled(getRuntimeContext().isObjectReuseEnabled());
         outputFormat.open(serializer);
     }
 
@@ -76,6 +75,7 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
     public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
         this.serializer =
                 JdbcOutputSerializer.of(
-                        ((TypeInformation<T>) type).createSerializer(executionConfig));
+                        ((TypeInformation<T>) type)
+                                .createSerializer(executionConfig.getSerializerConfig()));
     }
 }
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
index 3b4c60e..ccc4b5f 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.connector.jdbc;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
@@ -25,7 +24,8 @@ import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.jupiter.api.Test;
@@ -70,7 +70,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase {
     @Test
     void testInsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         env.fromElements(TEST_DATA)
                 .addSink(
@@ -92,7 +92,7 @@ public class JdbcITCase extends JdbcTestBase implements JdbcITCaseBase {
         configuration.set(OBJECT_REUSE, true);
         StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         AtomicInteger counter = new AtomicInteger(0);
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
index e016ad1..5bb49ae 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/JdbcTestBase.java
@@ -19,6 +19,10 @@ package org.apache.flink.connector.jdbc;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.JobInfoImpl;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.TaskInfoImpl;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -49,13 +53,16 @@ public abstract class JdbcTestBase implements DerbyTestBase {
     public static RuntimeContext getRuntimeContext(Boolean reused) {
         ExecutionConfig config = getExecutionConfig(reused);
         RuntimeContext context = Mockito.mock(RuntimeContext.class);
-        doReturn(config).when(context).getExecutionConfig();
+        doReturn(config.isObjectReuseEnabled()).when(context).isObjectReuseEnabled();
         return context;
     }
 
     public static RuntimeContext getRuntimeContext(JobID jobId) {
         RuntimeContext context = getRuntimeContext(false);
-        doReturn(jobId).when(context).getJobId();
+        JobInfo jobInfo = new JobInfoImpl(jobId, "test_jobName");
+        TaskInfo taskInfo = new TaskInfoImpl("test_taskName", 4, 1, 4, 0);
+        doReturn(jobInfo).when(context).getJobInfo();
+        doReturn(taskInfo).when(context).getTaskInfo();
         return context;
     }
 
@@ -71,6 +78,6 @@ public abstract class JdbcTestBase implements DerbyTestBase {
 
     public static <T> TypeSerializer<T> getSerializer(
             TypeInformation<T> type, Boolean objectReused) {
-        return type.createSerializer(getExecutionConfig(objectReused));
+        return type.createSerializer(getExecutionConfig(objectReused).getSerializerConfig());
     }
 }
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
index 219b374..48fe94f 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/BaseJdbcSinkTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.connector.jdbc.core.datastream.sink;
 
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 
 import org.junit.jupiter.api.Test;
 
@@ -62,7 +62,7 @@ public abstract class BaseJdbcSinkTest implements DerbyTestBase {
     @Test
     public void testInsert() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
@@ -82,7 +82,7 @@ public abstract class BaseJdbcSinkTest implements DerbyTestBase {
     @Test
    public void testInsertWithObjectReuse() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
 
         assertResult(new ArrayList<>());
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
index cfcc203..301a64d 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/sink/writer/BaseJdbcWriterTest.java
@@ -1,8 +1,10 @@
 package org.apache.flink.connector.jdbc.core.datastream.sink.writer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -14,6 +16,7 @@ import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputSerializer;
 import org.apache.flink.connector.jdbc.testutils.TableManaged;
 import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+import org.apache.flink.connector.testutils.source.TestingTaskInfo;
 import org.apache.flink.util.StringUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -38,7 +41,7 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase {
 
     private static final String JOBID = "6b64d8a9a951e2e8767ae952ad951706";
     private static final String GLOBAL_TID =
-            String.format("%s000000000000000000000000000000000000", JOBID);
+            String.format("%s000000010000000000000000000000000000", JOBID);
     protected static final BooksTable TEST_TABLE = new BooksTable("WriterTable");
 
     protected static final List<BooksTable.BookEntry> BOOKS =
@@ -68,8 +71,12 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase {
     @BeforeEach
     void init() throws Exception {
         // We have to mock this because we have changes between 1.18 and 1.19
-        Sink.InitContext sinkContext = Mockito.mock(Sink.InitContext.class);
-        doReturn(JobID.fromHexString(JOBID)).when(sinkContext).getJobId();
+        WriterInitContext sinkContext = Mockito.mock(WriterInitContext.class);
+        JobInfo jobInfo = Mockito.mock(JobInfo.class);
+        doReturn(jobInfo).when(sinkContext).getJobInfo();
+        doReturn(JobID.fromHexString(JOBID)).when(jobInfo).getJobId();
+        TaskInfo taskInfo = new TestingTaskInfo("test_task", 4, 1, 4, 0, "test_subTask", "id");
+        doReturn(taskInfo).when(sinkContext).getTaskInfo();
 
         JdbcOutputSerializer<BooksTable.BookEntry> outputSerializer =
                 JdbcOutputSerializer.of(
@@ -97,7 +104,7 @@ abstract class BaseJdbcWriterTest implements DerbyTestBase {
     }
 
     protected String withBranch(long checkpointId) {
-        return String.format("00000000000000000000000%s00", checkpointId);
+        return String.format("00000004000000000000000%s00", checkpointId);
     }
 
     protected void checkCommitable(JdbcCommitable actual, String branchExpected) {
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
index 2021094..a22e685 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceITCase.java
@@ -19,13 +19,13 @@
 package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -56,7 +56,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
     @Test
     void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<TestEntry> jdbcSource =
                 JdbcSource.<TestEntry>builder()
@@ -75,7 +75,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
     @Test
     void testReadWithoutParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(1);
         JdbcSource<TestEntry> jdbcSource =
                 JdbcSource.<TestEntry>builder()
@@ -97,7 +97,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
     @Test
     void testReadWithParallelismWithoutParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(2);
         JdbcSource<TestEntry> jdbcSource =
                 JdbcSource.<TestEntry>builder()
@@ -116,7 +116,7 @@ class JdbcSourceITCase extends JdbcDataTestBase implements JdbcITCaseBase {
     @Test
     void testReadWithParallelismWithParamsProvider() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        RestartStrategyUtils.configureNoRestartStrategy(env);
         env.setParallelism(2);
         JdbcSource<TestEntry> jdbcSource =
                 JdbcSource.<TestEntry>builder()
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
index fffc749..23814ee 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/JdbcSourceStreamRelatedITCase.java
@@ -20,12 +20,13 @@ package org.apache.flink.connector.jdbc.core.datastream.source;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.jdbc.core.datastream.source.config.ContinuousUnBoundingSettings;
 import org.apache.flink.connector.jdbc.core.datastream.source.reader.extractor.ResultExtractor;
@@ -34,7 +35,7 @@ import org.apache.flink.connector.jdbc.split.JdbcSlideTimingParameterProvider;
 import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.test.junit5.InjectClusterClient;
 import org.apache.flink.util.Collector;
 
@@ -288,8 +289,10 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
 
     @Nonnull
     private static StreamExecutionEnvironment getEnvWithRestartStrategyParallelism() {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration());
+        Configuration configuration = new Configuration();
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
 
         env.setParallelism(TESTING_PARALLELISM);
         env.enableCheckpointing(MINIMAL_CHECKPOINT_TIME);
@@ -337,8 +340,8 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
         private boolean errorOccurred = false;
 
         @Override
-        public void open(Configuration parameters) throws Exception {
-            super.open(parameters);
+        public void open(OpenContext openContext) throws Exception {
+            super.open(openContext);
             listState =
                     getRuntimeContext()
                             .getListState(
@@ -352,11 +355,12 @@ class JdbcSourceStreamRelatedITCase implements DerbyTestBase, JdbcITCaseBase {
                 KeyedProcessFunction<Long, TestEntry, TestEntry>.Context ctx,
                 Collector<TestEntry> out)
                 throws Exception {
-            if (value.id == testEntries.size() / 2 && getRuntimeContext().getAttemptNumber() < 1) {
+            if (value.id == testEntries.size() / 2
+                    && getRuntimeContext().getTaskInfo().getAttemptNumber() < 1) {
                 throw new RuntimeException();
             }
             listState.add(value);
-            if (getRuntimeContext().getAttemptNumber() != 0) {
+            if (getRuntimeContext().getTaskInfo().getAttemptNumber() != 0) {
                 errorOccurred = true;
             }
             if (errorOccurred) {
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
index f95fd3c..963d01d 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/table/sink/JdbcDynamicTableSinkITCase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.jdbc.core.table.sink;
 
+import org.apache.flink.api.common.functions.DefaultOpenContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
 import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
@@ -37,7 +37,7 @@ import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
@@ -386,7 +386,7 @@ public abstract class JdbcDynamicTableSinkITCase extends AbstractTestBase implem
         sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
         sinkFunction.setInputType(
                 TypeInformation.of(GenericRowData.class), JdbcTestBase.getExecutionConfig(false));
-        sinkFunction.open(new Configuration());
+        sinkFunction.open(new DefaultOpenContext());
         sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1));
         sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1));
 
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
index 43846b0..9e54c44 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcFullTest.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.jdbc.internal;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcDataTestBase;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
@@ -31,6 +29,8 @@ import org.apache.flink.connector.jdbc.datasource.connections.SimpleJdbcConnecti
 import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.flink.connector.jdbc.internal.options.InternalJdbcConnectionOptions;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.AfterEach;
@@ -107,7 +107,7 @@ class JdbcFullTest extends JdbcDataTestBase {
     }
 
     private void runTest(boolean exploitParallelism) throws Exception {
-        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
                 JdbcInputFormat.buildJdbcInputFormat()
                         .setDrivername(getMetadata().getDriverClass())
@@ -127,7 +127,7 @@ class JdbcFullTest extends JdbcDataTestBase {
                                     new JdbcNumericBetweenParametersProvider(min, max)
                                             .ofBatchSize(fetchSize));
         }
-        DataSet<Row> source = environment.createInput(inputBuilder.finish());
+        DataStreamSource<Row> source = env.createInput(inputBuilder.finish());
 
         // NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but
         // some databases don't null values correctly when no column type was specified
@@ -153,8 +153,8 @@ class JdbcFullTest extends JdbcDataTestBase {
                                             Types.DOUBLE,
                                             Types.INTEGER
                                         }));
-        source.output(new TestOutputFormat(jdbcOutputFormat));
-        environment.execute();
+        source.forward().writeUsingOutputFormat(new TestOutputFormat(jdbcOutputFormat));
+        env.execute();
 
         try (Connection dbConn = DriverManager.getConnection(getMetadata().getJdbcUrl());
                 PreparedStatement statement = dbConn.prepareStatement(SELECT_ALL_NEWBOOKS);
@@ -193,7 +193,7 @@ class JdbcFullTest extends JdbcDataTestBase {
         public void configure(Configuration configuration) {}
 
         @Override
-        public void open(int i, int i1) throws IOException {
+        public void open(InitializationContext initializationContext) throws IOException {
             JdbcOutputSerializer<Row> serializer =
                     JdbcOutputSerializer.of(getSerializer(TypeInformation.of(Row.class), true));
             this.jdbcOutputFormat.open(serializer);
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java
index 3360401..1030c29 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/internal/JdbcOutputSerializerTest.java
@@ -15,7 +15,7 @@ class JdbcOutputSerializerTest {
     void testSerializer() {
         TypeInformation<Row> typeInformation = TypeInformation.of(Row.class);
         TypeSerializer<Row> typeSerializer =
-                typeInformation.createSerializer(new ExecutionConfig());
+                typeInformation.createSerializer(new ExecutionConfig().getSerializerConfig());
         JdbcOutputSerializer<Row> serializer = JdbcOutputSerializer.of(typeSerializer);
 
         Row original = Row.of(123);
diff --git a/pom.xml b/pom.xml
index f1d63f6..e083a2d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,6 @@ under the License.
     </scm>
 
     <modules>
-        <module>flink-connector-jdbc</module>
         <module>flink-connector-jdbc-architecture</module>
         <module>flink-connector-jdbc-core</module>
         <module>flink-connector-jdbc-cratedb</module>
@@ -57,7 +56,7 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.20.0</flink.version>
+        <flink.version>2.0.0</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala-library.version>2.12.7</scala-library.version>
         <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
