This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d2fc39774 [flink] Support Flink 2.0 (#5408)
1d2fc39774 is described below
commit 1d2fc39774f6c77bb3697dcdc4202ca47c1bdd4b
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Thu Apr 10 11:21:00 2025 +0800
[flink] Support Flink 2.0 (#5408)
---
...ink-jdk11.yml => e2e-tests-flink-1.x-jdk11.yml} | 8 +-
...e2e-tests-flink.yml => e2e-tests-flink-1.x.yml} | 8 +-
...sts-flink.yml => e2e-tests-flink-2.x-jdk11.yml} | 18 ++---
...link-jdk11.yml => utitcase-flink-1.x-jdk11.yml} | 6 +-
.../{utitcase-flink.yml => utitcase-flink-1.x.yml} | 6 +-
...link-jdk11.yml => utitcase-flink-2.x-jdk11.yml} | 22 +++---
paimon-e2e-tests/pom.xml | 7 +-
.../paimon/tests/FlinkActionsWithKafkaE2eTest.java | 8 ++
.../paimon/tests/FlinkProceduresE2eTest.java | 4 +-
.../org/apache/paimon/tests/LogStoreE2eTest.java | 8 ++
.../apache/paimon/tests/cdc/MySql57E2eTest.java | 3 +
.../apache/paimon/tests/cdc/MySql80E2eTest.java | 3 +
.../paimon/tests/cdc/MySqlCdcE2eTestBase.java | 6 ++
.../tests/cdc/MySqlComputedColumnE2ETest.java | 2 +
.../tests/cdc/MySqlTinyIntConvertE2ETest.java | 2 +
paimon-flink/paimon-flink-2.0/pom.xml | 86 ++++++++++++++++++++++
paimon-flink/paimon-flink-common/pom.xml | 17 ++++-
.../apache/paimon/flink/FormatCatalogTable.java | 2 +
.../apache/paimon/flink/action/ActionFactory.java | 4 +-
.../paimon/flink/factories/FlinkFactoryUtil.java | 2 -
.../flink/procedure/QueryServiceProcedure.java | 2 +-
.../paimon/flink/service/QueryAddressRegister.java | 1 +
.../apache/paimon/flink/sink/LogSinkFunction.java | 2 +-
.../flink/sink/RowDataStoreWriteOperator.java | 2 +-
.../org/apache/paimon/flink/FlinkCatalogTest.java | 65 ++++++++++------
.../org/apache/paimon/flink/FlinkTestBase.java | 11 +--
.../paimon/flink/PredicateConverterTest.java | 2 +-
.../apache/paimon/flink/SchemaChangeITCase.java | 2 +-
...tNewFilesCompactionCoordinatorOperatorTest.java | 3 +-
.../paimon/flink/sink/LocalMergeOperatorTest.java | 4 +
.../sink/partition/PartitionMarkDoneTest.java | 21 ++++++
.../flink/source/operator/OperatorSourceTest.java | 4 +
.../source/operator/TestingSourceOperator.java | 2 +-
paimon-flink/paimon-flink1-common/pom.xml | 69 +++++++++++++++++
.../api/functions/sink/legacy/SinkFunction.java | 16 ++--
.../flink/action/MultipleParameterToolAdapter.java | 4 +-
.../api/common/state/v2/ListStateDescriptor.java | 16 +---
.../api/common/state/v2/MapStateDescriptor.java | 16 +---
.../apache/flink/runtime/event/WatermarkEvent.java | 16 +---
.../operator/AbstractTestingSourceOperator.java | 58 +++++++++++++++
paimon-flink/paimon-flink2-common/pom.xml | 76 +++++++++++++++++++
.../flink/action/MultipleParameterToolAdapter.java | 6 +-
.../operator/AbstractTestingSourceOperator.java | 63 ++++++++++++++++
paimon-flink/pom.xml | 18 +----
paimon-hive/paimon-hive-connector-common/pom.xml | 2 +-
pom.xml | 47 ++++++++++++
46 files changed, 596 insertions(+), 154 deletions(-)
diff --git a/.github/workflows/e2e-tests-flink-jdk11.yml
b/.github/workflows/e2e-tests-flink-1.x-jdk11.yml
similarity index 84%
rename from .github/workflows/e2e-tests-flink-jdk11.yml
rename to .github/workflows/e2e-tests-flink-1.x-jdk11.yml
index 6674ec4e4f..8c0c0ce420 100644
--- a/.github/workflows/e2e-tests-flink-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-1.x-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink on JDK 11
+name: End to End Tests Flink 1.x on JDK 11
on:
issue_comment:
@@ -52,7 +52,7 @@ jobs:
distribution: 'temurin'
- name: Build Flink
- run: mvn -T 1C -B clean install -DskipTests -pl paimon-e2e-tests -am
-Pflink-${{ matrix.flink_version }},java11
+ run: mvn -T 1C -B clean install -DskipTests -Pflink1 -pl
paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }},java11
- name: Test Flink
run: |
@@ -62,9 +62,9 @@ jobs:
echo "JVM timezone is set to $jvm_timezone"
profile="flink-${{ matrix.flink_version }}"
if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1]
}}" ]; then
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -Pjava11
+ mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -Pjava11
else
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile},java11
+ mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile},java11
fi
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/e2e-tests-flink.yml
b/.github/workflows/e2e-tests-flink-1.x.yml
similarity index 86%
copy from .github/workflows/e2e-tests-flink.yml
copy to .github/workflows/e2e-tests-flink-1.x.yml
index 4779efb6e1..1431ebd5cc 100644
--- a/.github/workflows/e2e-tests-flink.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink
+name: End to End Tests Flink 1.x
on:
push:
@@ -52,7 +52,7 @@ jobs:
distribution: 'temurin'
- name: Build Flink
- run: mvn -T 2C -B clean install -DskipTests -pl paimon-e2e-tests -am
-Pflink-${{ matrix.flink_version }}
+ run: mvn -T 2C -B clean install -DskipTests -Pflink1 -pl
paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }}
- name: Test Flink
run: |
@@ -62,9 +62,9 @@ jobs:
echo "JVM timezone is set to $jvm_timezone"
profile="flink-${{ matrix.flink_version }}"
if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1]
}}" ]; then
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone
else
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile}
+ mvn -T 1C -B test -Pflink1 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile}
fi
env:
MAVEN_OPTS: -Xmx4096m
diff --git a/.github/workflows/e2e-tests-flink.yml
b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
similarity index 77%
rename from .github/workflows/e2e-tests-flink.yml
rename to .github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 4779efb6e1..c1667c4826 100644
--- a/.github/workflows/e2e-tests-flink.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: End to End Tests Flink
+name: End to End Tests Flink 2.x on JDK 11
on:
push:
@@ -26,11 +26,7 @@ on:
- '**/*.md'
env:
- JDK_VERSION: 8
-
-concurrency:
- group: ${{ github.workflow }}-${{ github.event_name }}-${{
github.event.number || github.run_id }}
- cancel-in-progress: true
+ JDK_VERSION: 11
jobs:
build_test:
@@ -40,7 +36,7 @@ jobs:
fail-fast: true
matrix:
# Last element should be the current default flink version
- flink_version: [ '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ]
+ flink_version: [ '2.0' ]
steps:
- name: Checkout code
uses: actions/checkout@v4
@@ -52,7 +48,7 @@ jobs:
distribution: 'temurin'
- name: Build Flink
- run: mvn -T 2C -B clean install -DskipTests -pl paimon-e2e-tests -am
-Pflink-${{ matrix.flink_version }}
+ run: mvn -T 1C -B clean install -DskipTests -Pflink2 -pl
paimon-e2e-tests -am -Pflink-${{ matrix.flink_version }},java11
- name: Test Flink
run: |
@@ -62,9 +58,9 @@ jobs:
echo "JVM timezone is set to $jvm_timezone"
profile="flink-${{ matrix.flink_version }}"
if [ "${{ matrix.flink_version }}" = "${{ matrix.flink_version[-1]
}}" ]; then
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone
+ mvn -T 1C -B test -Pflink2 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -Pjava11
else
- mvn -T 1C -B test -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile}
+ mvn -T 1C -B test -Pflink2 -pl paimon-e2e-tests
-Duser.timezone=$jvm_timezone -P${profile},java11
fi
env:
- MAVEN_OPTS: -Xmx4096m
+ MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink-jdk11.yml
b/.github/workflows/utitcase-flink-1.x-jdk11.yml
similarity index 89%
copy from .github/workflows/utitcase-flink-jdk11.yml
copy to .github/workflows/utitcase-flink-1.x-jdk11.yml
index 794e386fba..66aafb2ade 100644
--- a/.github/workflows/utitcase-flink-jdk11.yml
+++ b/.github/workflows/utitcase-flink-1.x-jdk11.yml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-name: UTCase and ITCase Flink on JDK 11
+name: UTCase and ITCase Flink 1.x on JDK 11
on:
issue_comment:
@@ -45,7 +45,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'temurin'
- name: Build Flink
- run: mvn -T 1C -B clean install -DskipTests
+ run: mvn -T 1C -B clean install -DskipTests -Pflink1
- name: Test Flink
run: |
# run tests with random timezone to find out timezone related bugs
@@ -57,6 +57,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
- mvn -T 1C -B clean install -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
+ mvn -T 1C -B clean install -Pflink1 -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink.yml
b/.github/workflows/utitcase-flink-1.x.yml
similarity index 91%
rename from .github/workflows/utitcase-flink.yml
rename to .github/workflows/utitcase-flink-1.x.yml
index b094158595..fe249a45d3 100644
--- a/.github/workflows/utitcase-flink.yml
+++ b/.github/workflows/utitcase-flink-1.x.yml
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-name: UTCase and ITCase Flink
+name: UTCase and ITCase Flink 1.x
on:
push:
@@ -54,7 +54,7 @@ jobs:
run: |
COMPILE_MODULE="org.apache.paimon:paimon-flink-${{
matrix.flink_version }}"
echo "Start compiling modules: $COMPILE_MODULE"
- mvn -T 2C -B clean install -DskipTests -pl "${COMPILE_MODULE}" -am
+ mvn -T 2C -B clean install -DskipTests -Pflink1 -pl
"${COMPILE_MODULE}" -am
- name: Test Flink
run: |
@@ -63,7 +63,7 @@ jobs:
echo "JVM timezone is set to $jvm_timezone"
TEST_MODULE="org.apache.paimon:paimon-flink-${{ matrix.flink_version
}}"
echo "Start testing module: $TEST_MODULE"
- mvn -T 2C -B test verify -pl "${TEST_MODULE}"
-Duser.timezone=$jvm_timezone
+ mvn -T 2C -B test verify -Pflink1 -pl "${TEST_MODULE}"
-Duser.timezone=$jvm_timezone
echo "All modules tested"
env:
MAVEN_OPTS: -Xmx4096m -XX:+UseG1GC -XX:CICompilerCount=2
\ No newline at end of file
diff --git a/.github/workflows/utitcase-flink-jdk11.yml
b/.github/workflows/utitcase-flink-2.x-jdk11.yml
similarity index 76%
rename from .github/workflows/utitcase-flink-jdk11.yml
rename to .github/workflows/utitcase-flink-2.x-jdk11.yml
index 794e386fba..44a0b42ad9 100644
--- a/.github/workflows/utitcase-flink-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -16,24 +16,20 @@
# limitations under the License.
################################################################################
-name: UTCase and ITCase Flink on JDK 11
+name: UTCase and ITCase Flink 2.x on JDK 11
on:
- issue_comment:
- types: [created, edited, deleted]
-
- # daily run
- schedule:
- - cron: "0 0 * * *"
+ push:
+ pull_request:
+ paths-ignore:
+ - 'docs/**'
+ - '**/*.md'
env:
JDK_VERSION: 11
jobs:
build:
- if: |
- github.event_name == 'schedule' ||
- (contains(github.event.comment.html_url, '/pull/') &&
contains(github.event.comment.body, '/jdk11'))
runs-on: ubuntu-latest
steps:
@@ -45,7 +41,7 @@ jobs:
java-version: ${{ env.JDK_VERSION }}
distribution: 'temurin'
- name: Build Flink
- run: mvn -T 1C -B clean install -DskipTests
+ run: mvn -T 1C -B clean install -DskipTests -Pflink2
- name: Test Flink
run: |
# run tests with random timezone to find out timezone related bugs
@@ -53,10 +49,10 @@ jobs:
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
test_modules=""
- for suffix in 1.15 1.16 1.17 1.18 1.19 1.20 common; do
+ for suffix in 2.0 common; do
test_modules+="org.apache.paimon:paimon-flink-${suffix},"
done
test_modules="${test_modules%,}"
- mvn -T 1C -B clean install -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
+ mvn -T 1C -B clean install -Pflink2 -pl "${test_modules}"
-Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index a965e42afb..d7eba5eaa1 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -239,7 +239,7 @@ under the License.
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.sql.connector.hive}</artifactId>
- <version>${test.flink.version}</version>
+
<version>${test.flink.connector.hive.version}</version>
<destFileName>flink-sql-connector-hive.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
@@ -301,6 +301,7 @@ under the License.
<test.flink.version>1.19.1</test.flink.version>
<test.flink.connector.kafka.version>3.2.0-${test.flink.main.version}</test.flink.connector.kafka.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
</properties>
</profile>
@@ -311,6 +312,7 @@ under the License.
<test.flink.version>1.18.1</test.flink.version>
<test.flink.connector.kafka.version>3.0.1-${test.flink.main.version}</test.flink.connector.kafka.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
</properties>
</profile>
@@ -321,6 +323,7 @@ under the License.
<test.flink.version>1.17.2</test.flink.version>
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
</properties>
</profile>
@@ -331,6 +334,7 @@ under the License.
<test.flink.version>1.16.3</test.flink.version>
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
+
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
</properties>
</profile>
@@ -341,6 +345,7 @@ under the License.
<test.flink.version>1.15.3</test.flink.version>
<test.flink.connector.kafka.version>${test.flink.version}</test.flink.connector.kafka.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.6_${scala.binary.version}</flink.sql.connector.hive>
+
<test.flink.connector.hive.version>${test.flink.version}</test.flink.connector.hive.version>
</properties>
</profile>
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
index bab41112c7..b4eb099fdc 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsWithKafkaE2eTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.tests;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
@@ -26,10 +27,17 @@ import org.testcontainers.containers.Container;
import java.util.UUID;
/** Tests for {@code FlinkActions}. */
+@EnabledIf("runTest")
public class FlinkActionsWithKafkaE2eTest extends FlinkActionsE2eTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkActionsWithKafkaE2eTest.class);
+ private static boolean runTest() {
+ // TODO: modify the following condition after paimon-flink-cdc
supports flink 2.0
+ String flinkVersion = System.getProperty("test.flink.main.version");
+ return flinkVersion.compareTo("2.0") < 0;
+ }
+
public FlinkActionsWithKafkaE2eTest() {
super(true, false);
}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
index 23742dfe40..a50b149f2b 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkProceduresE2eTest.java
@@ -28,7 +28,9 @@ import java.util.UUID;
public class FlinkProceduresE2eTest extends E2eTestBase {
private static boolean runTest() {
- return System.getProperty("test.flink.main.version").compareTo("1.18")
>= 0;
+ // TODO: modify the following condition after paimon-flink-cdc
supports flink 2.0
+ String flinkVersion = System.getProperty("test.flink.main.version");
+ return flinkVersion.compareTo("1.18") >= 0 &&
flinkVersion.compareTo("2.0") < 0;
}
public FlinkProceduresE2eTest() {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
index 294f4dd433..c40fb9e6f8 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/LogStoreE2eTest.java
@@ -20,16 +20,24 @@ package org.apache.paimon.tests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import java.util.UUID;
/** Tests for reading and writing log store in stream jobs. */
+@EnabledIf("runTest")
public class LogStoreE2eTest extends E2eTestBase {
private String topicName;
private int bucketNum;
+ private static boolean runTest() {
+ // TODO: modify the following condition after paimon-flink-cdc
supports flink 2.0
+ String flinkVersion = System.getProperty("test.flink.main.version");
+ return flinkVersion.compareTo("2.0") < 0;
+ }
+
public LogStoreE2eTest() {
super(true, false);
}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
index 732f1eee24..e083f6d85f 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
@@ -20,10 +20,13 @@ package org.apache.paimon.tests.cdc;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.junit.jupiter.api.condition.EnabledIf;
+
/**
* E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
* 5.7.
*/
+@EnabledIf("runTest")
public class MySql57E2eTest extends MySqlCdcE2eTestBase {
public MySql57E2eTest() {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
index c0ecd90d76..97624e4bb7 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql80E2eTest.java
@@ -20,10 +20,13 @@ package org.apache.paimon.tests.cdc;
import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+import org.junit.jupiter.api.condition.EnabledIf;
+
/**
* E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
* 8.0.
*/
+@EnabledIf("runTest")
public class MySql80E2eTest extends MySqlCdcE2eTestBase {
public MySql80E2eTest() {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
index 8627befeeb..b77c81184e 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlCdcE2eTestBase.java
@@ -51,6 +51,12 @@ import java.util.stream.Stream;
*/
public abstract class MySqlCdcE2eTestBase extends E2eTestBase {
+ protected static boolean runTest() {
+ // TODO: modify the following condition after paimon-flink-cdc
supports flink 2.0
+ String flinkVersion = System.getProperty("test.flink.main.version");
+ return flinkVersion.compareTo("2.0") < 0;
+ }
+
private static final Logger LOG =
LoggerFactory.getLogger(MySqlCdcE2eTestBase.class);
private static final String USER = "paimonuser";
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
index 02246b8147..77549c0dbd 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlComputedColumnE2ETest.java
@@ -24,11 +24,13 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import java.sql.Connection;
import java.sql.Statement;
/** E2e test for MySql CDC with computed column. */
+@EnabledIf("runTest")
public class MySqlComputedColumnE2ETest extends MySqlCdcE2eTestBase {
protected MySqlComputedColumnE2ETest() {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
index e638ac82c7..8a9b6d4686 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
+++
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySqlTinyIntConvertE2ETest.java
@@ -24,11 +24,13 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIf;
import java.sql.Connection;
import java.sql.Statement;
/** E2e test for MySql CDC type convert tinyint(1) to tinyint. */
+@EnabledIf("runTest")
public class MySqlTinyIntConvertE2ETest extends MySqlCdcE2eTestBase {
protected MySqlTinyIntConvertE2ETest() {
diff --git a/paimon-flink/paimon-flink-2.0/pom.xml
b/paimon-flink/paimon-flink-2.0/pom.xml
new file mode 100644
index 0000000000..b49745b244
--- /dev/null
+++ b/paimon-flink/paimon-flink-2.0/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <artifactId>paimon-flink-2.0</artifactId>
+ <name>Paimon : Flink : 2.0</name>
+
+ <properties>
+ <flink.version>2.0.0</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-paimon</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+
<include>org.apache.paimon:paimon-flink-common</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 50315cdb72..0f258a1c95 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -34,7 +34,7 @@ under the License.
<name>Paimon : Flink : Common</name>
<properties>
- <flink.version>1.20.1</flink.version>
+ <flink.version>${paimon-flink-common.flink.version}</flink.version>
</properties>
<dependencies>
@@ -81,6 +81,20 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>${paimon-flinkx-common}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>${paimon-flinkx-common}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- lookup service -->
<dependency>
@@ -218,6 +232,7 @@ under the License.
<include>org.apache.paimon:paimon-service-client</include>
<include>org.apache.paimon:paimon-service-runtime</include>
<include>org.apache.paimon:paimon-shade-netty-4</include>
+
<include>org.apache.paimon:${paimon-flinkx-common}</include>
</includes>
</artifactSet>
</configuration>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index a30c0a39ab..cb69cce258 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -134,6 +134,7 @@ public class FormatCatalogTable implements CatalogTable {
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
+ new HashMap<>(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
@@ -144,6 +145,7 @@ public class FormatCatalogTable implements CatalogTable {
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
+ new HashMap<>(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index 71298d4920..a2cfe7e41d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -23,7 +23,6 @@ import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.options.CatalogOptions;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,8 +78,7 @@ public interface ActionFactory extends Factory {
LOG.info("{} job args: {}", actionFactory.identifier(), String.join("
", actionArgs));
- MultipleParameterToolAdapter params =
- new
MultipleParameterToolAdapter(MultipleParameterTool.fromArgs(actionArgs));
+ MultipleParameterToolAdapter params = new
MultipleParameterToolAdapter(actionArgs);
if (params.has(HELP)) {
actionFactory.printHelp();
return Optional.empty();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
index 87100ff0e3..fa56becbdc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
@@ -54,7 +54,6 @@ import java.util.stream.StreamSupport;
import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
import static
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
-import static
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
/** Utility for working with {@link Factory}s. */
public final class FlinkFactoryUtil {
@@ -135,7 +134,6 @@ public final class FlinkFactoryUtil {
factoryClass.getName(),
foundFactories.stream()
.map(Factory::factoryIdentifier)
- .filter(identifier ->
!DEFAULT_IDENTIFIER.equals(identifier))
.distinct()
.sorted()
.collect(Collectors.joining("\n"))));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
index 1a1b34fe2b..53c6645c6e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
@@ -49,7 +49,7 @@ public class QueryServiceProcedure extends ProcedureBase {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "parallelism", type =
@DataTypeHint("INT"))
})
- public String[] call(ProcedureContext procedureContext, String tableId,
int parallelism)
+ public String[] call(ProcedureContext procedureContext, String tableId,
Integer parallelism)
throws Exception {
Table table = catalog.getTable(Identifier.fromString(tableId));
StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
index 00d527506c..dc37ba6996 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -23,6 +23,7 @@ import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.flink.api.connector.sink2.InitContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
index 3586d2a030..95efeba276 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LogSinkFunction.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.table.sink.SinkRecord;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
/** Log {@link SinkFunction} with {@link WriteCallback}. */
public interface LogSinkFunction extends SinkFunction<SinkRecord> {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index bd849f9d3e..5a142d51f2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 87cc9b7f04..0892def58c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -171,11 +171,12 @@ public class FlinkCatalogTest {
// TODO support change schema, modify it to createAnotherSchema
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- Collections.emptyList(),
- options);
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .comment("test comment")
+ .partitionKeys(Collections.emptyList())
+ .options(options)
+ .build();
return new ResolvedCatalogTable(origin, resolvedSchema);
}
@@ -183,33 +184,36 @@ public class FlinkCatalogTest {
// TODO support change schema, modify it to createAnotherSchema
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- this.createPartitionKeys(),
- options);
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .comment("test comment")
+ .partitionKeys(this.createPartitionKeys())
+ .options(options)
+ .build();
return new ResolvedCatalogTable(origin, resolvedSchema);
}
private CatalogTable createTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- Collections.emptyList(),
- options);
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .comment("test comment")
+ .partitionKeys(Collections.emptyList())
+ .options(options)
+ .build();
return new ResolvedCatalogTable(origin, resolvedSchema);
}
private CatalogTable createPartitionedTable(Map<String, String> options) {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "test comment",
- this.createPartitionKeys(),
- options);
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .comment("test comment")
+ .partitionKeys(this.createPartitionKeys())
+ .options(options)
+ .build();
return new ResolvedCatalogTable(origin, resolvedSchema);
}
@@ -694,7 +698,12 @@ public class FlinkCatalogTest {
CatalogTable catalogTable =
new ResolvedCatalogTable(
- CatalogTable.of(schema, "", Collections.emptyList(),
new HashMap<>()),
+ CatalogTable.newBuilder()
+ .schema(schema)
+ .comment("")
+ .partitionKeys(Collections.emptyList())
+ .options(new HashMap<>())
+ .build(),
resolvedSchema);
catalog.createDatabase(path1.getDatabaseName(), null, false);
@@ -740,7 +749,12 @@ public class FlinkCatalogTest {
Map<String, String> options = new HashMap<>();
CatalogTable catalogTable1 =
new ResolvedCatalogTable(
- CatalogTable.of(schema, "", Collections.emptyList(),
options),
+ CatalogTable.newBuilder()
+ .schema(schema)
+ .comment("")
+ .partitionKeys(Collections.emptyList())
+ .options(options)
+ .build(),
resolvedSchema);
catalog.createTable(path1, catalogTable1, false);
CatalogBaseTable storedTable1 = catalog.getTable(path1);
@@ -749,7 +763,12 @@ public class FlinkCatalogTest {
options.put(LOG_SYSTEM.key(), TESTING_LOG_STORE);
CatalogTable catalogTable2 =
new ResolvedCatalogTable(
- CatalogTable.of(schema, "", Collections.emptyList(),
options),
+ CatalogTable.newBuilder()
+ .schema(schema)
+ .comment("")
+ .partitionKeys(Collections.emptyList())
+ .options(options)
+ .build(),
resolvedSchema);
catalog.createTable(path3, catalogTable2, false);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
index b7da6cd958..7a71dbabfc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java
@@ -132,11 +132,12 @@ public abstract class FlinkTestBase extends
AbstractTestBase {
ResolvedSchema resolvedSchema =
new ResolvedSchema(resolvedColumns, Collections.emptyList(),
constraint);
CatalogTable origin =
- CatalogTable.of(
-
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
- "a comment",
- partitionKeys,
- options);
+ CatalogTable.newBuilder()
+
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+ .comment("a comment")
+ .partitionKeys(partitionKeys)
+ .options(options)
+ .build();
return new ResolvedCatalogTable(origin, resolvedSchema);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
index 6d90a455bd..f27108d866 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PredicateConverterTest.java
@@ -796,6 +796,6 @@ public class PredicateConverterTest {
}
private static CallExpression call(FunctionDefinition function,
ResolvedExpression... args) {
- return new CallExpression(function, Arrays.asList(args),
DataTypes.BOOLEAN());
+ return new CallExpression(false, null, function, Arrays.asList(args),
DataTypes.BOOLEAN());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index a8e8332156..eafa3b6916 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1008,7 +1008,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase
{
+ " `ts` TIMESTAMP(3),\n"
+ " `ee` VARCHAR(2147483647) METADATA,\n"
+ " WATERMARK FOR `ts` AS `ts`\n"
- + ") ")
+ + ")")
.doesNotContain("schema");
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
index 5273e0c7db..326ed9b6a2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -255,7 +256,7 @@ public class
UnawareBucketNewFilesCompactionCoordinatorOperatorTest {
new CommittableTypeInfo().createSerializer(new
ExecutionConfig()),
new TupleTypeInfo<>(
BasicTypeInfo.LONG_TYPE_INFO, new
CompactionTaskTypeInfo())
- .createSerializer(new ExecutionConfig()));
+ .createSerializer(new SerializerConfigImpl()));
OneInputStreamOperatorTestHarness harness =
new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
index fc45eceb3f..08fe8cb238 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -217,6 +218,9 @@ class LocalMergeOperatorTest {
@Override
public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+ // @Override is skipped for compatibility with Flink 1.x.
+ public void emitWatermark(WatermarkEvent watermarkEvent) {}
+
@Override
public void collect(StreamRecord<InternalRow> record) {
consumer.accept(record.getValue());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index a220d566ad..9d71f94a36 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -148,6 +148,27 @@ class PartitionMarkDoneTest extends TableTestBase {
throw new UnsupportedOperationException();
}
+ // @Override is skipped for compatibility with Flink 1.x.
+ public <K, V> BroadcastState<K, V> getBroadcastState(
+ org.apache.flink.api.common.state.v2.MapStateDescriptor<K, V>
mapStateDescriptor)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ // @Override is skipped for compatibility with Flink 1.x.
+ public <S> org.apache.flink.api.common.state.v2.ListState<S>
getListState(
+ org.apache.flink.api.common.state.v2.ListStateDescriptor<S>
listStateDescriptor)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ // @Override is skipped for compatibility with Flink 1.x.
+ public <S> org.apache.flink.api.common.state.v2.ListState<S>
getUnionListState(
+ org.apache.flink.api.common.state.v2.ListStateDescriptor<S>
listStateDescriptor)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public Set<String> getRegisteredStateNames() {
throw new UnsupportedOperationException();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 6c4c7860e5..a8c1f9c9e5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -299,6 +300,9 @@ public class OperatorSourceTest {
@Override
public void emitRecordAttributes(RecordAttributes
recordAttributes) {}
+
+ // @Override is skipped for compatibility with Flink 1.x.
+ public void emitWatermark(WatermarkEvent watermarkEvent) {}
};
AtomicBoolean isRunning = new AtomicBoolean(true);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
index 77b44d5b0e..a77d0fc870 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/TestingSourceOperator.java
@@ -61,7 +61,7 @@ import java.util.Collections;
*
href="https://github.com/apache/flink/pull/12306/files#diff-bb7687690ffa79fd86950aa23171431fcf707246ca4620d79361a6612ba7b828">Flink
* PR that introduced this class</a>
*/
-public class TestingSourceOperator<T> extends SourceOperator<T,
SimpleSourceSplit> {
+public class TestingSourceOperator<T> extends AbstractTestingSourceOperator<T,
SimpleSourceSplit> {
private static final long serialVersionUID = 1L;
diff --git a/paimon-flink/paimon-flink1-common/pom.xml
b/paimon-flink/paimon-flink1-common/pom.xml
new file mode 100644
index 0000000000..6bd3e3c4e8
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <artifactId>paimon-flink1-common</artifactId>
+ <name>Paimon : Flink 1.x : Common</name>
+
+ <properties>
+ <flink.version>${paimon-flink-common.flink.version}</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
similarity index 70%
copy from
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to
paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
index 732f1eee24..cd8fff09de 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/streaming/api/functions/sink/legacy/SinkFunction.java
@@ -16,17 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.tests.cdc;
-
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
+package org.apache.flink.streaming.api.functions.sink.legacy;
/**
- * E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
+ * The {@link org.apache.flink.streaming.api.functions.sink.SinkFunction}
migrated from Flink 1.20
+ * to resolve compatibility issues with Flink 2.x.
*/
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
- public MySql57E2eTest() {
- super(MySqlVersion.V5_7);
- }
-}
+public interface SinkFunction<T>
+ extends org.apache.flink.streaming.api.functions.sink.SinkFunction<T>
{}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
similarity index 94%
copy from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
copy to
paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index 7755cf7b40..aee2bad58b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -27,8 +27,8 @@ public class MultipleParameterToolAdapter {
private final MultipleParameterTool params;
- public MultipleParameterToolAdapter(MultipleParameterTool params) {
- this.params = params;
+ public MultipleParameterToolAdapter(String[] args) {
+ this.params = MultipleParameterTool.fromArgs(args);
}
public boolean has(String key) {
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
similarity index 70%
copy from
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
index 732f1eee24..569aef0bab 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/ListStateDescriptor.java
@@ -16,17 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.api.common.state.v2;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
- public MySql57E2eTest() {
- super(MySqlVersion.V5_7);
- }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class ListStateDescriptor<S> {}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
similarity index 70%
copy from
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
index 732f1eee24..fde66eb91b 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/api/common/state/v2/MapStateDescriptor.java
@@ -16,17 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.api.common.state.v2;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
- public MySql57E2eTest() {
- super(MySqlVersion.V5_7);
- }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class MapStateDescriptor<K, V> {}
diff --git
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
similarity index 70%
copy from
paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
copy to
paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
index 732f1eee24..da773e9890 100644
---
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/cdc/MySql57E2eTest.java
+++
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/flink/runtime/event/WatermarkEvent.java
@@ -16,17 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.tests.cdc;
+package org.apache.flink.runtime.event;
-import org.apache.paimon.flink.action.cdc.mysql.MySqlVersion;
-
-/**
- * E2e tests for {@link
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableAction} with MySQL
- * 5.7.
- */
-public class MySql57E2eTest extends MySqlCdcE2eTestBase {
-
- public MySql57E2eTest() {
- super(MySqlVersion.V5_7);
- }
-}
+/** Helper class to resolve compatibility issue with Flink 2.x. */
+public class WatermarkEvent {}
diff --git
a/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
new file mode 100644
index 0000000000..d293d30057
--- /dev/null
+++
b/paimon-flink/paimon-flink1-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.FunctionWithException;
+
+/** Helper class to resolve the compatibility of {@link SourceOperator}'s
constructor. */
+public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
+ extends SourceOperator<T, S> {
+
+ public AbstractTestingSourceOperator(
+ FunctionWithException<SourceReaderContext, SourceReader<T, S>,
Exception> readerFactory,
+ OperatorEventGateway operatorEventGateway,
+ SimpleVersionedSerializer<S> splitSerializer,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService,
+ Configuration configuration,
+ String localHostname,
+ boolean emitProgressiveWatermarks,
+ StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
+ super(
+ readerFactory,
+ operatorEventGateway,
+ splitSerializer,
+ watermarkStrategy,
+ timeService,
+ configuration,
+ localHostname,
+ emitProgressiveWatermarks,
+ canEmitBatchOfRecords);
+ }
+}
diff --git a/paimon-flink/paimon-flink2-common/pom.xml
b/paimon-flink/paimon-flink2-common/pom.xml
new file mode 100644
index 0000000000..7dc9043c44
--- /dev/null
+++ b/paimon-flink/paimon-flink2-common/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink</artifactId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <artifactId>paimon-flink2-common</artifactId>
+ <name>Paimon : Flink 2.x : Common</name>
+
+ <properties>
+ <flink.version>${paimon-flink-common.flink.version}</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
similarity index 92%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
rename to
paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
index 7755cf7b40..4fb80d63b5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
+++
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/action/MultipleParameterToolAdapter.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action;
-import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.util.MultipleParameterTool;
import java.util.Collection;
@@ -27,8 +27,8 @@ public class MultipleParameterToolAdapter {
private final MultipleParameterTool params;
- public MultipleParameterToolAdapter(MultipleParameterTool params) {
- this.params = params;
+ public MultipleParameterToolAdapter(String[] args) {
+ this.params = MultipleParameterTool.fromArgs(args);
}
public boolean has(String key) {
diff --git
a/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
new file mode 100644
index 0000000000..a2f402a874
--- /dev/null
+++
b/paimon-flink/paimon-flink2-common/src/test/java/org/apache/paimon/flink/source/operator/AbstractTestingSourceOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.operators.SourceOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.HashMap;
+
+/** Helper class to resolve the compatibility of {@link SourceOperator}'s
constructor. */
+public abstract class AbstractTestingSourceOperator<T, S extends SourceSplit>
+ extends SourceOperator<T, S> {
+
+ public AbstractTestingSourceOperator(
+ FunctionWithException<SourceReaderContext, SourceReader<T, S>,
Exception> readerFactory,
+ OperatorEventGateway operatorEventGateway,
+ SimpleVersionedSerializer<S> splitSerializer,
+ WatermarkStrategy<T> watermarkStrategy,
+ ProcessingTimeService timeService,
+ Configuration configuration,
+ String localHostname,
+ boolean emitProgressiveWatermarks,
+ StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
+
+ super(
+ null,
+ readerFactory,
+ operatorEventGateway,
+ splitSerializer,
+ watermarkStrategy,
+ timeService,
+ configuration,
+ localHostname,
+ emitProgressiveWatermarks,
+ canEmitBatchOfRecords,
+ new HashMap<>());
+ }
+}
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index f19818bca3..d7764630fd 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -34,15 +34,9 @@ under the License.
<packaging>pom</packaging>
<modules>
+ <module>${paimon-flinkx-common}</module>
<module>paimon-flink-common</module>
- <module>paimon-flink-1.15</module>
- <module>paimon-flink-1.16</module>
- <module>paimon-flink-1.17</module>
- <module>paimon-flink-1.18</module>
- <module>paimon-flink-1.19</module>
- <module>paimon-flink-1.20</module>
<module>paimon-flink-action</module>
- <module>paimon-flink-cdc</module>
</modules>
<dependencies>
@@ -166,16 +160,6 @@ under the License.
<profiles>
<profile>
<id>skip-paimon-flink-tests</id>
- <modules>
- <module>paimon-flink-common</module>
- <module>paimon-flink-1.15</module>
- <module>paimon-flink-1.16</module>
- <module>paimon-flink-1.17</module>
- <module>paimon-flink-1.18</module>
- <module>paimon-flink-1.19</module>
- <module>paimon-flink-1.20</module>
- <module>paimon-flink-cdc</module>
- </modules>
<build>
<plugins>
<plugin>
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml
b/paimon-hive/paimon-hive-connector-common/pom.xml
index 43fd4d538c..2edd5d459e 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -119,7 +119,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${flink.scala.binary.version}</artifactId>
- <version>${test.flink.version}</version>
+ <version>${test.flink.connector.hive.version}</version>
<scope>test</scope>
</dependency>
diff --git a/pom.xml b/pom.xml
index 734ef6f4cd..f54ea0450a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,6 +82,9 @@ under the License.
<antlr4.version>4.9.3</antlr4.version>
<hadoop.version>2.8.5</hadoop.version>
<scala.binary.version>2.12</scala.binary.version>
+ <!-- TODO: Update default Flink to 2.x after Paimon updates default
Java to 11 -->
+ <paimon-flinkx-common>paimon-flink1-common</paimon-flinkx-common>
+
<paimon-flink-common.flink.version>1.20.1</paimon-flink-common.flink.version>
<flink.scala.binary.version>2.12</flink.scala.binary.version>
<scala212.version>2.12.15</scala212.version>
<scala213.version>2.13.14</scala213.version>
@@ -114,6 +117,7 @@ under the License.
<test.flink.main.version>1.20</test.flink.main.version>
<test.flink.version>1.20.1</test.flink.version>
<test.flink.connector.kafka.version>3.4.0-1.20</test.flink.connector.kafka.version>
+
<test.flink.connector.hive.version>1.20.0</test.flink.connector.hive.version>
<test.mysql.connector.java.version>8.0.27</test.mysql.connector.java.version>
<!-- spark profile properties-->
@@ -405,6 +409,49 @@ under the License.
</property>
</activation>
</profile>
+
+ <profile>
+ <id>flink1</id>
+ <properties>
+
<paimon-flinkx-common>paimon-flink1-common</paimon-flinkx-common>
+
<paimon-flink-common.flink.version>1.20.1</paimon-flink-common.flink.version>
+ <test.flink.main.version>1.20</test.flink.main.version>
+ <test.flink.version>1.20.1</test.flink.version>
+ </properties>
+ <modules>
+ <module>paimon-flink/paimon-flink-1.15</module>
+ <module>paimon-flink/paimon-flink-1.16</module>
+ <module>paimon-flink/paimon-flink-1.17</module>
+ <module>paimon-flink/paimon-flink-1.18</module>
+ <module>paimon-flink/paimon-flink-1.19</module>
+ <module>paimon-flink/paimon-flink-1.20</module>
+ <module>paimon-flink/paimon-flink-cdc</module>
+ </modules>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ <property>
+ <name>flink1</name>
+ </property>
+ </activation>
+ </profile>
+
+ <profile>
+ <id>flink2</id>
+ <properties>
+
<paimon-flinkx-common>paimon-flink2-common</paimon-flinkx-common>
+
<paimon-flink-common.flink.version>2.0.0</paimon-flink-common.flink.version>
+ <test.flink.main.version>2.0</test.flink.main.version>
+ <test.flink.version>2.0.0</test.flink.version>
+ </properties>
+ <modules>
+ <module>paimon-flink/paimon-flink-2.0</module>
+ </modules>
+ <activation>
+ <property>
+ <name>flink2</name>
+ </property>
+ </activation>
+ </profile>
</profiles>
<build>