This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 49b43321a3 [flink] Bump Flink version to 2.2 (#6775)
49b43321a3 is described below
commit 49b43321a3eb3d954a0405fee5f910ab75997bd8
Author: Kerwin <[email protected]>
AuthorDate: Thu Dec 11 18:10:46 2025 +0800
[flink] Bump Flink version to 2.2 (#6775)
---
.github/workflows/e2e-tests-flink-2.x-jdk11.yml | 2 +-
.github/workflows/utitcase-flink-2.x-jdk11.yml | 2 +-
docs/content/flink/quick-start.md | 36 +++++++------
docs/content/project/download.md | 2 +
paimon-e2e-tests/pom.xml | 9 ++++
.../java/org/apache/paimon/tests/E2eTestBase.java | 51 ++++++++++++++++++
.../java/org/apache/paimon/tests/TypeE2eTest.java | 17 ++++++
.../test/resources-filtered/docker-compose.yaml | 2 +
.../pom.xml | 36 +++++++++----
.../org/apache/paimon/flink/FlinkCatalogTest.java | 30 ++---------
.../predicate/SimpleSqlPredicateConvertorTest.java | 63 ++++++++++++----------
.../source/operator/TestingSourceOperator.java | 14 +----
paimon-flink/paimon-flink1-common/pom.xml | 7 +++
.../apache/paimon/flink/FlinkCatalogTestBase.java | 53 ++++++++++++++++++
.../operator/AbstractTestingSourceOperator.java | 14 +++++
.../apache/paimon/flink/FlinkCatalogTestBase.java | 55 +++++++++++++++++++
.../operator/AbstractTestingSourceOperator.java | 18 ++++++-
pom.xml | 7 +--
tools/releasing/deploy_staging_jars_for_jdk11.sh | 4 +-
19 files changed, 322 insertions(+), 100 deletions(-)
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 5f97778467..22b6859d24 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -39,7 +39,7 @@ jobs:
fail-fast: true
matrix:
# Last element should be the current default flink version
- flink_version: [ '2.0', '2.1' ]
+ flink_version: [ '2.0', '2.1', '2.2' ]
steps:
- name: Checkout code
uses: actions/checkout@v4
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 8e90e679cb..95b345fa7a 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -56,7 +56,7 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in 2.0 2.1 common; do
+ for suffix in 2.0 2.1 2.2 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
diff --git a/docs/content/flink/quick-start.md b/docs/content/flink/quick-start.md
index f00139d348..0317ce1bad 100644
--- a/docs/content/flink/quick-start.md
+++ b/docs/content/flink/quick-start.md
@@ -30,35 +30,37 @@ This documentation is a guide for using Paimon in Flink.
## Jars
-Paimon currently supports Flink 2.0, 1.20, 1.19, 1.18, 1.17, 1.16. We recommend the latest Flink version for a better experience.
+Paimon currently supports Flink 2.2, 2.1, 2.0, 1.20, 1.19, 1.18, 1.17, 1.16. We recommend the latest Flink version for a better experience.
Download the jar file with the corresponding version.
> Currently, Paimon provides two types of jars: the bundled jar is used for
> reading and writing data, and the action jar is used for operations such as
> manual compaction.
{{< stable >}}
-| Version     | Type        | Jar                                                                                                                                                                               |
-|-------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| Flink 2.0   | Bundled Jar | [paimon-flink-2.0-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{< version >}}/paimon-flink-2.0-{{< version >}}.jar)             |
-| Flink 1.20  | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{< version >}}/paimon-flink-1.20-{{< version >}}.jar)          |
-| Flink 1.19  | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{< version >}}/paimon-flink-1.19-{{< version >}}.jar)          |
-| Flink 1.18  | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.18/{{< version >}}/paimon-flink-1.18-{{< version >}}.jar)          |
-| Flink 1.17  | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.17/{{< version >}}/paimon-flink-1.17-{{< version >}}.jar)          |
-| Flink 1.16  | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/{{< version >}}/paimon-flink-1.16-{{< version >}}.jar)          |
+| Version      | Type        | Jar                                                                                                                                                                              |
+|--------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.2    | Bundled Jar | [paimon-flink-2.2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.2/{{< version >}}/paimon-flink-2.2-{{< version >}}.jar)            |
+| Flink 2.1    | Bundled Jar | [paimon-flink-2.1-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.1/{{< version >}}/paimon-flink-2.1-{{< version >}}.jar)            |
+| Flink 2.0    | Bundled Jar | [paimon-flink-2.0-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{< version >}}/paimon-flink-2.0-{{< version >}}.jar)            |
+| Flink 1.20   | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{< version >}}/paimon-flink-1.20-{{< version >}}.jar)         |
+| Flink 1.19   | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{< version >}}/paimon-flink-1.19-{{< version >}}.jar)         |
+| Flink 1.18   | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.18/{{< version >}}/paimon-flink-1.18-{{< version >}}.jar)         |
+| Flink 1.17   | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.17/{{< version >}}/paimon-flink-1.17-{{< version >}}.jar)         |
+| Flink 1.16   | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.16/{{< version >}}/paimon-flink-1.16-{{< version >}}.jar)         |
| Flink Action | Action Jar  | [paimon-flink-action-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/{{< version >}}/paimon-flink-action-{{< version >}}.jar)    |
{{< /stable >}}
{{< unstable >}}
-| Version     | Type        | Jar                                                                                                                                         |
-|-------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------|
-| Flink 2.0   | Bundled Jar | Not yet released                                                                                                                            |
-| Flink 1.20  | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.20/{{< version >}}/)       |
-| Flink 1.19  | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.19/{{< version >}}/)       |
-| Flink 1.18  | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/{{< version >}}/)       |
-| Flink 1.17  | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/{{< version >}}/)       |
-| Flink 1.16  | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/{{< version >}}/)       |
+| Version      | Type        | Jar                                                                                                                                        |
+|--------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.0    | Bundled Jar | Not yet released                                                                                                                           |
+| Flink 1.20   | Bundled Jar | [paimon-flink-1.20-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.20/{{< version >}}/)      |
+| Flink 1.19   | Bundled Jar | [paimon-flink-1.19-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.19/{{< version >}}/)      |
+| Flink 1.18   | Bundled Jar | [paimon-flink-1.18-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.18/{{< version >}}/)      |
+| Flink 1.17   | Bundled Jar | [paimon-flink-1.17-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.17/{{< version >}}/)      |
+| Flink 1.16   | Bundled Jar | [paimon-flink-1.16-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-1.16/{{< version >}}/)      |
| Flink Action | Action Jar  | [paimon-flink-action-{{< version >}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-action/{{< version >}}/)   |
{{< /unstable >}}
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 51c6f7ac22..e4aec5d8b6 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -59,6 +59,8 @@ This documentation is a guide for downloading Paimon Jars.
| Version          | Jar                                                                                                                                                                                                                     |
|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 2.2        | [paimon-flink-2.2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.2/{{< version >}}/paimon-flink-2.2-{{< version >}}.jar)                                                    |
+| Flink 2.1        | [paimon-flink-2.1-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.1/{{< version >}}/paimon-flink-2.1-{{< version >}}.jar)                                                    |
| Flink 2.0        | [paimon-flink-2.0-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-2.0/{{< version >}}/paimon-flink-2.0-{{< version >}}.jar)                                                    |
| Flink 1.20       | [paimon-flink-1.20-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.20/{{< version >}}/paimon-flink-1.20-{{< version >}}.jar)                                                 |
| Flink 1.19       | [paimon-flink-1.19-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/{{< version >}}/paimon-flink-1.19-{{< version >}}.jar)                                                 |
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index 973d9357a2..d3246d8154 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -295,6 +295,15 @@ under the License.
<profiles>
        <!-- Activate these profiles with -Pflink-x.xx to build and test against different Flink versions -->
+ <profile>
+ <id>flink-2.1</id>
+ <properties>
+ <test.flink.main.version>2.1</test.flink.main.version>
+ <test.flink.version>2.1.0</test.flink.version>
+                <test.flink.connector.kafka.version>4.0.0-2.0</test.flink.connector.kafka.version>
+ </properties>
+ </profile>
+
<profile>
<id>flink-2.0</id>
<properties>
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
index 44d23a6b36..048356933d 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/E2eTestBase.java
@@ -80,10 +80,13 @@ public abstract class E2eTestBase {
private static final int CHECK_RESULT_INTERVAL_MS = 1000;
private static final int CHECK_RESULT_RETRIES = 60;
private final List<String> currentResults = new ArrayList<>();
+ private static final Pattern FLINK_VERSION_PATTERN =
+ Pattern.compile("Version:\\s*([0-9]+(?:\\.[0-9]+){1,2})");
protected Network network;
protected ComposeContainer environment;
protected ContainerState jobManager;
+ protected String flinkVersion;
@BeforeEach
public void before() throws Exception {
@@ -146,6 +149,11 @@ public abstract class E2eTestBase {
         jobManager = environment.getContainerByServiceName("jobmanager-1").get();
         jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
+
+        String flinkVersionCliOut =
+                jobManager.execInContainer("bash", "-c", "flink --version").getStdout();
+        Matcher flinkVersionMatcher = FLINK_VERSION_PATTERN.matcher(flinkVersionCliOut);
+        flinkVersion = flinkVersionMatcher.find() ? flinkVersionMatcher.group(1) : null;
}
private WaitStrategy buildWaitStrategy(String regex, int times) {
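The version probe above hinges on FLINK_VERSION_PATTERN. A minimal standalone sketch of the same extraction, with a hypothetical `flink --version` banner as input (the sample string is illustrative, not taken from this patch):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FlinkVersionProbe {
        public static void main(String[] args) {
            // Same pattern as added to E2eTestBase: a 2- or 3-part dotted version after "Version:".
            Pattern p = Pattern.compile("Version:\\s*([0-9]+(?:\\.[0-9]+){1,2})");
            // Hypothetical CLI banner; real output may also carry a commit id.
            Matcher m = p.matcher("Version: 2.2.0, Commit ID: 1a2b3c4");
            System.out.println(m.find() ? m.group(1) : "unknown"); // -> 2.2.0
        }
    }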
@@ -370,6 +378,49 @@ public abstract class E2eTestBase {
+ actual);
}
+ protected boolean isFlinkVersionAtLeast(String compareToVersion) {
+ if (flinkVersion == null || compareToVersion == null) {
+ return false;
+ }
+
+ int[] current = safelyParseVersion(flinkVersion);
+ int[] target = safelyParseVersion(compareToVersion);
+
+ for (int i = 0; i < 3; i++) {
+ if (current[i] < target[i]) {
+ return false;
+ }
+ if (current[i] > target[i]) {
+ return true;
+ }
+ }
+ return true; // equal
+ }
+
+ private int[] safelyParseVersion(String v) {
+ // default: [0, 0, 0]
+ int[] nums = new int[] {0, 0, 0};
+
+ if (v == null || v.isEmpty()) {
+ return nums;
+ }
+
+        // keep only "number" parts before possible suffix (e.g., rc1 / SNAPSHOT)
+ String[] parts = v.split("\\.");
+
+ for (int i = 0; i < Math.min(parts.length, 3); i++) {
+            String numeric = parts[i].replaceAll("[^0-9]", ""); // strip non-digits
+ if (!numeric.isEmpty()) {
+ try {
+ nums[i] = Integer.parseInt(numeric);
+ } catch (NumberFormatException ignored) {
+ nums[i] = 0;
+ }
+ }
+ }
+ return nums;
+ }
+
private class LogConsumer extends Slf4jLogConsumer {
public LogConsumer(Logger logger) {
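To make the comparison semantics of the two new helpers concrete, a few illustrative inputs and outputs (the sample versions are assumptions, not values from this patch):

    // safelyParseVersion pads missing parts with 0 and strips non-digits per segment:
    //   safelyParseVersion("2.2.0")        -> [2, 2, 0]
    //   safelyParseVersion("2.2-SNAPSHOT") -> [2, 2, 0]   (suffix stripped)
    //   safelyParseVersion("2.1.1")        -> [2, 1, 1]
    // isFlinkVersionAtLeast compares the three segments in order:
    //   flinkVersion = "2.2-SNAPSHOT": isFlinkVersionAtLeast("2.2.0") -> true  (all segments equal)
    //   flinkVersion = "2.1.1":        isFlinkVersionAtLeast("2.2.0") -> false (1 < 2 at the minor segment)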
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
index bbd1b22aca..33004cb87b 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/TypeE2eTest.java
@@ -95,6 +95,14 @@ public class TypeE2eTest extends E2eTestBase {
"true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
+ "123456789123456789.12345678, hi, hello,
table桌子store商店, [116], "
+ "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello,
null, test], +I[1, 10, 测试]";
+
+ if (isFlinkVersionAtLeast("2.2.0")) {
+ // https://issues.apache.org/jira/browse/FLINK-38062 ENCODE
function behaves wrong
+ expected =
+ "true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
+ + "123456789123456789.12345678, hi, hello,
table桌子store商店, [116, 97, 98, 108, 101, -26, -95, -116, -27, -83, -112, 115,
116, 111, 114, 101, -27, -107, -122, -27, -70, -105], "
+ + "2022-04-28, 2022-04-28T15:35:45.123, [hi,
hello, null, test], +I[1, 10, 测试]";
+ }
checkResult(
expected,
"null, null, null, null, null, null, null, null, null, "
@@ -182,6 +190,15 @@ public class TypeE2eTest extends E2eTestBase {
+ "123456789123456789.12345678, hi, hello,
table桌子store商店, [116], "
+ "2022-04-28, 2022-04-28T15:35:45.123, [hi, hello,
null, test], +I[1, 10, 测试], "
+ "{hi=1, test=3, hello=null}";
+
+ if (isFlinkVersionAtLeast("2.2.0")) {
+ // https://issues.apache.org/jira/browse/FLINK-38062 ENCODE
function behaves wrong
+ expected =
+ "1, true, 1, 10, 100, 1000, 1.1, 1.11, 12.456, "
+ + "123456789123456789.12345678, hi, hello,
table桌子store商店, [116, 97, 98, 108, 101, -26, -95, -116, -27, -83, -112, 115,
116, 111, 114, 101, -27, -107, -122, -27, -70, -105], "
+ + "2022-04-28, 2022-04-28T15:35:45.123, [hi,
hello, null, test], +I[1, 10, 测试], "
+ + "{hi=1, test=3, hello=null}";
+ }
checkResult(
expected,
"2, null, null, null, null, null, null, null, null, null, "
diff --git a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 3909b54b41..c9d579fb56 100644
--- a/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/paimon-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -25,6 +25,7 @@ services:
# ----------------------------------------
jobmanager:
+ user: root
image: apache/flink:${test.flink.version}-${test.java.version}
volumes:
- testdata:/test-data
@@ -49,6 +50,7 @@ services:
- "8081"
taskmanager:
+ user: root
image: apache/flink:${test.flink.version}-${test.java.version}
volumes:
- testdata:/test-data
diff --git a/paimon-flink/paimon-flink1-common/pom.xml b/paimon-flink/paimon-flink-2.2/pom.xml
similarity index 65%
copy from paimon-flink/paimon-flink1-common/pom.xml
copy to paimon-flink/paimon-flink-2.2/pom.xml
index df45d547d0..dc3dd75fb3 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink-2.2/pom.xml
@@ -21,6 +21,7 @@ under the License.
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink</artifactId>
@@ -29,26 +30,33 @@ under the License.
<packaging>jar</packaging>
- <artifactId>paimon-flink1-common</artifactId>
- <name>Paimon : Flink 1.x : Common</name>
+ <artifactId>paimon-flink-2.2</artifactId>
+ <name>Paimon : Flink : 2.2</name>
<properties>
- <flink.version>1.20.1</flink.version>
+ <flink.version>2.2.0</flink.version>
</properties>
<dependencies>
<dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- <scope>provided</scope>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink2-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
@@ -61,12 +69,22 @@ under the License.
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
+ <artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
+ <id>shade-paimon</id>
+ <phase>package</phase>
<goals>
- <goal>test-jar</goal>
+ <goal>shade</goal>
</goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+                                <include>org.apache.paimon:paimon-flink-common</include>
+                                <include>org.apache.paimon:paimon-flink2-common</include>
+ </includes>
+ </artifactSet>
+ </configuration>
</execution>
</executions>
</plugin>
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 0892def58c..7494f91205 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -109,7 +108,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Test for {@link FlinkCatalog}. */
-public class FlinkCatalogTest {
+public class FlinkCatalogTest extends FlinkCatalogTestBase {
private static final String TESTING_LOG_STORE = "testing";
@@ -122,10 +121,6 @@ public class FlinkCatalogTest {
    private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
    private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");
-    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
-
-    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
-
private String warehouse;
private Catalog catalog;
@@ -217,29 +212,13 @@ public class FlinkCatalogTest {
return new ResolvedCatalogTable(origin, resolvedSchema);
}
-    private CatalogMaterializedTable createMaterializedTable(Map<String, String> options) {
-        ResolvedSchema resolvedSchema = this.createSchema();
-        return new ResolvedCatalogMaterializedTable(
-                CatalogMaterializedTable.newBuilder()
-                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
-                        .comment("test materialized table comment")
-                        .partitionKeys(Collections.emptyList())
-                        .options(options)
-                        .definitionQuery(DEFINITION_QUERY)
-                        .freshness(FRESHNESS)
-                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
-                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
-                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .build(),
-                resolvedSchema);
-    }
-
@ParameterizedTest
@MethodSource("batchOptionProvider")
    public void testCreateAndGetCatalogMaterializedTable(Map<String, String> options)
            throws Exception {
        ObjectPath tablePath = path1;
-        CatalogMaterializedTable materializedTable = createMaterializedTable(options);
+ CatalogMaterializedTable materializedTable =
+ createMaterializedTable(this.createSchema(), options);
catalog.createDatabase(tablePath.getDatabaseName(), null, false);
// test create materialized table
catalog.createTable(tablePath, materializedTable, true);
@@ -276,7 +255,8 @@ public class FlinkCatalogTest {
@MethodSource("batchOptionProvider")
    public void testAlterMaterializedTable(Map<String, String> options) throws Exception {
        ObjectPath tablePath = path1;
-        CatalogMaterializedTable materializedTable = createMaterializedTable(options);
+ CatalogMaterializedTable materializedTable =
+ createMaterializedTable(this.createSchema(), options);
catalog.createDatabase(tablePath.getDatabaseName(), null, false);
catalog.createTable(tablePath, materializedTable, true);
        TestRefreshHandler refreshHandler = new TestRefreshHandler("jobID: xxx, clusterId: yyy");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
index a68d15e746..189b09df76 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/predicate/SimpleSqlPredicateConvertorTest.java
@@ -26,12 +26,14 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
/** test for {@link SimpleSqlPredicateConvertor} . */
class SimpleSqlPredicateConvertorTest {
RowType rowType;
@@ -58,7 +60,7 @@ class SimpleSqlPredicateConvertorTest {
//
        // org.apache.calcite.sql.parser.ImmutableSqlParser$Config.withUnquotedCasing(org.apache.calcite.avatica.util.Casing)
        Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("`hour` ='1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(
predicateBuilder.equal(
predicateBuilder.indexOf("hour"),
@@ -68,7 +70,7 @@ class SimpleSqlPredicateConvertorTest {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(" '2024-05-25' = c");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.equal(predicateBuilder.indexOf("c"), 19868));
}
}
@@ -77,14 +79,14 @@ class SimpleSqlPredicateConvertorTest {
public void testNotEqual() throws Exception {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <>'1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("a"), 1));
}
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <> c");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.notEqual(predicateBuilder.indexOf("c"), 19868));
}
}
@@ -93,14 +95,14 @@ class SimpleSqlPredicateConvertorTest {
public void testLessThan() throws Exception {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <'1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("a"), 1));
}
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <c ");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("c"), 19868));
}
}
@@ -109,14 +111,14 @@ class SimpleSqlPredicateConvertorTest {
public void testLessThanOrEqual() throws Exception {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a <='1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("a"), 1));
}
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' <= c");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(
predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("c"), 19868));
}
@@ -126,14 +128,14 @@ class SimpleSqlPredicateConvertorTest {
public void testGreatThan() throws Exception {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a >'1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.greaterThan(predicateBuilder.indexOf("a"), 1));
}
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("'2024-05-25' > c");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.lessThan(predicateBuilder.indexOf("c"), 19868));
}
}
@@ -142,14 +144,14 @@ class SimpleSqlPredicateConvertorTest {
public void testGreatEqual() throws Exception {
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a >='1'");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.greaterOrEqual(predicateBuilder.indexOf("a"), 1));
}
{
            Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate(" '2024-05-25' >= c");
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(predicateBuilder.lessOrEqual(predicateBuilder.indexOf("c"), 19868));
}
}
@@ -158,7 +160,7 @@ class SimpleSqlPredicateConvertorTest {
public void testIN() throws Exception {
        Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a in ('1','2')");
List<Object> elements = Lists.newArrayList(1, 2);
- Assertions.assertThat(predicate)
+ assertThat(predicate)
                .isEqualTo(predicateBuilder.in(predicateBuilder.indexOf("a"), elements));
}
@@ -167,7 +169,7 @@ class SimpleSqlPredicateConvertorTest {
        Predicate predicate =
                simpleSqlPredicateConvertor.convertSqlToPredicate("a not in ('1','2')");
List<Object> elements = Lists.newArrayList(1, 2);
- Assertions.assertThat(predicate)
+ assertThat(predicate)
.isEqualTo(
predicateBuilder
.in(predicateBuilder.indexOf("a"), elements)
@@ -178,15 +180,13 @@ class SimpleSqlPredicateConvertorTest {
@Test
public void testIsNull() throws Exception {
        Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a is null ");
-        Assertions.assertThat(predicate)
-                .isEqualTo(predicateBuilder.isNull(predicateBuilder.indexOf("a")));
+        assertThat(predicate).isEqualTo(predicateBuilder.isNull(predicateBuilder.indexOf("a")));
}
@Test
public void testIsNotNull() throws Exception {
        Predicate predicate = simpleSqlPredicateConvertor.convertSqlToPredicate("a is not null ");
-        Assertions.assertThat(predicate)
-                .isEqualTo(predicateBuilder.isNotNull(predicateBuilder.indexOf("a")));
+        assertThat(predicate).isEqualTo(predicateBuilder.isNotNull(predicateBuilder.indexOf("a")));
}
@Test
@@ -197,7 +197,7 @@ class SimpleSqlPredicateConvertorTest {
PredicateBuilder.and(
predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
predicateBuilder.isNull(predicateBuilder.indexOf("c")));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
}
@Test
@@ -208,34 +208,41 @@ class SimpleSqlPredicateConvertorTest {
PredicateBuilder.or(
predicateBuilder.isNotNull(predicateBuilder.indexOf("a")),
predicateBuilder.isNull(predicateBuilder.indexOf("c")));
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
}
@Test
public void testNOT() throws Exception {
        Predicate actual = simpleSqlPredicateConvertor.convertSqlToPredicate("not (a is null) ");
        Predicate expected = predicateBuilder.isNull(predicateBuilder.indexOf("a")).negate().get();
- Assertions.assertThat(actual).isEqualTo(expected);
+ assertThat(actual).isEqualTo(expected);
}
@Test
public void testFieldNoFound() {
-        Assertions.assertThatThrownBy(
-                () -> simpleSqlPredicateConvertor.convertSqlToPredicate("f =1"))
+        assertThatThrownBy(() -> simpleSqlPredicateConvertor.convertSqlToPredicate("f =1"))
.hasMessage("Field `f` not found");
}
@Test
public void testSqlNoSupport() {
// function not supported
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
simpleSqlPredicateConvertor.convertSqlToPredicate(
"substring(f,0,1) =1"))
-                .hasMessage("SUBSTRING(`f` FROM 0 FOR 1) or 1 not been supported.");
+                .satisfiesAnyOf(
+                        // Legacy Calcite format: SUBSTRING(`f` FROM 0 FOR 1)
+                        e ->
+                                assertThat(e.getMessage())
+                                        .contains("SUBSTRING(`f` FROM 0 FOR 1) or 1 not been supported."),
+                        // Flink 2.2.0+ / newer Calcite format: SUBSTRING(`f`, 0, 1)
+                        e ->
+                                assertThat(e.getMessage())
+                                        .contains("SUBSTRING(`f`, 0, 1) or 1 not been supported."));
// like not supported
-        Assertions.assertThatThrownBy(
-                () -> simpleSqlPredicateConvertor.convertSqlToPredicate("b like 'x'"))
+        assertThatThrownBy(() -> simpleSqlPredicateConvertor.convertSqlToPredicate("b like 'x'"))
.hasMessage("LIKE not been supported.");
}
}
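The switch from hasMessage to satisfiesAnyOf is what lets one test body accept both Calcite renderings of the unsupported call. satisfiesAnyOf is plain AssertJ: the assertion passes if at least one of the supplied lambdas succeeds. A minimal sketch (the sample message is illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    public class AnyOfDemo {
        public static void main(String[] args) {
            // Passes: the second lambda matches the newer Calcite rendering.
            assertThat("SUBSTRING(`f`, 0, 1) or 1 not been supported.")
                    .satisfiesAnyOf(
                            s -> assertThat(s).contains("FROM 0 FOR 1"),
                            s -> assertThat(s).contains("SUBSTRING(`f`, 0, 1)"));
        }
    }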
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
index a77d0fc870..b60e63f2f3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
@@ -40,13 +40,11 @@ import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
-import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import java.util.ArrayList;
import java.util.Collections;
@@ -65,9 +63,6 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
private static final long serialVersionUID = 1L;
- private final int subtaskIndex;
- private final int parallelism;
-
public TestingSourceOperator(
StreamOperatorParameters<T> parameters,
SourceReader<T, SimpleSourceSplit> reader,
@@ -104,11 +99,11 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
timeService,
new Configuration(),
"localhost",
+ subtaskIndex,
+ parallelism,
emitProgressiveWatermarks,
() -> false);
- this.subtaskIndex = subtaskIndex;
- this.parallelism = parallelism;
        this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
initSourceMetricGroup();
@@ -122,11 +117,6 @@ public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T, S
        setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
}
-    @Override
-    public StreamingRuntimeContext getRuntimeContext() {
-        return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
-    }
-
    // this is overridden to avoid complex mock injection through the "containingTask"
@Override
public ExecutionConfig getExecutionConfig() {
diff --git a/paimon-flink/paimon-flink1-common/pom.xml b/paimon-flink/paimon-flink1-common/pom.xml
index df45d547d0..74f5ed9eb9 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -55,6 +55,13 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
new file mode 100644
index 0000000000..77eb57c468
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Base class for Flink1 catalog tests. */
+public class FlinkCatalogTestBase {
+
+    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+
+    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
+
+ protected CatalogMaterializedTable createMaterializedTable(
+ ResolvedSchema resolvedSchema, Map<String, String> options) {
+ return new ResolvedCatalogMaterializedTable(
+ CatalogMaterializedTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test materialized table comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .definitionQuery(DEFINITION_QUERY)
+                        .freshness(FRESHNESS)
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .build(),
+ resolvedSchema);
+ }
+}
diff --git a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
index d293d30057..252b4f9b75 100644
--- a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
+++ b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -26,14 +26,19 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.function.FunctionWithException;
/** Helper class to resolve the compatibility of {@link SourceOperator}'s constructor. */
public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
extends SourceOperator<T, S> {
+ private final int subtaskIndex;
+ private final int parallelism;
+
public AbstractTestingSourceOperator(
            FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
OperatorEventGateway operatorEventGateway,
@@ -42,6 +47,8 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
ProcessingTimeService timeService,
Configuration configuration,
String localHostname,
+ int subtaskIndex,
+ int parallelism,
boolean emitProgressiveWatermarks,
StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
super(
@@ -54,5 +61,12 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
localHostname,
emitProgressiveWatermarks,
canEmitBatchOfRecords);
+ this.subtaskIndex = subtaskIndex;
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public StreamingRuntimeContext getRuntimeContext() {
+        return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
}
}
diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
new file mode 100644
index 0000000000..564848c054
--- /dev/null
+++ b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTestBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Base class for Flink2 catalog tests. */
+public class FlinkCatalogTestBase {
+
+    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+
+    private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
+
+ protected CatalogMaterializedTable createMaterializedTable(
+ ResolvedSchema resolvedSchema, Map<String, String> options) {
+ return new ResolvedCatalogMaterializedTable(
+ CatalogMaterializedTable.newBuilder()
+                        .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("test materialized table comment")
+                        .partitionKeys(Collections.emptyList())
+                        .options(options)
+                        .definitionQuery(DEFINITION_QUERY)
+                        .freshness(FRESHNESS)
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+ .build(),
+ resolvedSchema,
+ CatalogMaterializedTable.RefreshMode.CONTINUOUS,
+ FRESHNESS);
+ }
+}
diff --git a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
index a2f402a874..51a759847f 100644
--- a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
+++ b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -26,8 +26,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.function.FunctionWithException;
import java.util.HashMap;
@@ -36,6 +38,9 @@ import java.util.HashMap;
public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
extends SourceOperator<T, S> {
+ private final int subtaskIndex;
+ private final int parallelism;
+
public AbstractTestingSourceOperator(
            FunctionWithException<SourceReaderContext, SourceReader<T, S>, Exception> readerFactory,
OperatorEventGateway operatorEventGateway,
@@ -44,9 +49,10 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
ProcessingTimeService timeService,
Configuration configuration,
String localHostname,
+ int subtaskIndex,
+ int parallelism,
boolean emitProgressiveWatermarks,
StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
-
super(
null,
readerFactory,
@@ -58,6 +64,14 @@ public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
localHostname,
emitProgressiveWatermarks,
canEmitBatchOfRecords,
- new HashMap<>());
+ new HashMap<>(),
+ false);
+ this.subtaskIndex = subtaskIndex;
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public StreamingRuntimeContext getRuntimeContext() {
+ return new MockStreamingRuntimeContext(parallelism, subtaskIndex);
}
}
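Worth noting why subtaskIndex and parallelism moved into these compatibility base classes: MockStreamingRuntimeContext is constructed differently per Flink major version, so each *-common module overrides getRuntimeContext with the constructor its Flink line expects, exactly as the two halves of this patch show:

    // paimon-flink1-common (Flink 1.x mock keeps the leading boolean flag):
    //     return new MockStreamingRuntimeContext(false, parallelism, subtaskIndex);
    // paimon-flink2-common (Flink 2.x mock drops it):
    //     return new MockStreamingRuntimeContext(parallelism, subtaskIndex);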
diff --git a/pom.xml b/pom.xml
index e9659b707c..47f89aaa39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -471,15 +471,16 @@ under the License.
<id>flink2</id>
<properties>
<paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
-        <paimon-flink-common.flink.version>2.1.0</paimon-flink-common.flink.version>
-        <test.flink.main.version>2.1</test.flink.main.version>
-        <test.flink.version>2.1.0</test.flink.version>
+        <paimon-flink-common.flink.version>2.2.0</paimon-flink-common.flink.version>
+        <test.flink.main.version>2.2</test.flink.main.version>
+        <test.flink.version>2.2.0</test.flink.version>
<target.java.version>11</target.java.version>
</properties>
<modules>
<module>paimon-flink/paimon-flink2-common</module>
<module>paimon-flink/paimon-flink-2.0</module>
<module>paimon-flink/paimon-flink-2.1</module>
+ <module>paimon-flink/paimon-flink-2.2</module>
</modules>
<activation>
<property>
diff --git a/tools/releasing/deploy_staging_jars_for_jdk11.sh b/tools/releasing/deploy_staging_jars_for_jdk11.sh
index 44712e351c..d5d5e8b1b5 100755
--- a/tools/releasing/deploy_staging_jars_for_jdk11.sh
+++ b/tools/releasing/deploy_staging_jars_for_jdk11.sh
@@ -44,10 +44,10 @@ cd ${PROJECT_ROOT}
echo "Building flink2 and iceberg modules"
${MVN} clean install -Pdocs-and-source,flink2 -DskipTests \
--pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-iceberg -am $CUSTOM_OPTIONS
+-pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink-2.2,org.apache.paimon:paimon-iceberg -am $CUSTOM_OPTIONS
echo "Deploying flink2 and iceberg modules to repository.apache.org"
${MVN} deploy -Papache-release,docs-and-source,flink2 -DskipTests -DretryFailedDeploymentCount=10 \
--pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink2-common,org.apache.paimon:paimon-iceberg $CUSTOM_OPTIONS
+-pl org.apache.paimon:paimon-flink-2.0,org.apache.paimon:paimon-flink-2.1,org.apache.paimon:paimon-flink-2.2,org.apache.paimon:paimon-flink2-common,org.apache.paimon:paimon-iceberg $CUSTOM_OPTIONS
cd ${CURR_DIR}