This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f94f8e29a [flink] Support Flink 2.1 (#1176)
f94f8e29a is described below

commit f94f8e29ab42e52854ca4d77643b376f3a77964f
Author: Hongshun Wang <[email protected]>
AuthorDate: Sun Aug 24 17:37:17 2025 +0800

    [flink] Support Flink 2.1 (#1176)
    
    1. support flink 2.1 in module fluss-flink-2.1
    2. skip compilation and test for flink 2.1 module when Java8 is activated
    3. split CI matrix into core, flink and lake
---
 .github/workflows/ci-template.yaml                 |  2 +-
 .github/workflows/stage.sh                         | 14 ++++-
 fluss-flink/fluss-flink-1.18/pom.xml               |  2 +-
 .../fluss/flink/adapter/CatalogTableAdapter.java}  | 25 ++++++++-
 .../adapter/SingleThreadFetcherManagerAdapter.java | 43 ++++++++++++++
 ...ngleThreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++
 .../CatalogTableAdapter.java}                      | 25 ++++++++-
 .../{fluss-flink-1.18 => fluss-flink-2.1}/pom.xml  | 65 +++++++++++-----------
 .../org/apache/flink/api/connector/sink2/Sink.java | 59 ++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       | 17 ++++++
 .../fluss/flink/catalog/Flink21CatalogITCase.java} |  6 +-
 .../fluss/flink/metrics/Flink21MetricsITCase.java  | 20 +++++++
 .../flink/procedure/Flink21ProcedureITCase.java    | 20 +++++++
 .../security/acl/Flink21AuthorizationITCase.java   | 20 +++++++
 .../fluss/flink/sink/Flink21TableSinkITCase.java   | 20 +++++++
 .../source/Flink21TableSourceBatchITCase.java      | 20 +++++++
 .../source/Flink21TableSourceFailOverITCase.java   | 20 +++++++
 .../flink/source/Flink21TableSourceITCase.java     | 20 +++++++
 .../fluss/flink/adapter/CatalogTableAdapter.java}  | 30 +++++++++-
 .../adapter/SingleThreadFetcherManagerAdapter.java | 43 ++++++++++++++
 ...ngleThreadMultiplexSourceReaderBaseAdapter.java | 44 +++++++++++++++
 .../alibaba/fluss/flink/catalog/FlinkCatalog.java  | 27 ++++-----
 .../flink/source/reader/FlinkSourceReader.java     |  4 +-
 .../reader/fetcher/FlinkSourceFetcherManager.java  | 11 +++-
 .../fluss/flink/utils/FlinkConversions.java        |  4 +-
 .../fluss/flink/catalog/FlinkCatalogTest.java      | 20 ++++---
 fluss-flink/pom.xml                                |  1 +
 fluss-test-coverage/pom.xml                        | 53 +++++++++++++++++-
 pom.xml                                            |  3 +
 29 files changed, 605 insertions(+), 77 deletions(-)

diff --git a/.github/workflows/ci-template.yaml 
b/.github/workflows/ci-template.yaml
index 26c81ab59..c5f77af3a 100644
--- a/.github/workflows/ci-template.yaml
+++ b/.github/workflows/ci-template.yaml
@@ -36,7 +36,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        module: [ core, flink ]
+        module: [ core, flink, lake ]
     name: "${{ matrix.module }}"
     steps:
       - name: Checkout code
diff --git a/.github/workflows/stage.sh b/.github/workflows/stage.sh
index e96d34ddf..49a3ed57e 100755
--- a/.github/workflows/stage.sh
+++ b/.github/workflows/stage.sh
@@ -19,11 +19,17 @@
 
 STAGE_CORE="core"
 STAGE_FLINK="flink"
+STAGE_LAKE="lake"
 
 MODULES_FLINK="\
 fluss-flink,\
 fluss-flink/fluss-flink-common,\
+fluss-flink/fluss-flink-2.1,\
 fluss-flink/fluss-flink-1.20,\
+"
+
+# we move Flink legacy version tests to "lake" module for balancing testing 
time
+MODULES_LAKE="\
 fluss-flink/fluss-flink-1.19,\
 fluss-flink/fluss-flink-1.18,\
 fluss-lake,\
@@ -36,7 +42,10 @@ function get_test_modules_for_stage() {
     local stage=$1
 
     local modules_flink=$MODULES_FLINK
