This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d2fc39774 [flink] Support Flink 2.0 (#5408)
1d2fc39774 is described below

commit 1d2fc39774f6c77bb3697dcdc4202ca47c1bdd4b
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Thu Apr 10 11:21:00 2025 +0800

    [flink] Support Flink 2.0 (#5408)
---
 ...ink-jdk11.yml => e2e-tests-flink-1.x-jdk11.yml} |  8 +-
 ...e2e-tests-flink.yml => e2e-tests-flink-1.x.yml} |  8 +-
 ...sts-flink.yml => e2e-tests-flink-2.x-jdk11.yml} | 18 ++---
 ...link-jdk11.yml => utitcase-flink-1.x-jdk11.yml} |  6 +-
 .../{utitcase-flink.yml => utitcase-flink-1.x.yml} |  6 +-
 ...link-jdk11.yml => utitcase-flink-2.x-jdk11.yml} | 22 +++---
 paimon-e2e-tests/pom.xml                           |  7 +-
 .../paimon/tests/FlinkActionsWithKafkaE2eTest.java |  8 ++
 .../paimon/tests/FlinkProceduresE2eTest.java       |  4 +-
 .../org/apache/paimon/tests/LogStoreE2eTest.java   |  8 ++
 .../apache/paimon/tests/cdc/MySql57E2eTest.java    |  3 +
 .../apache/paimon/tests/cdc/MySql80E2eTest.java    |  3 +
 .../paimon/tests/cdc/MySqlCdcE2eTestBase.java      |  6 ++
 .../tests/cdc/MySqlComputedColumnE2ETest.java      |  2 +
 .../tests/cdc/MySqlTinyIntConvertE2ETest.java      |  2 +
 paimon-flink/paimon-flink-2.0/pom.xml              | 86 ++++++++++++++++++++++
 paimon-flink/paimon-flink-common/pom.xml           | 17 ++++-
 .../apache/paimon/flink/FormatCatalogTable.java    |  2 +
 .../apache/paimon/flink/action/ActionFactory.java  |  4 +-
 .../paimon/flink/factories/FlinkFactoryUtil.java   |  2 -
 .../flink/procedure/QueryServiceProcedure.java     |  2 +-
 .../paimon/flink/service/QueryAddressRegister.java |  1 +
 .../apache/paimon/flink/sink/LogSinkFunction.java  |  2 +-
 .../flink/sink/RowDataStoreWriteOperator.java      |  2 +-
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 65 ++++++++++------
 .../org/apache/paimon/flink/FlinkTestBase.java     | 11 +--
 .../paimon/flink/PredicateConverterTest.java       |  2 +-
 .../apache/paimon/flink/SchemaChangeITCase.java    |  2 +-
 ...tNewFilesCompactionCoordinatorOperatorTest.java |  3 +-
 .../paimon/flink/sink/LocalMergeOperatorTest.java  |  4 +
 .../sink/partition/PartitionMarkDoneTest.java      | 21 ++++++
 .../flink/source/operator/OperatorSourceTest.java  |  4 +
 .../source/operator/TestingSourceOperator.java     |  2 +-
 paimon-flink/paimon-flink1-common/pom.xml          | 69 +++++++++++++++++
 .../api/functions/sink/legacy/SinkFunction.java    | 16 ++--
 .../flink/action/MultipleParameterToolAdapter.java |  4 +-
 .../api/common/state/v2/ListStateDescriptor.java   | 16 +---
 .../api/common/state/v2/MapStateDescriptor.java    | 16 +---
 .../apache/flink/runtime/event/WatermarkEvent.java | 16 +---
 .../operator/AbstractTestingSourceOperator.java    | 58 +++++++++++++++
 paimon-flink/paimon-flink2-common/pom.xml          | 76 +++++++++++++++++++
 .../flink/action/MultipleParameterToolAdapter.java |  6 +-
 .../operator/AbstractTestingSourceOperator.java    | 63 ++++++++++++++++
 paimon-flink/pom.xml                               | 18 +----
 paimon-hive/paimon-hive-connector-common/pom.xml   |  2 +-
 pom.xml                                            | 47 ++++++++++++
 46 files changed, 596 insertions(+), 154 deletions(-)

diff --git a/.github/workflows/e2e-tests-flink-jdk11.yml b/.github/workflows/e2e-tests-flink-1.x-jdk11.yml
similarity index 84%
rename from .github/workflows/e2e-tests-flink-jdk11.yml
rename to .github/workflows/e2e-tests-flink-1.x-jdk11.yml
index 6674ec4e4f..8c0c0ce420 100644
--- a/.github/workflows/e2e-tests-flink-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-1.x-jdk11.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-name: End to End Tests Flink on JDK 11
+name: End to End Tests Flink 1.x on JDK 11
 
 on:
   issue_comment:
@@ -52,7 +52,7 @@ jobs:
           distribution: 'temurin'
 
       - name: Build Flink
