This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f94f8e29a [flink] Support Flink 2.1 (#1176)
f94f8e29a is described below
commit f94f8e29ab42e52854ca4d77643b376f3a77964f
Author: Hongshun Wang <[email protected]>
AuthorDate: Sun Aug 24 17:37:17 2025 +0800
[flink] Support Flink 2.1 (#1176)
1. support flink 2.1 in module fluss-flink-2.1
2. skip compilation and test for flink 2.1 module when Java8 is activated
3. split CI matrix into core, flink and lake
---
.github/workflows/ci-template.yaml | 2 +-
.github/workflows/stage.sh | 14 ++++-
fluss-flink/fluss-flink-1.18/pom.xml | 2 +-
.../fluss/flink/adapter/CatalogTableAdapter.java} | 25 ++++++++-
.../adapter/SingleThreadFetcherManagerAdapter.java | 43 ++++++++++++++
...ngleThreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++
.../CatalogTableAdapter.java} | 25 ++++++++-
.../{fluss-flink-1.18 => fluss-flink-2.1}/pom.xml | 65 +++++++++++-----------
.../org/apache/flink/api/connector/sink2/Sink.java | 59 ++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 17 ++++++
.../fluss/flink/catalog/Flink21CatalogITCase.java} | 6 +-
.../fluss/flink/metrics/Flink21MetricsITCase.java | 20 +++++++
.../flink/procedure/Flink21ProcedureITCase.java | 20 +++++++
.../security/acl/Flink21AuthorizationITCase.java | 20 +++++++
.../fluss/flink/sink/Flink21TableSinkITCase.java | 20 +++++++
.../source/Flink21TableSourceBatchITCase.java | 20 +++++++
.../source/Flink21TableSourceFailOverITCase.java | 20 +++++++
.../flink/source/Flink21TableSourceITCase.java | 20 +++++++
.../fluss/flink/adapter/CatalogTableAdapter.java} | 30 +++++++++-
.../adapter/SingleThreadFetcherManagerAdapter.java | 43 ++++++++++++++
...ngleThreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++
.../alibaba/fluss/flink/catalog/FlinkCatalog.java | 27 ++++-----
.../flink/source/reader/FlinkSourceReader.java | 4 +-
.../reader/fetcher/FlinkSourceFetcherManager.java | 11 +++-
.../fluss/flink/utils/FlinkConversions.java | 4 +-
.../fluss/flink/catalog/FlinkCatalogTest.java | 20 ++++---
fluss-flink/pom.xml | 1 +
fluss-test-coverage/pom.xml | 53 +++++++++++++++++-
pom.xml | 3 +
29 files changed, 605 insertions(+), 77 deletions(-)
diff --git a/.github/workflows/ci-template.yaml
b/.github/workflows/ci-template.yaml
index 26c81ab59..c5f77af3a 100644
--- a/.github/workflows/ci-template.yaml
+++ b/.github/workflows/ci-template.yaml
@@ -36,7 +36,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- module: [ core, flink ]
+ module: [ core, flink, lake ]
name: "${{ matrix.module }}"
steps:
- name: Checkout code
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index e96d34ddf..49a3ed57e 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -19,11 +19,17 @@
STAGE_CORE="core"
STAGE_FLINK="flink"
+STAGE_LAKE="lake"
MODULES_FLINK="\
fluss-flink,\
fluss-flink/fluss-flink-common,\
+fluss-flink/fluss-flink-2.1,\
fluss-flink/fluss-flink-1.20,\
+"
+
+# Legacy Flink version tests are moved to the "lake" stage to balance testing time.
+MODULES_LAKE="\
fluss-flink/fluss-flink-1.19,\
fluss-flink/fluss-flink-1.18,\
fluss-lake,\
@@ -36,7 +42,10 @@ function get_test_modules_for_stage() {
local stage=$1
local modules_flink=$MODULES_FLINK
- local modules_core=\!${MODULES_FLINK//,/,\!}
+ local modules_lake=$MODULES_LAKE
+ local negated_flink=\!${MODULES_FLINK//,/,\!}
+ local negated_lake=\!${MODULES_LAKE//,/,\!}
+ local modules_core="$negated_flink,$negated_lake"
case ${stage} in
(${STAGE_CORE})
@@ -45,6 +54,9 @@ function get_test_modules_for_stage() {
(${STAGE_FLINK})
echo "-pl fluss-test-coverage,$modules_flink"
;;
+ (${STAGE_LAKE})
+ echo "-pl fluss-test-coverage,$modules_lake"
+ ;;
esac
}
diff --git a/fluss-flink/fluss-flink-1.18/pom.xml
b/fluss-flink/fluss-flink-1.18/pom.xml
index 93b9f65f2..8e71d0d12 100644
--- a/fluss-flink/fluss-flink-1.18/pom.xml
+++ b/fluss-flink/fluss-flink-1.18/pom.xml
@@ -105,7 +105,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.minor.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
diff --git
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 54%
copy from
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to
fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..a047202f1 100644
---
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,26 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An adapter that creates a {@link CatalogTable} through {@code CatalogTable.of}. TODO: remove
+ * this class once Flink 1.18 and 1.19 are no longer supported.
+ */
+public class CatalogTableAdapter {
+ public static CatalogTable toCatalogTable(
+ Schema schema,
+ @Nullable String comment,
+ List<String> partitionKeys,
+ Map<String, String> options) {
+ return CatalogTable.of(schema, comment, partitionKeys, options);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
new file mode 100644
index 000000000..31989c16b
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link SingleThreadFetcherManager}. TODO: remove when Flink 1.18 support is dropped.
+ */
+public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
+ extends SingleThreadFetcherManager<E, SplitT> {
+ public SingleThreadFetcherManagerAdapter(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ Configuration configuration,
+ Consumer<Collection<String>> splitFinishedHook) {
+ super(elementsQueue, splitReaderSupplier, configuration,
splitFinishedHook);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
new file mode 100644
index 000000000..9dd864145
--- /dev/null
+++
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+/**
+ * Adapter for {@link SingleThreadMultiplexSourceReaderBase}. TODO: remove it once Flink 1.18 is
+ * no longer supported.
+ */
+public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
+ E, T, SplitT extends SourceSplit, SplitStateT>
+ extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT,
SplitStateT> {
+ public SingleThreadMultiplexSourceReaderBaseAdapter(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ super(elementsQueue, splitFetcherManager, recordEmitter, config,
context);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
b/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 54%
copy from
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..a047202f1 100644
---
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++
b/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,26 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An adapter that creates a {@link CatalogTable} through {@code CatalogTable.of}. TODO: remove
+ * this class once Flink 1.18 and 1.19 are no longer supported.
+ */
+public class CatalogTableAdapter {
+ public static CatalogTable toCatalogTable(
+ Schema schema,
+ @Nullable String comment,
+ List<String> partitionKeys,
+ Map<String, String> options) {
+ return CatalogTable.of(schema, comment, partitionKeys, options);
+ }
+}
diff --git a/fluss-flink/fluss-flink-1.18/pom.xml
b/fluss-flink/fluss-flink-2.1/pom.xml
similarity index 83%
copy from fluss-flink/fluss-flink-1.18/pom.xml
copy to fluss-flink/fluss-flink-2.1/pom.xml
index 93b9f65f2..79493a587 100644
--- a/fluss-flink/fluss-flink-1.18/pom.xml
+++ b/fluss-flink/fluss-flink-2.1/pom.xml
@@ -1,22 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
+ ~ Copyright (c) 2025 Alibaba Group Holding Ltd.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -27,13 +24,11 @@
<version>0.8-SNAPSHOT</version>
</parent>
- <artifactId>fluss-flink-1.18</artifactId>
-
- <name>Fluss : Engine Flink : 1.18</name>
-
+ <artifactId>fluss-flink-2.1</artifactId>
+ <name>Fluss : Engine Flink : 2.1 </name>
<properties>
- <flink.major.version>1.18</flink.major.version>
- <flink.minor.version>1.18.1</flink.minor.version>
+ <flink.major.version>2.1</flink.major.version>
+ <flink.minor.version>2.1.0</flink.minor.version>
</properties>
<dependencies>
@@ -71,13 +66,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
- <version>${flink.minor.version}</version>
- <scope>provided</scope>
- </dependency>
-
<!-- test dependency -->
<dependency>
<groupId>com.alibaba.fluss</groupId>
@@ -108,7 +96,6 @@
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-server</artifactId>
@@ -165,6 +152,17 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <!-- compilation of main sources -->
+ <skipMain>${skip.on.java8}</skipMain>
+ <!-- compilation of test sources -->
+ <skip>${skip.on.java8}</skip>
+ </configuration>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
@@ -179,6 +177,7 @@
<goal>test</goal>
</goals>
<configuration>
+ <skip>${skip.on.java8}</skip>
<includes>
<include>**/*ITCase.*</include>
</includes>
@@ -195,6 +194,7 @@
<goal>test</goal>
</goals>
<configuration>
+ <skip>${skip.on.java8}</skip>
<excludes>
<exclude>**/*ITCase.*</exclude>
</excludes>
@@ -202,6 +202,7 @@
</execution>
</executions>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
@@ -226,4 +227,4 @@
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
new file mode 100644
index 000000000..5dfc628ac
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Placeholder {@code Sink} interface to resolve compatibility issues. It can be removed once
+ * support for Flink v1.18, which requires the legacy InitContext interface, is dropped.
+ */
+public interface Sink<InputT> extends Serializable {
+
+ /**
+ * Creates a {@link SinkWriter}.
+ *
+ * @param context the runtime context.
+ * @return A sink writer.
+ * @throws IOException for any failure during creation.
+ */
+ SinkWriter<InputT> createWriter(WriterInitContext context) throws
IOException;
+
+ /** Exposes some runtime information for creating a {@link SinkWriter}. */
+ interface InitContext {
+
+ /**
+ * Returns the mailbox executor that allows executing {@link Runnable}s inside the task
+ * thread in between record processing.
+ *
+ * <p>Note that this method should not be used per-record for performance reasons in the
+ * same way as records should not be sent to the external system individually. Rather,
+ * implementers are expected to batch records and only enqueue a single {@link Runnable} per
+ * batch to handle the result.
+ */
+ MailboxExecutor getMailboxExecutor();
+
+ /** @return The metric group this writer belongs to. */
+ SinkWriterMetricGroup metricGroup();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..fb9a3d86b
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Copyright (c) 2025 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+com.alibaba.fluss.flink.catalog.FlinkCatalogFactory
\ No newline at end of file
diff --git
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
similarity index 84%
copy from
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to
fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
index 566c42ff3..78d4c0a4b 100644
---
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.catalog;
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS
happy. */
-public class DummyClass119 {}
+/** IT case for catalog in Flink 2.1; reuses {@link FlinkCatalogITCase} unchanged. */
+public class Flink21CatalogITCase extends FlinkCatalogITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
new file mode 100644
index 000000000..72b690bbe
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.metrics;
+
+/** IT case for metrics in Flink 2.1; reuses {@link FlinkMetricsITCase} unchanged. */
+public class Flink21MetricsITCase extends FlinkMetricsITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
new file mode 100644
index 000000000..9183b24cc
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.procedure;
+
+/** IT case for procedure in Flink 2.1; reuses {@link FlinkProcedureITCase} unchanged. */
+public class Flink21ProcedureITCase extends FlinkProcedureITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
new file mode 100644
index 000000000..aa75e4851
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.security.acl;
+
+/** IT case for authorization in Flink 2.1; reuses {@link FlinkAuthorizationITCase} unchanged. */
+public class Flink21AuthorizationITCase extends FlinkAuthorizationITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
new file mode 100644
index 000000000..ba29e5609
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.sink;
+
+/** IT case for {@link FlinkTableSink} in Flink 2.1; reuses {@link FlinkTableSinkITCase} as is. */
+public class Flink21TableSinkITCase extends FlinkTableSinkITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
new file mode 100644
index 000000000..f96de12e0
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for batch source in Flink 2.1; reuses {@link FlinkTableSourceBatchITCase} unchanged. */
+public class Flink21TableSourceBatchITCase extends FlinkTableSourceBatchITCase
{}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
new file mode 100644
index 000000000..150c142c6
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for source failover and recovery in Flink 2.1; reuses the parent class unchanged. */
+public class Flink21TableSourceFailOverITCase extends
FlinkTableSourceFailOverITCase {}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
new file mode 100644
index 000000000..22078bb15
--- /dev/null
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.1; reuses {@link FlinkTableSourceITCase}. */
+public class Flink21TableSourceITCase extends FlinkTableSourceITCase {}
diff --git
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 50%
rename from
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
rename to
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..7b3abe86d 100644
---
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,31 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An adapter that creates a {@link CatalogTable} through {@code CatalogTable.newBuilder()}. TODO:
+ * remove this class once Flink 1.18 and 1.19 are no longer supported.
+ */
+public class CatalogTableAdapter {
+ public static CatalogTable toCatalogTable(
+ Schema schema,
+ @Nullable String comment,
+ List<String> partitionKeys,
+ Map<String, String> options) {
+ return CatalogTable.newBuilder()
+ .schema(schema)
+ .comment(comment)
+ .partitionKeys(partitionKeys)
+ .options(options)
+ .build();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
new file mode 100644
index 000000000..bc97e2f22
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link SingleThreadFetcherManager}. TODO: remove it once Flink
+ * 1.18 is no longer supported.
+ *
+ * <p>The constructor forwards {@code splitReaderSupplier}, {@code configuration}
+ * and {@code splitFinishedHook} to the superclass. {@code elementsQueue} is
+ * accepted but not forwarded. NOTE(review): presumably the parameter is kept
+ * so callers can use one constructor signature across Flink versions --
+ * confirm.
+ */
+public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
+ extends SingleThreadFetcherManager<E, SplitT> {
+ public SingleThreadFetcherManagerAdapter(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ Configuration configuration,
+ Consumer<Collection<String>> splitFinishedHook) {
+ super(splitReaderSupplier, configuration, splitFinishedHook);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
new file mode 100644
index 000000000..f6b858cbb
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+/**
+ * Adapter for {@link SingleThreadMultiplexSourceReaderBase}. TODO: remove it
+ * once Flink 1.18 is no longer supported.
+ *
+ * <p>The constructor forwards {@code splitFetcherManager}, {@code recordEmitter},
+ * {@code config} and {@code context} to the superclass. {@code elementsQueue}
+ * is accepted but not forwarded. NOTE(review): presumably the parameter is
+ * kept so callers can use one constructor signature across Flink versions --
+ * confirm.
+ */
+public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
+ E, T, SplitT extends SourceSplit, SplitStateT>
+ extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT,
SplitStateT> {
+ public SingleThreadMultiplexSourceReaderBaseAdapter(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
+ SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ super(splitFetcherManager, recordEmitter, config, context);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
index 476b1aabf..d2114bd89 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
@@ -39,7 +39,7 @@ import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -90,8 +90,18 @@ import static
com.alibaba.fluss.flink.utils.CatalogExceptionUtils.isTableNotPart
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussDatabase;
import static org.apache.flink.util.Preconditions.checkArgument;
-/** A Flink Catalog for fluss. */
-public class FlinkCatalog implements Catalog {
+/**
+ * A Flink Catalog for fluss.
+ *
+ * <p>Currently, this class must extend the internal Flink class {@link
AbstractCatalog} because of an
+ * incompatibility bug (<a
+ * href="https://issues.apache.org/jira/browse/FLINK-38030">FLINK-38030</a>)
in Flink 2.0.0.
+ *
+ * <p>TODO: Once this issue is resolved in a future version of Flink (likely
2.1+), refactor this
+ * class to implement the public interface {@link
org.apache.flink.table.catalog.Catalog} instead of
+ * extending the internal class {@link AbstractCatalog}.
+ */
+public class FlinkCatalog extends AbstractCatalog {
public static final String LAKE_TABLE_SPLITTER = "$lake";
@@ -111,6 +121,7 @@ public class FlinkCatalog implements Catalog {
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs) {
+ super(name, defaultDatabase);
this.catalogName = name;
this.defaultDatabase = defaultDatabase;
this.bootstrapServers = bootstrapServers;
@@ -139,16 +150,6 @@ public class FlinkCatalog implements Catalog {
IOUtils.closeQuietly(connection, "fluss-connection");
}
- public String getName() {
- return catalogName;
- }
-
- @Nullable
- @Override
- public String getDefaultDatabase() throws CatalogException {
- return defaultDatabase;
- }
-
@Override
public List<String> listDatabases() throws CatalogException {
try {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
index d2db9ac8c..3efe46aa6 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
@@ -18,6 +18,7 @@
package com.alibaba.fluss.flink.source.reader;
import com.alibaba.fluss.config.Configuration;
+import
com.alibaba.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
import com.alibaba.fluss.flink.lake.LakeSplitStateInitializer;
import com.alibaba.fluss.flink.source.emitter.FlinkRecordEmitter;
import com.alibaba.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
@@ -37,7 +38,6 @@ import com.alibaba.fluss.types.RowType;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import javax.annotation.Nullable;
@@ -48,7 +48,7 @@ import java.util.function.Consumer;
/** The source reader for Fluss. */
public class FlinkSourceReader<OUT>
- extends SingleThreadMultiplexSourceReaderBase<
+ extends SingleThreadMultiplexSourceReaderBaseAdapter<
RecordAndPos, OUT, SourceSplitBase, SourceSplitState> {
public FlinkSourceReader(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
index dc48aa601..001cd4a16 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
@@ -17,6 +17,7 @@
package com.alibaba.fluss.flink.source.reader.fetcher;
+import com.alibaba.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
import com.alibaba.fluss.flink.source.reader.FlinkSourceSplitReader;
import com.alibaba.fluss.flink.source.reader.RecordAndPos;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
@@ -26,7 +27,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
-import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -46,10 +46,17 @@ import java.util.function.Supplier;
*/
@Internal
public class FlinkSourceFetcherManager
- extends SingleThreadFetcherManager<RecordAndPos, SourceSplitBase> {
+ extends SingleThreadFetcherManagerAdapter<RecordAndPos,
SourceSplitBase> {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceFetcherManager.class);
+ /**
+ * Creates a new SplitFetcherManager with a single I/O threads.
+ *
+ * @param splitReaderSupplier The factory for the split reader that
connects to the source
+ * system.
+ * @param splitFinishedHook Hook for handling finished splits in split
fetchers.
+ */
/**
* Creates a new SplitFetcherManager with a single I/O threads.
*
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
index 1c95cf87e..49452d706 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
+import static
com.alibaba.fluss.flink.adapter.CatalogTableAdapter.toCatalogTable;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
/** Utils for conversion between Flink and Fluss. */
@@ -148,8 +149,7 @@ public class FlinkConversions {
// deserialize watermark
CatalogPropertiesUtils.deserializeWatermark(newOptions, schemaBuilder);
-
- return CatalogTable.of(
+ return toCatalogTable(
schemaBuilder.build(),
tableInfo.getComment().orElse(null),
tableInfo.getPartitionKeys(),
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
index 9a22ff4e2..7a93bf60b 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
@@ -438,15 +438,17 @@ class FlinkCatalogTest {
// Test catalog with null default database
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
- Catalog catalogWithoutDefault =
- new FlinkCatalog(
- "test-catalog-no-default",
- null, // null default database
- String.join(",",
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
- Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
-
- assertThat(catalogWithoutDefault.getDefaultDatabase()).isNull();
+ assertThatThrownBy(
+ () ->
+ new FlinkCatalog(
+ "test-catalog-no-default",
+ null, // null default database
+ String.join(
+ ",",
+
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
+
Thread.currentThread().getContextClassLoader(),
+ Collections.emptyMap()))
+ .hasMessageContaining("defaultDatabase cannot be null or
empty");
}
@Test
diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml
index 4d402426a..fcdc6c05c 100644
--- a/fluss-flink/pom.xml
+++ b/fluss-flink/pom.xml
@@ -36,6 +36,7 @@
<module>fluss-flink-1.20</module>
<module>fluss-flink-1.19</module>
<module>fluss-flink-1.18</module>
+ <module>fluss-flink-2.1</module>
<module>fluss-flink-tiering</module>
</modules>
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 0b1e3ca0d..811d6e353 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -68,6 +68,13 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-flink-2.1</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-flink-1.20</artifactId>
@@ -153,11 +160,54 @@
<directory>${project.basedir}/../</directory>
<includes>
<include>fluss-flink/**/target/classes/**</include>
+ </includes>
+ <excludes>
+
<exclude>fluss-test-coverage/**</exclude>
+
<exclude>fluss-test-utils/**</exclude>
+ <!-- exclude adapter classes
to avoid Jacoco error: "Can't add different class with same name" -->
+
<exclude>fluss-flink/**/target/classes/com/alibaba/fluss/flink/adapter/**</exclude>
+ </excludes>
+ </resource>
+ </resources>
+
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
+ <id>test-lake</id>
+ <build>
+ <plugins>
+ <!-- required by jacoco for the goal: check to work -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-class-files</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <overwrite>false</overwrite>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/../</directory>
+ <includes>
<include>fluss-lake/**/target/classes/**</include>
+
<include>fluss-flink/fluss-flink-1.19/**/target/classes/**</include>
+
<include>fluss-flink/fluss-flink-1.18/**/target/classes/**</include>
</includes>
<excludes>
<exclude>fluss-test-coverage/**</exclude>
<exclude>fluss-test-utils/**</exclude>
+ <!-- exclude adapter classes
to avoid Jacoco error: "Can't add different class with same name" -->
+
<exclude>fluss-flink/**/target/classes/com/alibaba/fluss/flink/adapter/**</exclude>
</excludes>
</resource>
</resources>
@@ -329,7 +379,6 @@
<!-- exclude for dummy class -->
<exclude>com.alibaba.fluss.dist.DummyClass</exclude>
<exclude>com.alibaba.fluss.flink.DummyClass120</exclude>
-
<exclude>com.alibaba.fluss.flink.DummyClass119</exclude>
<exclude>com.alibaba.fluss.lake.batch.ArrowRecordBatch</exclude>
<exclude>com.alibaba.fluss.lake.committer.CommittedLakeSnapshot</exclude>
<exclude>com.alibaba.fluss.lake.paimon.FlussDataTypeToPaimonDataType</exclude>
@@ -361,7 +410,7 @@
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
</exclude>
<exclude>
-
com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory
+
com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder
</exclude>
<exclude>com.alibaba.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude>
<!-- end exclude for flink tiering
service -->
diff --git a/pom.xml b/pom.xml
index c033ea79b..c91242dfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
<test.skip.coverage>true</test.skip.coverage>
<fluss.forkCount>2</fluss.forkCount>
<fluss.reuseForks>true</fluss.reuseForks>
+ <skip.on.java8>false</skip.on.java8>
<!-- library versions in fluss-shaded -->
<fluss.shaded.version>2.2</fluss.shaded.version>
@@ -448,6 +449,7 @@
<id>java8</id>
<properties>
<target.java.version>1.8</target.java.version>
+ <skip.on.java8>true</skip.on.java8>
</properties>
<build>
<plugins>
@@ -467,6 +469,7 @@
</plugins>
</build>
</profile>
+
<profile>
<id>test-coverage</id>
<properties>