This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b43321a3 [flink] Bump Flink version to 2.2 (#6775)
49b43321a3 is described below

commit 49b43321a3eb3d954a0405fee5f910ab75997bd8
Author: Kerwin <[email protected]>
AuthorDate: Thu Dec 11 18:10:46 2025 +0800

    [flink] Bump Flink version to 2.2 (#6775)
---
 .github/workflows/e2e-tests-flink-2.x-jdk11.yml    |  2 +-
 .github/workflows/utitcase-flink-2.x-jdk11.yml     |  2 +-
 docs/content/flink/quick-start.md                  | 36 +++++++------
 docs/content/project/download.md                   |  2 +
 paimon-e2e-tests/pom.xml                           |  9 ++++
 .../java/org/apache/paimon/tests/E2eTestBase.java  | 51 ++++++++++++++++++
 .../java/org/apache/paimon/tests/TypeE2eTest.java  | 17 ++++++
 .../test/resources-filtered/docker-compose.yaml    |  2 +
 .../pom.xml                                        | 36 +++++++++----
 .../org/apache/paimon/flink/FlinkCatalogTest.java  | 30 ++---------
 .../predicate/SimpleSqlPredicateConvertorTest.java | 63 ++++++++++++----------
 .../source/operator/TestingSourceOperator.java     | 14 +----
 paimon-flink/paimon-flink1-common/pom.xml          |  7 +++
 .../apache/paimon/flink/FlinkCatalogTestBase.java  | 53 ++++++++++++++++++
 .../operator/AbstractTestingSourceOperator.java    | 14 +++++
 .../apache/paimon/flink/FlinkCatalogTestBase.java  | 55 +++++++++++++++++++
 .../operator/AbstractTestingSourceOperator.java    | 18 ++++++-
 pom.xml                                            |  7 +--
 tools/releasing/deploy_staging_jars_for_jdk11.sh   |  4 +-
 19 files changed, 322 insertions(+), 100 deletions(-)
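
Reviewer note on the TypeE2eTest hunks in this diff: the longer byte array
expected for Flink >= 2.2.0 appears to be the full UTF-8 encoding of the test
string, while the previous expectation `[116]` is only its first byte ('t').
A minimal standalone check of that reading is sketched below. The class and
method names are hypothetical and not part of this commit, and the sketch uses
only the JDK, not Flink or Paimon.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration only; it is not part of this commit.
public class EncodeBytesCheck {

    // Renders the UTF-8 bytes of s in the bracketed, comma-separated form
    // used by the expected strings in the test.
    static String utf8Bytes(String s) {
        return Arrays.toString(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The test string from TypeE2eTest ("table", two CJK characters,
        // "store", two CJK characters), written with Unicode escapes so the
        // source file encoding does not matter.
        String value = "table\u684c\u5b50store\u5546\u5e97";
        System.out.println(utf8Bytes(value));
    }
}
```

Running `main` prints the same 22 values as the new expected string:
[116, 97, 98, 108, 101, -26, -95, -116, -27, -83, -112, 115, 116, 111, 114,
101, -27, -107, -122, -27, -70, -105].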

diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 5f97778467..22b6859d24 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -39,7 +39,7 @@ jobs:
       fail-fast: true
       matrix:
         # Last element should be the current default flink version
-        flink_version: [ '2.0', '2.1' ]
+        flink_version: [ '2.0', '2.1', '2.2' ]
     steps:
       - name: Checkout code
         uses: actions/checkout@v4
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 8e90e679cb..95b345fa7a 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -56,7 +56,7 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           test_modules=""
-          for suffix in 2.0 2.1 common; do
+          for suffix in 2.0 2.1 2.2 common; do
           test_modules+="org.apache.paimon:paimon-flink-${suffix},"
           done
           test_modules="${test_modules%,}"
diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index f00139d348..0317ce1bad 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -30,35 +30,37 @@ This documentation is a guide for using Paimon in Flink.
 
 ## Jars
 
-Paimon currently supports Flink 2.0, 1.20, 1.19, 1.18, 1.17, 1.16. We recommend the latest Flink version for a better experience.
+Paimon currently supports Flink 2.2, 2.1, 2.0, 1.20, 1.19, 1.18, 1.17, 1.16. We recommend the latest Flink version for a better experience.
 
 Download the jar file with corresponding version.
 
 > Currently, paimon provides two types jar: one of which(the bundled jar) is 
 > used for read/write data, and the other(action jar) for operations such as 
 > manually compaction,
 {{< stable >}}
 
-| Version     | Type        | Jar                                              
                                                                                
                                             |
-|-------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Flink 2.0   | Bundled Jar | [paimon-flink-2.0-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{< version >}}/paimon-flink-2.0-{{< version >}}.jar)          |
-| Flink 1.20  | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{< version >}}/paimon-flink-1.20-{{< version >}}.jar)       |
-| Flink 1.19  | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{< version >}}/paimon-flink-1.19-{{< version >}}.jar)       |
-| Flink 1.18  | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.18/{{< version >}}/paimon-flink-1.18-{{< version >}}.jar)       |
-| Flink 1.17  | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.17/{{< version >}}/paimon-flink-1.17-{{< version >}}.jar)       |
-| Flink 1.16  | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/{{< version >}}/paimon-flink-1.16-{{< version >}}.jar)       |
+| Version      | Type        | Jar                                             
                                                                                
                                              |
+|--------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.2    | Bundled Jar | [paimon-flink-2.2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.2/{{< version >}}/paimon-flink-2.2-{{< version >}}.jar)          |
+| Flink 2.1    | Bundled Jar | [paimon-flink-2.1-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.1/{{< version >}}/paimon-flink-2.1-{{< version >}}.jar)          |
+| Flink 2.0    | Bundled Jar | [paimon-flink-2.0-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{< version >}}/paimon-flink-2.0-{{< version >}}.jar)          |
+| Flink 1.20   | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{< version >}}/paimon-flink-1.20-{{< version >}}.jar)       |
+| Flink 1.19   | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{< version >}}/paimon-flink-1.19-{{< version >}}.jar)       |
+| Flink 1.18   | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.18/{{< version >}}/paimon-flink-1.18-{{< version >}}.jar)       |
+| Flink 1.17   | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.17/{{< version >}}/paimon-flink-1.17-{{< version >}}.jar)       |
+| Flink 1.16   | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/{{< version >}}/paimon-flink-1.16-{{< version >}}.jar)       |
 | Flink Action | Action Jar  | [paimon-flink-action-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/{{< version >}}/paimon-flink-action-{{< version >}}.jar) |
 
 {{< /stable >}}
 
 {{< unstable >}}
 
-| Version     | Type        | Jar                                              
                                                                                
         |
-|-------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------|
-| Flink 2.0   | Bundled Jar | Not yet released                                 
                                                                                
         |
-| Flink 1.20  | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.20/{{< version >}}/)     |
-| Flink 1.19  | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.19/{{< version >}}/)     |
-| Flink 1.18  | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/{{< version >}}/)     |
-| Flink 1.17  | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/{{< version >}}/)     |
-| Flink 1.16  | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/{{< version >}}/)     |
+| Version      | Type        | Jar                                             
                                                                                
          |
+|--------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.0    | Bundled Jar | Not yet released                                
                                                                                
          |
+| Flink 1.20   | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.20/{{< version >}}/)     |
+| Flink 1.19   | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.19/{{< version >}}/)     |
+| Flink 1.18   | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/{{< version >}}/)     |
+| Flink 1.17   | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/{{< version >}}/)     |
+| Flink 1.16   | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/{{< version >}}/)     |
 | Flink Action | Action Jar  | [paimon-flink-action-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/{{< version >}}/) |
 
 {{< /unstable >}}
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 51c6f7ac22..e4aec5d8b6 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -59,6 +59,8 @@ This documentation is a guide for downloading Paimon Jars.
 
 | Version          | Jar                                                       
                                                                                
                                                                              |
 
|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.2        | [paimon-flink-2.2-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.2/{{<
 version >}}/paimon-flink-2.2-{{< version >}}.jar)                              
                      |
+| Flink 2.1        | [paimon-flink-2.1-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.1/{{<
 version >}}/paimon-flink-2.1-{{< version >}}.jar)                              
                      |
 | Flink 2.0        | [paimon-flink-2.0-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{<
 version >}}/paimon-flink-2.0-{{< version >}}.jar)                              
                      |
 | Flink 1.20       | [paimon-flink-1.20-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{<
 version >}}/paimon-flink-1.20-{{< version >}}.jar)                             
                    |
 | Flink 1.19       | [paimon-flink-1.19-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{<
 version >}}/paimon-flink-1.19-{{< version >}}.jar)                             
                    |
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 973d9357a2..d3246d8154 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -295,6 +295,15 @@ under the License.
     <profiles>
         <!-- Activate these profiles with -Pflink-x.xx to build and test against different Flink versions -->
 
+        <profile>
+            <id>flink-2.1</id>
+            <properties>
+                <test.flink.main.version>2.1</test.flink.main.version>
+                <test.flink.version>2.1.0</test.flink.version>
+                <test.flink.connector.kafka.version>4.0.0-2.0</test.flink.connector.kafka.version>
+            </properties>
+        </profile>
+
         <profile>
             <id>flink-2.0</id>
             <properties>
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index 44d23a6b36..048356933d 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -80,10 +80,13 @@ public abstract class E2eTestBase {
     private static final int CHECK_RESULT_INTERVAL_MS = 1000;
     private static final int CHECK_RESULT_RETRIES = 60;
     private final List<String> currentResults = new ArrayList<>();
+    private static final Pattern FLINK_VERSION_PATTERN =
+            Pattern.compile("Version:\\s*([0-9]+(?:\\.[0-9]+){1,2})");
 
     protected Network network;
     protected ComposeContainer environment;
     protected ContainerState jobManager;
+    protected String flinkVersion;
 
     @BeforeEach
     public void before() throws Exception {
@@ -146,6 +149,11 @@ public abstract class E2eTestBase {
 
         jobManager = environment.getContainerByServiceName("jobmanager-1").get();
         jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
+
+        String flinkVersionCliOut =
+                jobManager.execInContainer("bash", "-c", "flink --version").getStdout();
+        Matcher flinkVersionMatcher = FLINK_VERSION_PATTERN.matcher(flinkVersionCliOut);
+        flinkVersion = flinkVersionMatcher.find() ? flinkVersionMatcher.group(1) : null;
     }
 
     private WaitStrategy buildWaitStrategy(String regex, int times) {
@@ -370,6 +378,49 @@ public abstract class E2eTestBase {
                         + actual);
     }
 
+    protected boolean isFlinkVersionAtLeast(String compareToVersion) {
+        if (flinkVersion == null || compareToVersion == null) {
+            return false;
+        }
+
+        int[] current = safelyParseVersion(flinkVersion);
+        int[] target = safelyParseVersion(compareToVersion);
+
+        for (int i = 0; i < 3; i++) {
+            if (current[i] < target[i]) {
+                return false;
+            }
+            if (current[i] > target[i]) {
+                return true;
+            }
+        }
+        return true; // equal
+    }
+
+    private int[] safelyParseVersion(String v) {
+        // default: [0, 0, 0]
+        int[] nums = new int[] {0, 0, 0};
+
+        if (v == null || v.isEmpty()) {
+            return nums;
+        }
+
+        // keep only "number" parts before possible suffix (e.g., rc1 / SNAPSHOT)
+        String[] parts = v.split("\\.");
+
+        for (int i = 0; i < Math.min(parts.length, 3); i++) {
+            String numeric = parts[i].replaceAll("[^0-9]", ""); // strip non-digit
+            if (!numeric.isEmpty()) {
+                try {
+                    nums[i] = Integer.parseInt(numeric);
+                } catch (NumberFormatException ignored) {
+                    nums[i] = 0;
+                }
+            }
+        }
+        return nums;
+    }
+
     private class LogConsumer extends Slf4jLogConsumer {
 
         public LogConsumer(Logger logger) {
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
index bbd1b22aca..33004cb87b 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
@@ -95,6 +95,14 @@ public class TypeE2eTest extends E2eTestBase {
                 "true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
                         + "123456789123456789.12345678, hi, hello, table桌子store商店, [116], "
                         + "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello, null, test], +I[1, 10, 测试]";
+
+        if (isFlinkVersionAtLeast("2.2.0")) {
+            // https://issues.apache.org/jira/browse/FLINK-38062 ENCODE function behaves wrong
+            expected =
+                    "true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
+                            + "123456789123456789.12345678, hi, hello, table桌子store商店, [116, 97, 98, 108, 101, -26, -95, -116, -27, -83, -112, 115, 116, 111, 114, 101, -27, -107, -122, -27, -70, -105], "
+                            + "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello, null, test], +I[1, 10, 测试]";
+        }
         checkResult(
                 expected,
                 "null, null, null, null, null, null, null, null, null, "
@@ -182,6 +190,15 @@ public class TypeE2eTest extends E2eTestBase {
                         + "123456789123456789.12345678, hi, hello, table桌子store商店, [116], "
                         + "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello, null, test], +I[1, 10, 测试], "
                         + "{hi=1, test=3, hello=null}";
+
+        if (isFlinkVersionAtLeast("2.2.0")) {
+            // https://issues.apache.org/jira/browse/FLINK-38062 ENCODE function behaves wrong
+            expected =
+                    "1, true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
+                            + "123456789123456789.12345678, hi, hello, table桌子store商店, [116, 97, 98, 108, 101, -26, -95, -116, -27, -83, -112, 115, 116, 111, 114, 101, -27, -107, -122, -27, -70, -105], "
+                            + "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello, null, test], +I[1, 10, 测试], "
+                            + "{hi=1, test=3, hello=null}";
+        }
         checkResult(
                 expected,
                 "2, null, null, null, null, null, null, null, null, null, "
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 3909b54b41..c9d579fb56 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -25,6 +25,7 @@ services:
   # ----------------------------------------
 
   jobmanager:
+    user: root
     image: apache/flink:${test.flink.version}-${test.java.version}
     volumes:
       - testdata:/test-data
@@ -49,6 +50,7 @@ services:
       - "8081"
 
   taskmanager:
+    user: root
     image: apache/flink:${test.flink.version}-${test.java.version}
     volumes:
       - testdata:/test-data
diff --git a/paimon-flink/paimon-flink1-common/pom.xml b/paimon-flink/paimon-flink-2.2/pom.xml
similarity index 65%
copy from paimon-flink/paimon-flink1-common/pom.xml
copy to paimon-flink/paimon-flink-2.2/pom.xml
index df45d547d0..dc3dd75fb3 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink-2.2/pom.xml
@@ -21,6 +21,7 @@ under the License.
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+
     <parent>
         <groupId>org.apache.paimon</groupId>
         <artifactId>paimon-flink</artifactId>
@@ -29,26 +30,33 @@ under the License.
 
     <packaging>jar</packaging>
 
-    <artifactId>paimon-flink1-common</artifactId>
-    <name>Paimon : Flink 1.x : Common</name>
+    <artifactId>paimon-flink-2.2</artifactId>
+    <name>Paimon : Flink : 2.2</name>
 
     <properties>
-        <flink.version>1.20.1</flink.version>
+        <flink.version>2.2.0</flink.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink2-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
@@ -61,12 +69,22 @@ under the License.
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
+                <artifactId>maven-shade-plugin</artifactId>
                 <executions>
                     <execution>
+                        <id>shade-paimon</id>
+                        <phase>package</phase>
                         <goals>
-                            <goal>test-jar</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>org.apache.paimon:paimon-flink-common</include>
+                                    <include>org.apache.paimon:paimon-flink2-common</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
                     </execution>
                 </executions>
             </plugin>
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 0892def58c..7494f91205 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -109,7 +108,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /** Test for {@link FlinkCatalog}. */
-public class FlinkCatalogTest {
+public class FlinkCatalogTest extends FlinkCatalogTestBase {
 
     private static final String TESTING_LOG_STORE = "testing";
 
@@ -122,10 +121,6 @@ public class FlinkCatalogTest {
     private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
     private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
 
-    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
-
-    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
-
     private String warehouse;
     private Catalog catalog;
 
@@ -217,29 +212,13 @@ public class FlinkCatalogTest {
         return new ResolvedCatalogTable(origin, resolvedSchema);
     }
 
-    private CatalogMaterializedTable createMaterializedTable(Map<String, String> options) {
-        ResolvedSchema resolvedSchema = this.createSchema();
-        return new ResolvedCatalogMaterializedTable(
-                CatalogMaterializedTable.newBuilder()
-                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
-                        .comment("test materialized table comment")
-                        .partitionKeys(Collections.emptyList())
-                        .options(options)
-                        .definitionQuery(DEFINITION_QUERY)
-                        .freshness(FRESHNESS)
-                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
-                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
-                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .build(),
-                resolvedSchema);
-    }
-
     @ParameterizedTest
     @MethodSource("batchOptionProvider")
     public void testCreateAndGetCatalogMaterializedTable(Map<String, String> options)
             throws Exception {
         ObjectPath tablePath = path1;
-        CatalogMaterializedTable materializedTable = createMaterializedTable(options);
+        CatalogMaterializedTable materializedTable =
+                createMaterializedTable(this.createSchema(), options);
         catalog.createDatabase(tablePath.getDatabaseName(), null, false);
         // test create materialized table
         catalog.createTable(tablePath, materializedTable, true);
@@ -276,7 +255,8 @@ public class FlinkCatalogTest {
     @MethodSource("batchOptionProvider")
     public void testAlterMaterializedTable(Map<String, String> options) throws Exception {
         ObjectPath tablePath = path1;
-        CatalogMaterializedTable materializedTable = createMaterializedTable(options);
+        CatalogMaterializedTable materializedTable =
+                createMaterializedTable(this.createSchema(), options);
         catalog.createDatabase(tablePath.getDatabaseName(), null, false);
         catalog.createTable(tablePath, materializedTable, true);
         TestRefreshHandler refreshHandler = new TestRefreshHandler("jobID: xxx, clusterId: yyy");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
index a68d15e746..189b09df76 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
@@ -26,12 +26,14 @@ import org.apache.paimon.types.RowType;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 /** test for {@link SimpleSqlPredicateConvertor} . */
 class SimpleSqlPredicateConvertorTest {
     RowType rowType;
@@ -58,7 +60,7 @@ class SimpleSqlPredicateConvertorTest {
             //
             // org.apache.calcite.sql.parser.ImmutableSqlParser$Config.withUnquotedCasing(org.apache.calcite.avatica.util.Casing)
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("`hour` ='1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(
                             predicateBuilder.equal(
                                     predicateBuilder.indexOf("hour"),
@@ -68,7 +70,7 @@ class SimpleSqlPredicateConvertorTest {
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate(" '2024-05-25' = c");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("c"), 19868));
         }
     }
@@ -77,14 +79,14 @@ class SimpleSqlPredicateConvertorTest {
     public void testNotEqual() throws Exception {
         {
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <>'1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("a"), 1));
         }
 
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <> c");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("c"), 19868));
         }
     }
@@ -93,14 +95,14 @@ class SimpleSqlPredicateConvertorTest {
     public void testLessThan() throws Exception {
         {
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <'1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("a"), 1));
         }
 
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <c ");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("c"), 19868));
         }
     }
@@ -109,14 +111,14 @@ class SimpleSqlPredicateConvertorTest {
     public void testLessThanOrEqual() throws Exception {
         {
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <='1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("a"), 1));
         }
 
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <= c");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(
                             predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("c"), 19868));
         }
@@ -126,14 +128,14 @@ class SimpleSqlPredicateConvertorTest {
     public void testGreatThan() throws Exception {
         {
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a >'1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("a"), 1));
         }
 
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' > c");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("c"), 19868));
         }
     }
@@ -142,14 +144,14 @@ class SimpleSqlPredicateConvertorTest {
     public void testGreatEqual() throws Exception {
         {
             Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a >='1'");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("a"), 1));
         }
 
         {
             Predicate predicate =
                     simpleSqlPredicateConvertor.convertSqlToPredicate(" '2024-05-25' >= c");
-            Assertions.assertThat(predicate)
+            assertThat(predicate)
                     .isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("c"), 19868));
         }
     }
@@ -158,7 +160,7 @@ class SimpleSqlPredicateConvertorTest {
     public void testIN() throws Exception {
         Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a in ('1','2')");
         List<Object> elements = Lists.newArrayList(1, 2);
-        Assertions.assertThat(predicate)
+        assertThat(predicate)
                 .isEqualTo(predicateBuilder.in(predicateBuilder.indexOf("a"), elements));
     }
 
@@ -167,7 +169,7 @@ class SimpleSqlPredicateConvertorTest {
         Predicate predicate =
                 simpleSqlPredicateConvertor.convertSqlToPredicate("a not in ('1','2')");
         List<Object> elements = Lists.newArrayList(1, 2);
-        Assertions.assertThat(predicate)
+        assertThat(predicate)
                 .isEqualTo(
                         predicateBuilder
                                 .in(predicateBuilder.indexOf("a"), elements)
@@ -178,15 +180,13 @@ class SimpleSqlPredicateConvertorTest {
     @Test
     public void testIsNull() throws Exception {
         Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a is null ");
-        Assertions.assertThat(predicate)
-                .isEqualTo(predicateBuilder.isNull(predicateBuilder.indexOf("a")));
+        assertThat(predicate).isEqualTo(predicateBuilder.isNull(predicateBuilder.indexOf("a")));
     }
 
     @Test
     public void testIsNotNull() throws Exception {
         Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a is not  null ");
-        Assertions.assertThat(predicate)
-                .isEqualTo(predicateBuilder.isNotNull(predicateBuilder.indexOf("a")));
+        assertThat(predicate).isEqualTo(predicateBuilder.isNotNull(predicateBuilder.indexOf("a")));
     }
 
     @Test
@@ -197,7 +197,7 @@ class SimpleSqlPredicateConvertorTest {
                 PredicateBuilder.and(
                         predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
                         predicateBuilder.isNull(predicateBuilder.indexOf("c")));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
@@ -208,34 +208,41 @@ class SimpleSqlPredicateConvertorTest {
                 PredicateBuilder.or(
                         predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
                         predicateBuilder.isNull(predicateBuilder.indexOf("c")));
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
     public void testNOT() throws Exception {
         Predicate actual = simpleSqlPredicateConvertor.convertSqlToPredicate("not (a is null) ");
         Predicate expected = predicateBuilder.isNull(predicateBuilder.indexOf("a")).negate().get();
-        Assertions.assertThat(actual).isEqualTo(expected);
+        assertThat(actual).isEqualTo(expected);
     }
 
     @Test
     public void testFieldNoFound() {
-        Assertions.assertThatThrownBy(
-                        () -> simpleSqlPredicateConvertor.convertSqlToPredicate("f =1"))
+        assertThatThrownBy(() -> simpleSqlPredicateConvertor.convertSqlToPredicate("f =1"))
                 .hasMessage("Field `f` not found");
     }
 
     @Test
     public void testSqlNoSupport() {
         // function not supported
-        Assertions.assertThatThrownBy(
+        assertThatThrownBy(
                         () ->
                                 simpleSqlPredicateConvertor.convertSqlToPredicate(
                                         "substring(f,0,1) =1"))
-                .hasMessage("SUBSTRING(`f` FROM 0 FOR 1) or 1 not been supported.");
+                .satisfiesAnyOf(
+                        // Legacy Calcite format: SUBSTRING(`f` FROM 0 FOR 1)
+                        e ->
+                                assertThat(e.getMessage())
+                                        .contains(
+                                                "SUBSTRING(`f` FROM 0 FOR 1) or 1 not been supported."),
+                        // Flink 2.2.0+ / newer Calcite format: SUBSTRING(`f`, 0, 1)
+                        e ->
+                                assertThat(e.getMessage())
+                                        .contains("SUBSTRING(`f`, 0, 1) or 1 not been supported."));
         // like not supported
-        Assertions.assertThatThrownBy(
-                        () -> simpleSqlPredicateConvertor.convertSqlToPredicate("b like 'x'"))
+        assertThatThrownBy(() -> simpleSqlPredicateConvertor.convertSqlToPredicate("b like 'x'"))
                 .hasMessage("LIKE not been supported.");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
index a77d0fc870..b60e63f2f3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
@@ -40,13 +40,11 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.operators.SourceOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.streaming.util.MockStreamConfig;
-import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -65,9 +63,6 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
 
     private static final long serialVersionUID = 1L;
 
-    private final int subtaskIndex;
-    private final int parallelism;
-
     public TestingSourceOperator(
             StreamOperatorParameters<T> parameters,
             SourceReader<T, SimpleSourceSplit> reader,
@@ -104,11 +99,11 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
                 timeService,
                 new Configuration(),
                 "localhost",
+                subtaskIndex,
+                parallelism,
                 emitProgressiveWatermarks,
                 () -> false);
 
-        this.subtaskIndex = subtaskIndex;
-        this.parallelism = parallelism;
         this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
         initSourceMetricGroup();
 
@@ -122,11 +117,6 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
         setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
     }
 
-    @Override
-    public StreamingRuntimeContext getRuntimeContext() {
-        return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
-    }
-
     // this is overridden to avoid complex mock injection through the "containingTask"
     @Override
     public ExecutionConfig getExecutionConfig() {
diff --git a/paimon-flink/paimon-flink1-common/pom.xml b/paimon-flink/paimon-flink1-common/pom.xml
index df45d547d0..74f5ed9eb9 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -55,6 +55,13 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
new file mode 100644
index 0000000000..77eb57c468
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Base class for Flink1 catalog tests. */
+public class FlinkCatalogTestBase {
+
+    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+
+    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
+
+    protected CatalogMaterializedTable createMaterializedTable(
+            ResolvedSchema resolvedSchema, Map<String, String> options) {
+        return new ResolvedCatalogMaterializedTable(
+                CatalogMaterializedTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test materialized table comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .definitionQuery(DEFINITION_QUERY)
+                        .freshness(FRESHNESS)
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .build(),
+                resolvedSchema);
+    }
+}
diff --git a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
index d293d30057..252b4f9b75 100644
--- a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
+++ b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -26,14 +26,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.util.function.FunctionWithException;
 
 /** Helper class to resolve the compatibility of {@link SourceOperator}'s constructor. */
 public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
         extends SourceOperator<T, S> {
 
+    private final int subtaskIndex;
+    private final int parallelism;
+
     public AbstractTestingSourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
             OperatorEventGateway operatorEventGateway,
@@ -42,6 +47,8 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
             ProcessingTimeService timeService,
             Configuration configuration,
             String localHostname,
+            int subtaskIndex,
+            int parallelism,
             boolean emitProgressiveWatermarks,
             StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
         super(
@@ -54,5 +61,12 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
                 localHostname,
                 emitProgressiveWatermarks,
                 canEmitBatchOfRecords);
+        this.subtaskIndex = subtaskIndex;
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public StreamingRuntimeContext getRuntimeContext() {
+        return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
     }
 }
diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
new file mode 100644
index 0000000000..564848c054
--- /dev/null
+++ b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Base class for Flink2 catalog tests. */
+public class FlinkCatalogTestBase {
+
+    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+
+    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
+
+    protected CatalogMaterializedTable createMaterializedTable(
+            ResolvedSchema resolvedSchema, Map<String, String> options) {
+        return new ResolvedCatalogMaterializedTable(
+                CatalogMaterializedTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test materialized table comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .definitionQuery(DEFINITION_QUERY)
+                        .freshness(FRESHNESS)
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .build(),
+                resolvedSchema,
+                CatalogMaterializedTable.RefreshMode.CONTINUOUS,
+                FRESHNESS);
+    }
+}
diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
index a2f402a874..51a759847f 100644
--- a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
+++ b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -26,8 +26,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.util.function.FunctionWithException;
 
 import java.util.HashMap;
@@ -36,6 +38,9 @@ import java.util.HashMap;
 public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
         extends SourceOperator<T, S> {
 
+    private final int subtaskIndex;
+    private final int parallelism;
+
     public AbstractTestingSourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
             OperatorEventGateway operatorEventGateway,
@@ -44,9 +49,10 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
             ProcessingTimeService timeService,
             Configuration configuration,
             String localHostname,
+            int subtaskIndex,
+            int parallelism,
             boolean emitProgressiveWatermarks,
             StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
-
         super(
                 null,
                 readerFactory,
@@ -58,6 +64,14 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
                 localHostname,
                 emitProgressiveWatermarks,
                 canEmitBatchOfRecords,
-                new HashMap<>());
+                new HashMap<>(),
+                false);
+        this.subtaskIndex = subtaskIndex;
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public StreamingRuntimeContext getRuntimeContext() {
+        return new MockStreamingRuntimeContext(parallelism, subtaskIndex);
     }
 }
diff --git a/pom.xml b/pom.xml
index e9659b707c..47f89aaa39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -471,15 +471,16 @@ under the License.
             <id>flink2</id>
             <properties>
                 <paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
-                <paimon-flink-common.flink.version>2.1.0</paimon-flink-common.flink.version>
-                <test.flink.main.version>2.1</test.flink.main.version>
-                <test.flink.version>2.1.0</test.flink.version>
+                <paimon-flink-common.flink.version>2.2.0</paimon-flink-common.flink.version>
+                <test.flink.main.version>2.2</test.flink.main.version>
+                <test.flink.version>2.2.0</test.flink.version>
                 <target.java.version>11</target.java.version>
             </properties>
             <modules>
                 <module>paimon-flink/paimon-flink2-common</module>
                 <module>paimon-flink/paimon-flink-2.0</module>
                 <module>paimon-flink/paimon-flink-2.1</module>
+                <module>paimon-flink/paimon-flink-2.2</module>
             </modules>
             <activation>
                 <property>
diff --git a/tools/releasing/deploy_staging_jars_for_jdk11.sh b/tools/releasing/deploy_staging_jars_for_jdk11.sh
index 44712e351c..d5d5e8b1b5 100755
--- a/tools/releasing/deploy_staging_jars_for_jdk11.sh
+++ b/tools/releasing/deploy_staging_jars_for_jdk11.sh
@@ -44,10 +44,10 @@ cd ${PROJECT_ROOT}
 
 echo "Building flink2 and iceberg modules"
 ${MVN} clean install -Pdocs-and-source,flink2 -DskipTests \
--pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-iceberg -am $CUSTOM_OPTIONS
+-pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink-2.2,org.apache.paimon:paimon-iceberg -am $CUSTOM_OPTIONS
 
 echo "Deploying flink2 and iceberg modules to repository.apache.org"
 ${MVN} deploy -Papache-release,docs-and-source,flink2 -DskipTests -DretryFailedDeploymentCount=10 \
--pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink2-common,org.apache.paimon:paimon-iceberg $CUSTOM_OPTIONS
+-pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink-2.2,org.apache.paimon:paimon-flink2-common,org.apache.paimon:paimon-iceberg $CUSTOM_OPTIONS
 
 cd ${CURR_DIR}