-        run:  mvn -T 1C -B clean install -DskipTests -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }},java11
+        run:  mvn -T 1C -B clean install -DskipTests -Pflink1 -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }},java11
 
       - name: Test Flink
         run: |
@@ -62,9 +62,9 @@ jobs:
           echo "JVM timezone is set to $jvm_timezone"
           profile="flink-${{ matrix.flink_version }}"
           if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1] }}" ]; then
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pjava11
+            mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pjava11
           else
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile},java11
+            mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile},java11
           fi
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-flink.yml 
b/.github/workflows/e2e-tests-flink-1.x.yml
similarity index 86%
copy from .github/workflows/e2e-tests-flink.yml
copy to .github/workflows/e2e-tests-flink-1.x.yml
index 4779efb6e1..1431ebd5cc 100644
--- a/.github/workflows/e2e-tests-flink.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink
+name: End to End Tests Flink 1.x
 
 on:
   push:
@@ -52,7 +52,7 @@ jobs:
           distribution: 'temurin'
 
       - name: Build Flink
-        run:  mvn -T 2C -B clean install -DskipTests -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }}
+        run:  mvn -T 2C -B clean install -DskipTests -Pflink1 -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }}
 
       - name: Test Flink
         run: |
@@ -62,9 +62,9 @@ jobs:
           echo "JVM timezone is set to $jvm_timezone"
           profile="flink-${{ matrix.flink_version }}"
           if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1] }}" ]; then
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+            mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
           else
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile}
+            mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile}
           fi
         env:
           MAVEN_OPTS: -Xmx4096m
diff --git a/.github/workflows/e2e-tests-flink.yml 
b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
similarity index 77%
rename from .github/workflows/e2e-tests-flink.yml
rename to .github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 4779efb6e1..c1667c4826 100644
--- a/.github/workflows/e2e-tests-flink.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: End to End Tests Flink
+name: End to End Tests Flink 2.x on JDK 11
 
 on:
   push:
@@ -26,11 +26,7 @@ on:
       - '**/*.md'
 
 env:
-  JDK_VERSION: 8
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
-  cancel-in-progress: true
+  JDK_VERSION: 11
 
 jobs:
   build_test:
@@ -40,7 +36,7 @@ jobs:
       fail-fast: true
       matrix:
         # Last element should be the current default flink version
-        flink_version: [ '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ]
+        flink_version: [ '2.0' ]
     steps:
       - name: Checkout code
         uses: actions/checkout@v4
@@ -52,7 +48,7 @@ jobs:
           distribution: 'temurin'
 
       - name: Build Flink
-        run:  mvn -T 2C -B clean install -DskipTests -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }}
+        run:  mvn -T 1C -B clean install -DskipTests -Pflink2 -pl paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }},java11
 
       - name: Test Flink
         run: |
@@ -62,9 +58,9 @@ jobs:
           echo "JVM timezone is set to $jvm_timezone"
           profile="flink-${{ matrix.flink_version }}"
           if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1] }}" ]; then
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+            mvn -T 1C -B test -Pflink2 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pjava11
           else
-            mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile}
+            mvn -T 1C -B test -Pflink2 -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -P${profile},java11
           fi
         env:
-          MAVEN_OPTS: -Xmx4096m
+          MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink-jdk11.yml 
b/.github/workflows/utitcase-flink-1.x-jdk11.yml
similarity index 89%
copy from .github/workflows/utitcase-flink-jdk11.yml
copy to .github/workflows/utitcase-flink-1.x-jdk11.yml
index 794e386fba..66aafb2ade 100644
--- a/.github/workflows/utitcase-flink-jdk11.yml
+++ b/.github/workflows/utitcase-flink-1.x-jdk11.yml
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-name: UTCase and ITCase Flink on JDK 11
+name: UTCase and ITCase Flink 1.x on JDK 11
 
 on:
   issue_comment:
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'temurin'
       - name: Build Flink
-        run:  mvn -T 1C -B clean install -DskipTests
+        run:  mvn -T 1C -B clean install -DskipTests -Pflink1
       - name: Test Flink
         run: |
           # run tests with random timezone to find out timezone related bugs
@@ -57,6 +57,6 @@ jobs:
           test_modules+="org.apache.paimon:paimon-flink-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B clean install -Pflink1 -pl "${test_modules}" -Duser.timezone=$jvm_timezone
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink.yml 
b/.github/workflows/utitcase-flink-1.x.yml
similarity index 91%
rename from .github/workflows/utitcase-flink.yml
rename to .github/workflows/utitcase-flink-1.x.yml
index b094158595..fe249a45d3 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink-1.x.yml
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-name: UTCase and ITCase Flink
+name: UTCase and ITCase Flink 1.x
 
 on:
   push:
@@ -54,7 +54,7 @@ jobs:
         run: |
           COMPILE_MODULE="org.apache.paimon:paimon-flink-${{ matrix.flink_version }}"
           echo "Start compiling modules: $COMPILE_MODULE"