-    local modules_core=\!${MODULES_FLINK//,/,\!}
+    local modules_lake=$MODULES_LAKE
+    local negated_flink=\!${MODULES_FLINK//,/,\!}
+    local negated_lake=\!${MODULES_LAKE//,/,\!}
+    local modules_core="$negated_flink,$negated_lake"
 
     case ${stage} in
         (${STAGE_CORE})
@@ -45,6 +54,9 @@ function get_test_modules_for_stage() {
         (${STAGE_FLINK})
             echo "-pl fluss-test-coverage,$modules_flink"
         ;;
+        (${STAGE_LAKE})
+            echo "-pl fluss-test-coverage,$modules_lake"
+        ;;
     esac
 }
 
diff --git a/fluss-flink/fluss-flink-1.18/pom.xml 
b/fluss-flink/fluss-flink-1.18/pom.xml
index 93b9f65f2..8e71d0d12 100644
--- a/fluss-flink/fluss-flink-1.18/pom.xml
+++ b/fluss-flink/fluss-flink-1.18/pom.xml
@@ -105,7 +105,7 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-base</artifactId>
             <version>${flink.minor.version}</version>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
 
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 54%
copy from 
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to 
fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..a047202f1 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,26 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
 
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS 
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
+ * flink 1.18 and 1.19.
+ */
+public class CatalogTableAdapter {
+    public static CatalogTable toCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        return CatalogTable.of(schema, comment, partitionKeys, options);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
new file mode 100644
index 000000000..31989c16b
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link SingleThreadFetcherManager}.TODO: remove it until not 
supported in flink 1.18.
+ */
+public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
+        extends SingleThreadFetcherManager<E, SplitT> {
+    public SingleThreadFetcherManagerAdapter(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, configuration, 
splitFinishedHook);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
new file mode 100644
index 000000000..9dd864145
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-1.18/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+/**
+ * Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it 
until not supported in
+ * flink 1.18.
+ */
+public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
+                E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT, 
SplitStateT> {
+    public SingleThreadMultiplexSourceReaderBaseAdapter(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(elementsQueue, splitFetcherManager, recordEmitter, config, 
context);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
 
b/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 54%
copy from 
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to 
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..a047202f1 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++ 
b/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,26 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
 
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS 
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
+ * flink 1.18 and 1.19.
+ */
+public class CatalogTableAdapter {
+    public static CatalogTable toCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        return CatalogTable.of(schema, comment, partitionKeys, options);
+    }
+}
diff --git a/fluss-flink/fluss-flink-1.18/pom.xml 
b/fluss-flink/fluss-flink-2.1/pom.xml
similarity index 83%
copy from fluss-flink/fluss-flink-1.18/pom.xml
copy to fluss-flink/fluss-flink-2.1/pom.xml
index 93b9f65f2..79493a587 100644
--- a/fluss-flink/fluss-flink-1.18/pom.xml
+++ b/fluss-flink/fluss-flink-2.1/pom.xml
@@ -1,22 +1,19 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
+  ~  Copyright (c) 2025 Alibaba Group Holding Ltd.
+  ~
+  ~  Licensed under the Apache License, Version 2.0 (the "License");
+  ~  you may not use this file except in compliance with the License.
+  ~  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
@@ -27,13 +24,11 @@
         <version>0.8-SNAPSHOT</version>
     </parent>
 
-    <artifactId>fluss-flink-1.18</artifactId>
-
-    <name>Fluss : Engine Flink : 1.18</name>
-
+    <artifactId>fluss-flink-2.1</artifactId>
+    <name>Fluss : Engine Flink : 2.1 </name>
     <properties>
-        <flink.major.version>1.18</flink.major.version>
-        <flink.minor.version>1.18.1</flink.minor.version>
+        <flink.major.version>2.1</flink.major.version>
+        <flink.minor.version>2.1.0</flink.minor.version>
     </properties>
 
     <dependencies>
@@ -71,13 +66,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
-            <version>${flink.minor.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
         <!-- test dependency -->
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
@@ -108,7 +96,6 @@
             <scope>test</scope>
         </dependency>
 
-
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-server</artifactId>
@@ -165,6 +152,17 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <!-- compilation of main sources -->
+                    <skipMain>${skip.on.java8}</skipMain>
+                    <!-- compilation of test sources -->
+                    <skip>${skip.on.java8}</skip>
+                </configuration>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
@@ -179,6 +177,7 @@
                             <goal>test</goal>
                         </goals>
                         <configuration>
+                            <skip>${skip.on.java8}</skip>
                             <includes>
                                 <include>**/*ITCase.*</include>
                             </includes>
@@ -195,6 +194,7 @@
                             <goal>test</goal>
                         </goals>
                         <configuration>
+                            <skip>${skip.on.java8}</skip>
                             <excludes>
                                 <exclude>**/*ITCase.*</exclude>
                             </excludes>
@@ -202,6 +202,7 @@
                     </execution>
                 </executions>
             </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
@@ -226,4 +227,4 @@
         </plugins>
     </build>
 
-</project>
\ No newline at end of file
+</project>
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
new file mode 100644
index 000000000..5dfc628ac
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Placeholder class to resolve compatibility issues. This placeholder class 
can be removed once we
+ * drop the support of Flink v1.18 which requires the legacy InitContext 
interface.
+ */
+public interface Sink<InputT> extends Serializable {
+
+    /**
+     * Creates a {@link SinkWriter}.
+     *
+     * @param context the runtime context.
+     * @return A sink writer.
+     * @throws IOException for any failure during creation.
+     */
+    SinkWriter<InputT> createWriter(WriterInitContext context) throws 
IOException;
+
+    /** The interface exposes some runtime info for creating a {@link 
SinkWriter}. */
+    interface InitContext {
+
+        /**
+         * Returns the mailbox executor that allows to execute {@link 
Runnable}s inside the task
+         * thread in between record processing.
+         *
+         * <p>Note that this method should not be used per-record for 
performance reasons in the
+         * same way as records should not be sent to the external system 
individually. Rather,
+         * implementers are expected to batch records and only enqueue a 
single {@link Runnable} per
+         * batch to handle the result.
+         */
+        MailboxExecutor getMailboxExecutor();
+
+        /** @return The metric group this writer belongs to. */
+        SinkWriterMetricGroup metricGroup();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..fb9a3d86b
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+#
+# Copyright (c) 2025 Alibaba Group Holding Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+com.alibaba.fluss.flink.catalog.FlinkCatalogFactory
\ No newline at end of file
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
similarity index 84%
copy from 
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
copy to 
fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
index 566c42ff3..78d4c0a4b 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.catalog;
 
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS 
happy. */
-public class DummyClass119 {}
+/** IT case for catalog in Flink 2.1. */
+public class Flink21CatalogITCase extends FlinkCatalogITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
new file mode 100644
index 000000000..72b690bbe
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/metrics/Flink21MetricsITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.metrics;
+
+/** IT case for metrics in Flink 2.1. */
+public class Flink21MetricsITCase extends FlinkMetricsITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
new file mode 100644
index 000000000..9183b24cc
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/procedure/Flink21ProcedureITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.procedure;
+
+/** IT case for procedure in Flink 2.1. */
+public class Flink21ProcedureITCase extends FlinkProcedureITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
new file mode 100644
index 000000000..aa75e4851
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/security/acl/Flink21AuthorizationITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.security.acl;
+
+/** IT case for authorization in Flink 2.1. */
+public class Flink21AuthorizationITCase extends FlinkAuthorizationITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
new file mode 100644
index 000000000..ba29e5609
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/sink/Flink21TableSinkITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.sink;
+
+/** IT case for {@link FlinkTableSink} in Flink 2.1. */
+public class Flink21TableSinkITCase extends FlinkTableSinkITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
new file mode 100644
index 000000000..f96de12e0
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceBatchITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for batch source in Flink 2.1. */
+public class Flink21TableSourceBatchITCase extends FlinkTableSourceBatchITCase 
{}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
new file mode 100644
index 000000000..150c142c6
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceFailOverITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for source failover and recovery in Flink 2.1. */
+public class Flink21TableSourceFailOverITCase extends 
FlinkTableSourceFailOverITCase {}
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
new file mode 100644
index 000000000..22078bb15
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/com/alibaba/fluss/flink/source/Flink21TableSourceITCase.java
@@ -0,0 +1,20 @@
+/*
+ *  Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.source;
+
+/** IT case for {@link FlinkTableSource} in Flink 2.1. */
+public class Flink21TableSourceITCase extends FlinkTableSourceITCase {}
diff --git 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
similarity index 50%
rename from 
fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
rename to 
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
index 566c42ff3..7b3abe86d 100644
--- 
a/fluss-flink/fluss-flink-1.19/src/main/java/com/alibaba/fluss/flink/DummyClass119.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/CatalogTableAdapter.java
@@ -15,7 +15,31 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.flink;
+package com.alibaba.fluss.flink.adapter;
 
-/** This is an empty package to generate a javadoc jar to make Sonatype OSS 
happy. */
-public class DummyClass119 {}
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A adapter for {@link CatalogTable} constructor. TODO: remove this class 
when no longer support
+ * flink 1.18 and 1.19.
+ */
+public class CatalogTableAdapter {
+    public static CatalogTable toCatalogTable(
+            Schema schema,
+            @Nullable String comment,
+            List<String> partitionKeys,
+            Map<String, String> options) {
+        return CatalogTable.newBuilder()
+                .schema(schema)
+                .comment(comment)
+                .partitionKeys(partitionKeys)
+                .options(options)
+                .build();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
new file mode 100644
index 000000000..bc97e2f22
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadFetcherManagerAdapter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Adapter for {@link SingleThreadFetcherManager}.TODO: remove it until not 
supported in flink 1.18.
+ */
+public class SingleThreadFetcherManagerAdapter<E, SplitT extends SourceSplit>
+        extends SingleThreadFetcherManager<E, SplitT> {
+    public SingleThreadFetcherManagerAdapter(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(splitReaderSupplier, configuration, splitFinishedHook);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
new file mode 100644
index 000000000..f6b858cbb
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/adapter/SingleThreadMultiplexSourceReaderBaseAdapter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.adapter;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+/**
+ * Adapter for {@link SingleThreadMultiplexSourceReaderBase}.TODO: remove it 
until not supported in
+ * flink 1.18.
+ */
+public abstract class SingleThreadMultiplexSourceReaderBaseAdapter<
+                E, T, SplitT extends SourceSplit, SplitStateT>
+        extends SingleThreadMultiplexSourceReaderBase<E, T, SplitT, 
SplitStateT> {
+    public SingleThreadMultiplexSourceReaderBaseAdapter(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(splitFetcherManager, recordEmitter, config, context);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
index 476b1aabf..d2114bd89 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
@@ -39,7 +39,7 @@ import com.alibaba.fluss.utils.ExceptionUtils;
 import com.alibaba.fluss.utils.IOUtils;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -90,8 +90,18 @@ import static 
com.alibaba.fluss.flink.utils.CatalogExceptionUtils.isTableNotPart
 import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlussDatabase;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/** A Flink Catalog for fluss. */
-public class FlinkCatalog implements Catalog {
+/**
+ * A Flink Catalog for fluss.
+ *
+ * <p>Currently, this class must extend the internal Flink class {@link 
AbstractCatalog} because an
+ * incompatibility bug ( <a
+ * href="https://issues.apache.org/jira/browse/FLINK-38030";>FLINK-38030</a>) 
in flink 2.0.0.
+ *
+ * <p>TODO: Once this issue is resolved in a future version of Flink (likely 
2.1+), refactor this
+ * class to implement the public interface {@link 
org.apache.flink.table.catalog.Catalog} instead of
+ * extending the internal class {@link AbstractCatalog}.
+ */
+public class FlinkCatalog extends AbstractCatalog {
 
     public static final String LAKE_TABLE_SPLITTER = "$lake";
 
@@ -111,6 +121,7 @@ public class FlinkCatalog implements Catalog {
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs) {
+        super(name, defaultDatabase);
         this.catalogName = name;
         this.defaultDatabase = defaultDatabase;
         this.bootstrapServers = bootstrapServers;
@@ -139,16 +150,6 @@ public class FlinkCatalog implements Catalog {
         IOUtils.closeQuietly(connection, "fluss-connection");
     }
 
-    public String getName() {
-        return catalogName;
-    }
-
-    @Nullable
-    @Override
-    public String getDefaultDatabase() throws CatalogException {
-        return defaultDatabase;
-    }
-
     @Override
     public List<String> listDatabases() throws CatalogException {
         try {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
index d2db9ac8c..3efe46aa6 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
@@ -18,6 +18,7 @@
 package com.alibaba.fluss.flink.source.reader;
 
 import com.alibaba.fluss.config.Configuration;
+import 
com.alibaba.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
 import com.alibaba.fluss.flink.lake.LakeSplitStateInitializer;
 import com.alibaba.fluss.flink.source.emitter.FlinkRecordEmitter;
 import com.alibaba.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
@@ -37,7 +38,6 @@ import com.alibaba.fluss.types.RowType;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import javax.annotation.Nullable;
@@ -48,7 +48,7 @@ import java.util.function.Consumer;
 
 /** The source reader for Fluss. */
 public class FlinkSourceReader<OUT>
-        extends SingleThreadMultiplexSourceReaderBase<
+        extends SingleThreadMultiplexSourceReaderBaseAdapter<
                 RecordAndPos, OUT, SourceSplitBase, SourceSplitState> {
 
     public FlinkSourceReader(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
index dc48aa601..001cd4a16 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/fetcher/FlinkSourceFetcherManager.java
@@ -17,6 +17,7 @@
 
 package com.alibaba.fluss.flink.source.reader.fetcher;
 
+import com.alibaba.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
 import com.alibaba.fluss.flink.source.reader.FlinkSourceSplitReader;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
@@ -26,7 +27,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
-import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -46,10 +46,17 @@ import java.util.function.Supplier;
  */
 @Internal
 public class FlinkSourceFetcherManager
-        extends SingleThreadFetcherManager<RecordAndPos, SourceSplitBase> {
+        extends SingleThreadFetcherManagerAdapter<RecordAndPos, 
SourceSplitBase> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceFetcherManager.class);
 
+    /**
+     * Creates a new SplitFetcherManager with a single I/O threads.
+     *
+     * @param splitReaderSupplier The factory for the split reader that 
connects to the source
+     *     system.
+     * @param splitFinishedHook Hook for handling finished splits in split 
fetchers.
+     */
     /**
      * Creates a new SplitFetcherManager with a single I/O threads.
      *
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
index 1c95cf87e..49452d706 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
+import static 
com.alibaba.fluss.flink.adapter.CatalogTableAdapter.toCatalogTable;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 
 /** Utils for conversion between Flink and Fluss. */
@@ -148,8 +149,7 @@ public class FlinkConversions {
 
         // deserialize watermark
         CatalogPropertiesUtils.deserializeWatermark(newOptions, schemaBuilder);
-
-        return CatalogTable.of(
+        return toCatalogTable(
                 schemaBuilder.build(),
                 tableInfo.getComment().orElse(null),
                 tableInfo.getPartitionKeys(),
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
index 9a22ff4e2..7a93bf60b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/catalog/FlinkCatalogTest.java
@@ -438,15 +438,17 @@ class FlinkCatalogTest {
 
         // Test catalog with null default database
         Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
-        Catalog catalogWithoutDefault =
-                new FlinkCatalog(
-                        "test-catalog-no-default",
-                        null, // null default database
-                        String.join(",", 
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
-                        Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
-
-        assertThat(catalogWithoutDefault.getDefaultDatabase()).isNull();
+        assertThatThrownBy(
+                        () ->
+                                new FlinkCatalog(
+                                        "test-catalog-no-default",
+                                        null, // null default database
+                                        String.join(
+                                                ",",
+                                                
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
+                                        
Thread.currentThread().getContextClassLoader(),
+                                        Collections.emptyMap()))
+                .hasMessageContaining("defaultDatabase cannot be null or 
empty");
     }
 
     @Test
diff --git a/fluss-flink/pom.xml b/fluss-flink/pom.xml
index 4d402426a..fcdc6c05c 100644
--- a/fluss-flink/pom.xml
+++ b/fluss-flink/pom.xml
@@ -36,6 +36,7 @@
         <module>fluss-flink-1.20</module>
         <module>fluss-flink-1.19</module>
         <module>fluss-flink-1.18</module>
+        <module>fluss-flink-2.1</module>
         <module>fluss-flink-tiering</module>
     </modules>
 
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 0b1e3ca0d..811d6e353 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -68,6 +68,13 @@
             <scope>compile</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-flink-2.1</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-flink-1.20</artifactId>
@@ -153,11 +160,54 @@
                                             
<directory>${project.basedir}/../</directory>
                                             <includes>
                                                 
<include>fluss-flink/**/target/classes/**</include>
+                                            </includes>
+                                            <excludes>
+                                                
<exclude>fluss-test-coverage/**</exclude>
+                                                
<exclude>fluss-test-utils/**</exclude>
+                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can't add different class with same name" -->
+                                                
<exclude>fluss-flink/**/target/classes/com/alibaba/fluss/flink/adapter/**</exclude>
+                                            </excludes>
+                                        </resource>
+                                    </resources>
+                                    
<outputDirectory>${project.build.directory}/classes</outputDirectory>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+        <profile>
+            <id>test-lake</id>
+            <build>
+                <plugins>
+                    <!-- required by jacoco for the goal: check to work -->
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-resources-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-class-files</id>
+                                <phase>generate-resources</phase>
+                                <goals>
+                                    <goal>copy-resources</goal>
+                                </goals>
+                                <configuration>
+                                    <overwrite>false</overwrite>
+                                    <resources>
+                                        <resource>
+                                            
<directory>${project.basedir}/../</directory>
+                                            <includes>
                                                 
<include>fluss-lake/**/target/classes/**</include>
+                                                
<include>fluss-flink/fluss-flink-1.19/**/target/classes/**</include>
+                                                
<include>fluss-flink/fluss-flink-1.18/**/target/classes/**</include>
                                             </includes>
                                             <excludes>
                                                 
<exclude>fluss-test-coverage/**</exclude>
                                                 
<exclude>fluss-test-utils/**</exclude>
+                                                <!-- exclude adapter classes 
to avoid Jacoco error: "Can't add different class with same name" -->
+                                                
<exclude>fluss-flink/**/target/classes/com/alibaba/fluss/flink/adapter/**</exclude>
                                             </excludes>
                                         </resource>
                                     </resources>
@@ -329,7 +379,6 @@
                                         <!-- exclude for dummy class -->
                                         
<exclude>com.alibaba.fluss.dist.DummyClass</exclude>
                                         
<exclude>com.alibaba.fluss.flink.DummyClass120</exclude>
-                                        
<exclude>com.alibaba.fluss.flink.DummyClass119</exclude>
                                         
<exclude>com.alibaba.fluss.lake.batch.ArrowRecordBatch</exclude>
                                         
<exclude>com.alibaba.fluss.lake.committer.CommittedLakeSnapshot</exclude>
                                         
<exclude>com.alibaba.fluss.lake.paimon.FlussDataTypeToPaimonDataType</exclude>
@@ -361,7 +410,7 @@
                                         
<exclude>com.alibaba.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
                                         </exclude>
                                         <exclude>
-                                            
com.alibaba.fluss.flink.tiering.committer.LakeTieringCommitOperatorFactory
+                                            
com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder
                                         </exclude>
                                         
<exclude>com.alibaba.fluss.flink.tiering.FlussLakeTieringEntrypoint</exclude>
                                         <!-- end exclude for flink tiering 
service -->
diff --git a/pom.xml b/pom.xml
index c033ea79b..c91242dfc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
         <test.skip.coverage>true</test.skip.coverage>
         <fluss.forkCount>2</fluss.forkCount>
         <fluss.reuseForks>true</fluss.reuseForks>
+        <skip.on.java8>false</skip.on.java8>
 
         <!-- library versions in fluss-shaded -->
         <fluss.shaded.version>2.2</fluss.shaded.version>
@@ -448,6 +449,7 @@
             <id>java8</id>
             <properties>
                 <target.java.version>1.8</target.java.version>
+                <skip.on.java8>true</skip.on.java8>
             </properties>
             <build>
                 <plugins>
@@ -467,6 +469,7 @@
                 </plugins>
             </build>
         </profile>
+
         <profile>
             <id>test-coverage</id>
             <properties>

Reply via email to