-          mvn -T 2C -B clean install -DskipTests -pl "${COMPILE_MODULE}" -am
+          mvn -T 2C -B clean install -DskipTests -Pflink1 -pl "${COMPILE_MODULE}" -am
 
       - name: Test Flink
         run: |
@@ -63,7 +63,7 @@ jobs:
           echo "JVM timezone is set to $jvm_timezone"
           TEST_MODULE="org.apache.paimon:paimon-flink-${{ matrix.flink_version 
}}" 
           echo "Start testing module: $TEST_MODULE"
-          mvn -T 2C -B test verify -pl "${TEST_MODULE}" 
-Duser.timezone=$jvm_timezone
+          mvn -T 2C -B test verify -Pflink1 -pl "${TEST_MODULE}" 
-Duser.timezone=$jvm_timezone
           echo "All modules tested"
         env:
           MAVEN_OPTS: -Xmx4096m -XX:+UseG1GC -XX:CICompilerCount=2
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink-jdk11.yml 
b/.github/workflows/utitcase-flink-2.x-jdk11.yml
similarity index 76%
rename from .github/workflows/utitcase-flink-jdk11.yml
rename to .github/workflows/utitcase-flink-2.x-jdk11.yml
index 794e386fba..44a0b42ad9 100644
--- a/.github/workflows/utitcase-flink-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -16,24 +16,20 @@
 # limitations under the License.
 
################################################################################
 
-name: UTCase and ITCase Flink on JDK 11
+name: UTCase and ITCase Flink 2.x on JDK 11
 
 on:
-  issue_comment:
-    types: [created, edited, deleted]
-
-  # daily run
-  schedule:
-    - cron: "0 0 * * *"
+  push:
+  pull_request:
+    paths-ignore:
+      - 'docs/**'
+      - '**/*.md'
 
 env:
   JDK_VERSION: 11
 
 jobs:
   build:
-    if: |
-      github.event_name == 'schedule' ||
-      (contains(github.event.comment.html_url, '/pull/') && contains(github.event.comment.body, '/jdk11'))
     runs-on: ubuntu-latest
 
     steps:
@@ -45,7 +41,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'temurin'
       - name: Build Flink
-        run:  mvn -T 1C -B clean install -DskipTests
+        run:  mvn -T 1C -B clean install -DskipTests -Pflink2
       - name: Test Flink
         run: |
           # run tests with random timezone to find out timezone related bugs
@@ -53,10 +49,10 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           test_modules=""
-          for suffix in 1.15 1.16 1.17 1.18 1.19 1.20 common; do
+          for suffix in 2.0 common; do
           test_modules+="org.apache.paimon:paimon-flink-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 1C -B clean install -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+          mvn -T 1C -B clean install -Pflink2 -pl "${test_modules}" -Duser.timezone=$jvm_timezone
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index a965e42afb..d7eba5eaa1 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -239,7 +239,7 @@ under the License.
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>${flink.sql.connector.hive}</artifactId>
-                            <version>${test.flink.version}</version>
+                            <version>${test.flink.connector.hive.version}</version>
                             <destFileName>flink-sql-connector-hive.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
@@ -301,6 +301,7 @@ under the License.
                 <test.flink.version>1.19.1</test.flink.version>
                 
<test.flink.connector.kafka.version>3.2.0-${test.flink.main.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+                
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
             </properties>
         </profile>
 
@@ -311,6 +312,7 @@ under the License.
                 <test.flink.version>1.18.1</test.flink.version>
                 
<test.flink.connector.kafka.version>3.0.1-${test.flink.main.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+                
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
             </properties>
         </profile>
 
@@ -321,6 +323,7 @@ under the License.
                 <test.flink.version>1.17.2</test.flink.version>
                 
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+                
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
             </properties>
         </profile>
 
@@ -331,6 +334,7 @@ under the License.
                 <test.flink.version>1.16.3</test.flink.version>
                 
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+                
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
             </properties>
         </profile>
 
@@ -341,6 +345,7 @@ under the License.
                 <test.flink.version>1.15.3</test.flink.version>
                 
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
                 
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
+                
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
             </properties>
         </profile>
 
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
index bab41112c7..b4eb099fdc 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.tests;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
@@ -26,10 +27,17 @@ import org.testcontainers.containers.Container;
 import java.util.UUID;
 
 /** Tests for {@code FlinkActions}. */
+@EnabledIf("runTest")
 public class FlinkActionsWithKafkaE2eTest extends FlinkActionsE2eTestBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkActionsWithKafkaE2eTest.class);
 
+    private static boolean runTest() {
+        // TODO: modify the following condition after paimon-flink-cdc supports flink 2.0
+        String flinkVersion = System.getProperty("test.flink.main.version");
+        return flinkVersion.compareTo("2.0") < 0;
+    }
+
     public FlinkActionsWithKafkaE2eTest() {
         super(true, false);
     }
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
index 23742dfe40..a50b149f2b 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
@@ -28,7 +28,9 @@ import java.util.UUID;
 public class FlinkProceduresE2eTest extends E2eTestBase {
 
     private static boolean runTest() {
-        return System.getProperty("test.flink.main.version").compareTo("1.18") >= 0;
+        // TODO: modify the following condition after paimon-flink-cdc supports flink 2.0
+        String flinkVersion = System.getProperty("test.flink.main.version");
+        return flinkVersion.compareTo("1.18") >= 0 && flinkVersion.compareTo("2.0") < 0;
     }
 
     public FlinkProceduresE2eTest() {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
index 294f4dd433..c40fb9e6f8 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
@@ -20,16 +20,24 @@ package org.apache.paimon.tests;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 
 import java.util.UUID;
 
 /** Tests for reading and writing log store in stream jobs. */
+@EnabledIf("runTest")
 public class LogStoreE2eTest extends E2eTestBase {
 
     private String topicName;
 
     private int bucketNum;
 
+    private static boolean runTest() {
+        // TODO: modify the following condition after paimon-flink-cdc supports flink 2.0
+        String flinkVersion = System.getProperty("test.flink.main.version");
+        return flinkVersion.compareTo("2.0") < 0;
+    }
+
     public LogStoreE2eTest() {
         super(true, false);
     }
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
index 732f1eee24..e083f6d85f 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
@@ -20,10 +20,13 @@ package org.apache.paimon.tests.cdc;
 
 import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
 
+import org.junit.jupiter.api.condition.EnabledIf;
+
 /**
  * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
  * 5.7.
  */
+@EnabledIf("runTest")
 public class MySql57E2eTest extends MySqlCdcE2eTestBase {
 
     public MySql57E2eTest() {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
index c0ecd90d76..97624e4bb7 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
@@ -20,10 +20,13 @@ package org.apache.paimon.tests.cdc;
 
 import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
 
+import org.junit.jupiter.api.condition.EnabledIf;
+
 /**
  * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
  * 8.0.
  */
+@EnabledIf("runTest")
 public class MySql80E2eTest extends MySqlCdcE2eTestBase {
 
     public MySql80E2eTest() {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index 8627befeeb..b77c81184e 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -51,6 +51,12 @@ import java.util.stream.Stream;
  */
 public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
 
+    protected static boolean runTest() {
+        // TODO: modify the following condition after paimon-flink-cdc supports flink 2.0
+        String flinkVersion = System.getProperty("test.flink.main.version");
+        return flinkVersion.compareTo("2.0") < 0;
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlCdcE2eTestBase.class);
 
     private static final String USER = "paimonuser";
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 02246b8147..77549c0dbd 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -24,11 +24,13 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 
 import java.sql.Connection;
 import java.sql.Statement;
 
 /** E2e test for MySql CDC with computed column. */
+@EnabledIf("runTest")
 public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {
 
     protected MySqlComputedColumnE2ETest() {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index e638ac82c7..8a9b6d4686 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -24,11 +24,13 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
 
 import java.sql.Connection;
 import java.sql.Statement;
 
 /** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
+@EnabledIf("runTest")
 public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
 
     protected MySqlTinyIntConvertE2ETest() {
diff --git a/paimon-flink/paimon-flink-2.0/pom.xml 
b/paimon-flink/paimon-flink-2.0/pom.xml
new file mode 100644
index 0000000000..b49745b244
--- /dev/null
+++ b/paimon-flink/paimon-flink-2.0/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-flink</artifactId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink-2.0</artifactId>
+    <name>Paimon : Flink : 2.0</name>
+
+    <properties>
+        <flink.version>2.0.0</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    
<include>org.apache.paimon:paimon-flink-common</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index 50315cdb72..0f258a1c95 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
     <name>Paimon : Flink : Common</name>
 
     <properties>
-        <flink.version>1.20.1</flink.version>
+        <flink.version>${paimon-flink-common.flink.version}</flink.version>
     </properties>
 
     <dependencies>
@@ -81,6 +81,20 @@ under the License.
           <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>${paimon-flinkx-common}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>${paimon-flinkx-common}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <!-- lookup service -->
 
         <dependency>
@@ -218,6 +232,7 @@ under the License.
                                     
<include>org.apache.paimon:paimon-service-client</include>
                                     
<include>org.apache.paimon:paimon-service-runtime</include>
                                     
<include>org.apache.paimon:paimon-shade-netty-4</include>
+                                    
<include>org.apache.paimon:${paimon-flinkx-common}</include>
                                 </includes>
                             </artifactSet>
                         </configuration>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index a30c0a39ab..cb69cce258 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -134,6 +134,7 @@ public class FormatCatalogTable implements CatalogTable {
                 null,
                 context.getObjectIdentifier(),
                 context.getCatalogTable(),
+                new HashMap<>(),
                 context.getConfiguration(),
                 context.getClassLoader(),
                 context.isTemporary());
@@ -144,6 +145,7 @@ public class FormatCatalogTable implements CatalogTable {
                 null,
                 context.getObjectIdentifier(),
                 context.getCatalogTable(),
+                new HashMap<>(),
                 context.getConfiguration(),
                 context.getClassLoader(),
                 context.isTemporary());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 71298d4920..a2cfe7e41d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -23,7 +23,6 @@ import org.apache.paimon.factories.FactoryException;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.options.CatalogOptions;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,8 +78,7 @@ public interface ActionFactory extends Factory {
 
         LOG.info("{} job args: {}", actionFactory.identifier(), String.join(" ", actionArgs));
 
-        MultipleParameterToolAdapter params =
-                new 
MultipleParameterToolAdapter(MultipleParameterTool.fromArgs(actionArgs));
+        MultipleParameterToolAdapter params = new 
MultipleParameterToolAdapter(actionArgs);
         if (params.has(HELP)) {
             actionFactory.printHelp();
             return Optional.empty();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
index 87100ff0e3..fa56becbdc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
@@ -54,7 +54,6 @@ import java.util.stream.StreamSupport;
 
 import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
 import static 
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
-import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
 
 /** Utility for working with {@link Factory}s. */
 public final class FlinkFactoryUtil {
@@ -135,7 +134,6 @@ public final class FlinkFactoryUtil {
                             factoryClass.getName(),
                             foundFactories.stream()
                                     .map(Factory::factoryIdentifier)
-                                    .filter(identifier -> 
!DEFAULT_IDENTIFIER.equals(identifier))
                                     .distinct()
                                     .sorted()
                                     .collect(Collectors.joining("\n"))));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
index 1a1b34fe2b..53c6645c6e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
@@ -49,7 +49,7 @@ public class QueryServiceProcedure extends ProcedureBase {
                 @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
                 @ArgumentHint(name = "parallelism", type = 
@DataTypeHint("INT"))
             })
-    public String[] call(ProcedureContext procedureContext, String tableId, 
int parallelism)
+    public String[] call(ProcedureContext procedureContext, String tableId, 
Integer parallelism)
             throws Exception {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index 00d527506c..dc37ba6996 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -23,6 +23,7 @@ import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
+import org.apache.flink.api.connector.sink2.InitContext;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.connector.sink2.WriterInitContext;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
index 3586d2a030..95efeba276 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.table.sink.SinkRecord;
 
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 
 /** Log {@link SinkFunction} with {@link WriteCallback}. */
 public interface LogSinkFunction extends SinkFunction<SinkRecord> {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index bd849f9d3e..5a142d51f2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 87cc9b7f04..0892def58c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -171,11 +171,12 @@ public class FlinkCatalogTest {
         // TODO support change schema, modify it to createAnotherSchema
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        Collections.emptyList(),
-                        options);
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .build();
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
@@ -183,33 +184,36 @@ public class FlinkCatalogTest {
         // TODO support change schema, modify it to createAnotherSchema
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        this.createPartitionKeys(),
-                        options);
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test comment")
+                        .partitionKeys(this.createPartitionKeys())
+                        .options(options)
+                        .build();
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
     private CatalogTable createTable(Map<String, String> options) {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        Collections.emptyList(),
-                        options);
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .build();
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
     private CatalogTable createPartitionedTable(Map<String, String> options) {
         ResolvedSchema resolvedSchema = this.createSchema();
         CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "test comment",
-                        this.createPartitionKeys(),
-                        options);
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test comment")
+                        .partitionKeys(this.createPartitionKeys())
+                        .options(options)
+                        .build();
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
@@ -694,7 +698,12 @@ public class FlinkCatalogTest {
 
         CatalogTable catalogTable =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(schema, "", Collections.emptyList(), 
new HashMap<>()),
+                        CatalogTable.newBuilder()
+                                .schema(schema)
+                                .comment("")
+                                .partitionKeys(Collections.emptyList())
+                                .options(new HashMap<>())
+                                .build(),
                         resolvedSchema);
 
         catalog.createDatabase(path1.getDatabaseName(), null, false);
@@ -740,7 +749,12 @@ public class FlinkCatalogTest {
         Map<String, String> options = new HashMap<>();
         CatalogTable catalogTable1 =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(schema, "", Collections.emptyList(), 
options),
+                        CatalogTable.newBuilder()
+                                .schema(schema)
+                                .comment("")
+                                .partitionKeys(Collections.emptyList())
+                                .options(options)
+                                .build(),
                         resolvedSchema);
         catalog.createTable(path1, catalogTable1, false);
         CatalogBaseTable storedTable1 = catalog.getTable(path1);
@@ -749,7 +763,12 @@ public class FlinkCatalogTest {
         options.put(LOG_SYSTEM.key(), TESTING_LOG_STORE);
         CatalogTable catalogTable2 =
                 new ResolvedCatalogTable(
-                        CatalogTable.of(schema, "", Collections.emptyList(), 
options),
+                        CatalogTable.newBuilder()
+                                .schema(schema)
+                                .comment("")
+                                .partitionKeys(Collections.emptyList())
+                                .options(options)
+                                .build(),
                         resolvedSchema);
         catalog.createTable(path3, catalogTable2, false);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
index b7da6cd958..7a71dbabfc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
@@ -132,11 +132,12 @@ public abstract class FlinkTestBase extends 
AbstractTestBase {
         ResolvedSchema resolvedSchema =
                 new ResolvedSchema(resolvedColumns, Collections.emptyList(), 
constraint);
         CatalogTable origin =
-                CatalogTable.of(
-                        
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
-                        "a comment",
-                        partitionKeys,
-                        options);
+                CatalogTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("a comment")
+                        .partitionKeys(partitionKeys)
+                        .options(options)
+                        .build();
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
index 6d90a455bd..f27108d866 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
@@ -796,6 +796,6 @@ public class PredicateConverterTest {
     }
 
     private static CallExpression call(FunctionDefinition function, 
ResolvedExpression... args) {
-        return new CallExpression(function, Arrays.asList(args), 
DataTypes.BOOLEAN());
+        return new CallExpression(false, null, function, Arrays.asList(args), 
DataTypes.BOOLEAN());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index a8e8332156..eafa3b6916 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1008,7 +1008,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase 
{
                                 + "  `ts` TIMESTAMP(3),\n"
                                 + "  `ee` VARCHAR(2147483647) METADATA,\n"
                                 + "  WATERMARK FOR `ts` AS `ts`\n"
-                                + ") ")
+                                + ")")
                 .doesNotContain("schema");
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
index 5273e0c7db..326ed9b6a2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -255,7 +256,7 @@ public class 
UnawareBucketNewFilesCompactionCoordinatorOperatorTest {
                         new CommittableTypeInfo().createSerializer(new 
ExecutionConfig()),
                         new TupleTypeInfo<>(
                                         BasicTypeInfo.LONG_TYPE_INFO, new 
CompactionTaskTypeInfo())
-                                .createSerializer(new ExecutionConfig()));
+                                .createSerializer(new SerializerConfigImpl()));
         OneInputStreamOperatorTestHarness harness =
                 new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
         
harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
index fc45eceb3f..08fe8cb238 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -217,6 +218,9 @@ class LocalMergeOperatorTest {
         @Override
         public void emitRecordAttributes(RecordAttributes recordAttributes) {}
 
+        // @Override is skipped for compatibility with Flink 1.x.
+        public void emitWatermark(WatermarkEvent watermarkEvent) {}
+
         @Override
         public void collect(StreamRecord<InternalRow> record) {
             consumer.accept(record.getValue());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index a220d566ad..9d71f94a36 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -148,6 +148,27 @@ class PartitionMarkDoneTest extends TableTestBase {
             throw new UnsupportedOperationException();
         }
 
+        // @Override is skipped for compatibility with Flink 1.x.
+        public <K, V> BroadcastState<K, V> getBroadcastState(
+                org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V> 
mapStateDescriptor)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        // @Override is skipped for compatibility with Flink 1.x.
+        public <S> org.apache.flink.api.common.state.v2.ListState<S> 
getListState(
+                org.apache.flink.api.common.state.v2.ListStateDescriptor<S> 
listStateDescriptor)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        // @Override is skipped for compatibility with Flink 1.x.
+        public <S> org.apache.flink.api.common.state.v2.ListState<S> 
getUnionListState(
+                org.apache.flink.api.common.state.v2.ListStateDescriptor<S> 
listStateDescriptor)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public Set<String> getRegisteredStateNames() {
             throw new UnsupportedOperationException();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 6c4c7860e5..a8c1f9c9e5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -299,6 +300,9 @@ public class OperatorSourceTest {
 
                     @Override
                     public void emitRecordAttributes(RecordAttributes 
recordAttributes) {}
+
+                    // @Override is skipped for compatibility with Flink 1.x.
+                    public void emitWatermark(WatermarkEvent watermarkEvent) {}
                 };
 
         AtomicBoolean isRunning = new AtomicBoolean(true);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
index 77b44d5b0e..a77d0fc870 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
@@ -61,7 +61,7 @@ import java.util.Collections;
  * href="https://github.com/apache/flink/pull/12306/files#diff-bb7687690ffa79fd86950aa23171431fcf707246ca4620d79361a6612ba7b828">Flink
  * PR that introduced this class</a>
  */
-public class TestingSourceOperator<T> extends SourceOperator<T, 
SimpleSourceSplit> {
+public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, 
SimpleSourceSplit> {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/paimon-flink/paimon-flink1-common/pom.xml 
b/paimon-flink/paimon-flink1-common/pom.xml
new file mode 100644
index 0000000000..6bd3e3c4e8
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-flink</artifactId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink1-common</artifactId>
+    <name>Paimon : Flink 1.x : Common</name>
+
+    <properties>
+        <flink.version>${paimon-flink-common.flink.version}</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
similarity index 70%
copy from 
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to 
paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
index 732f1eee24..cd8fff09de 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
@@ -16,17 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.tests.cdc;
-
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+package org.apache.flink.streaming.api.functions.sink.legacy;
 
 /**
- * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
+ * The {@link org.apache.flink.streaming.api.functions.sink.SinkFunction} 
migrated from Flink 1.20
+ * to resolve compatibility issues with Flink 2.x.
  */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
-    public MySql57E2eTest() {
-        super(MySqlVersion.V5_7);
-    }
-}
+public interface SinkFunction<T>
+        extends org.apache.flink.streaming.api.functions.sink.SinkFunction<T> 
{}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
similarity index 94%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
copy to 
paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index 7755cf7b40..aee2bad58b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -27,8 +27,8 @@ public class MultipleParameterToolAdapter {
 
     private final MultipleParameterTool params;
 
-    public MultipleParameterToolAdapter(MultipleParameterTool params) {
-        this.params = params;
+    public MultipleParameterToolAdapter(String[] args) {
+        this.params = MultipleParameterTool.fromArgs(args);
     }
 
     public boolean has(String key) {
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
similarity index 70%
copy from 
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to 
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
index 732f1eee24..569aef0bab 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++ 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
@@ -16,17 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.api.common.state.v2;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
-    public MySql57E2eTest() {
-        super(MySqlVersion.V5_7);
-    }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class ListStateDescriptor<S> {}
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
similarity index 70%
copy from 
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to 
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
index 732f1eee24..fde66eb91b 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++ 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
@@ -16,17 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.api.common.state.v2;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
-    public MySql57E2eTest() {
-        super(MySqlVersion.V5_7);
-    }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class MapStateDescriptor<K, V> {}
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
similarity index 70%
copy from 
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to 
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
index 732f1eee24..da773e9890 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++ 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
@@ -16,17 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.runtime.event;
 
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link 
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
-    public MySql57E2eTest() {
-        super(MySqlVersion.V5_7);
-    }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class WatermarkEvent {}
diff --git 
a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
new file mode 100644
index 0000000000..d293d30057
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.FunctionWithException;
+
+/** Helper class to resolve the compatibility of {@link SourceOperator}'s constructor. */
+public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
+        extends SourceOperator<T, S> {
+
+    public AbstractTestingSourceOperator(
+            FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
+            OperatorEventGateway operatorEventGateway,
+            SimpleVersionedSerializer<S> splitSerializer,
+            WatermarkStrategy<T> watermarkStrategy,
+            ProcessingTimeService timeService,
+            Configuration configuration,
+            String localHostname,
+            boolean emitProgressiveWatermarks,
+            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
+        super(
+                readerFactory,
+                operatorEventGateway,
+                splitSerializer,
+                watermarkStrategy,
+                timeService,
+                configuration,
+                localHostname,
+                emitProgressiveWatermarks,
+                canEmitBatchOfRecords);
+    }
+}
diff --git a/paimon-flink/paimon-flink2-common/pom.xml 
b/paimon-flink/paimon-flink2-common/pom.xml
new file mode 100644
index 0000000000..7dc9043c44
--- /dev/null
+++ b/paimon-flink/paimon-flink2-common/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.paimon</groupId>
+        <artifactId>paimon-flink</artifactId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <packaging>jar</packaging>
+
+    <artifactId>paimon-flink2-common</artifactId>
+    <name>Paimon : Flink 2.x : Common</name>
+
+    <properties>
+        <flink.version>${paimon-flink-common.flink.version}</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
similarity index 92%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
rename to 
paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index 7755cf7b40..4fb80d63b5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++ 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.util.MultipleParameterTool;
 
 import java.util.Collection;
 
@@ -27,8 +27,8 @@ public class MultipleParameterToolAdapter {
 
     private final MultipleParameterTool params;
 
-    public MultipleParameterToolAdapter(MultipleParameterTool params) {
-        this.params = params;
+    public MultipleParameterToolAdapter(String[] args) {
+        this.params = MultipleParameterTool.fromArgs(args);
     }
 
     public boolean has(String key) {
diff --git 
a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
 
b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
new file mode 100644
index 0000000000..a2f402a874
--- /dev/null
+++ 
b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.HashMap;
+
+/** Helper class to resolve the compatibility of {@link SourceOperator}'s constructor. */
+public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
+        extends SourceOperator<T, S> {
+
+    public AbstractTestingSourceOperator(
+            FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
+            OperatorEventGateway operatorEventGateway,
+            SimpleVersionedSerializer<S> splitSerializer,
+            WatermarkStrategy<T> watermarkStrategy,
+            ProcessingTimeService timeService,
+            Configuration configuration,
+            String localHostname,
+            boolean emitProgressiveWatermarks,
+            StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
+
+        super(
+                null,
+                readerFactory,
+                operatorEventGateway,
+                splitSerializer,
+                watermarkStrategy,
+                timeService,
+                configuration,
+                localHostname,
+                emitProgressiveWatermarks,
+                canEmitBatchOfRecords,
+                new HashMap<>());
+    }
+}
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index f19818bca3..d7764630fd 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -34,15 +34,9 @@ under the License.
     <packaging>pom</packaging>
 
     <modules>
+        <module>${paimon-flinkx-common}</module>
         <module>paimon-flink-common</module>
-        <module>paimon-flink-1.15</module>
-        <module>paimon-flink-1.16</module>
-        <module>paimon-flink-1.17</module>
-        <module>paimon-flink-1.18</module>
-        <module>paimon-flink-1.19</module>
-        <module>paimon-flink-1.20</module>
         <module>paimon-flink-action</module>
-        <module>paimon-flink-cdc</module>
     </modules>
 
     <dependencies>
@@ -166,16 +160,6 @@ under the License.
     <profiles>
         <profile>
             <id>skip-paimon-flink-tests</id>
-            <modules>
-                <module>paimon-flink-common</module>
-                <module>paimon-flink-1.15</module>
-                <module>paimon-flink-1.16</module>
-                <module>paimon-flink-1.17</module>
-                <module>paimon-flink-1.18</module>
-                <module>paimon-flink-1.19</module>
-                <module>paimon-flink-1.20</module>
-                <module>paimon-flink-cdc</module>
-            </modules>
             <build>
                 <plugins>
                     <plugin>
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml 
b/paimon-hive/paimon-hive-connector-common/pom.xml
index 43fd4d538c..2edd5d459e 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -119,7 +119,7 @@ under the License.
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
-            <version>${test.flink.version}</version>
+            <version>${test.flink.connector.hive.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index 734ef6f4cd..f54ea0450a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,9 @@ under the License.
         <antlr4.version>4.9.3</antlr4.version>
         <hadoop.version>2.8.5</hadoop.version>
         <scala.binary.version>2.12</scala.binary.version>
+        <!-- TODO: Update default Flink to 2.x after Paimon updates default Java to 11 -->
+        <paimon-flinkx-common>paimon-flink1-common</paimon-flinkx-common>
+        <paimon-flink-common.flink.version>1.20.1</paimon-flink-common.flink.version>
         <flink.scala.binary.version>2.12</flink.scala.binary.version>
         <scala212.version>2.12.15</scala212.version>
         <scala213.version>2.13.14</scala213.version>
@@ -114,6 +117,7 @@ under the License.
         <test.flink.main.version>1.20</test.flink.main.version>
         <test.flink.version>1.20.1</test.flink.version>
         
<test.flink.connector.kafka.version>3.4.0-1.20</test.flink.connector.kafka.version>
+        <test.flink.connector.hive.version>1.20.0</test.flink.connector.hive.version>
         
<test.mysql.connector.java.version>8.0.27</test.mysql.connector.java.version>
 
         <!-- spark profile properties-->
@@ -405,6 +409,49 @@ under the License.
                 </property>
             </activation>
         </profile>
+
+        <profile>
+            <id>flink1</id>
+            <properties>
+                <paimon-flinkx-common>paimon-flink1-common</paimon-flinkx-common>
+                <paimon-flink-common.flink.version>1.20.1</paimon-flink-common.flink.version>
+                <test.flink.main.version>1.20</test.flink.main.version>
+                <test.flink.version>1.20.1</test.flink.version>
+            </properties>
+            <modules>
+                <module>paimon-flink/paimon-flink-1.15</module>
+                <module>paimon-flink/paimon-flink-1.16</module>
+                <module>paimon-flink/paimon-flink-1.17</module>
+                <module>paimon-flink/paimon-flink-1.18</module>
+                <module>paimon-flink/paimon-flink-1.19</module>
+                <module>paimon-flink/paimon-flink-1.20</module>
+                <module>paimon-flink/paimon-flink-cdc</module>
+            </modules>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+                <property>
+                    <name>flink1</name>
+                </property>
+            </activation>
+        </profile>
+
+        <profile>
+            <id>flink2</id>
+            <properties>
+                <paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
+                <paimon-flink-common.flink.version>2.0.0</paimon-flink-common.flink.version>
+                <test.flink.main.version>2.0</test.flink.main.version>
+                <test.flink.version>2.0.0</test.flink.version>
+            </properties>
+            <modules>
+                <module>paimon-flink/paimon-flink-2.0</module>
+            </modules>
+            <activation>
+                <property>
+                    <name>flink2</name>
+                </property>
+            </activation>
+        </profile>
     </profiles>
 
     <build>

Reply via email to