This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f9de7085 [flink] Union read in stream mode should support read from 
given timestamp (#1598)
1f9de7085 is described below

commit 1f9de708519cd72314790b8934976e382ef3b179
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Aug 27 18:10:56 2025 +0800

    [flink] Union read in stream mode should support read from given timestamp 
(#1598)
---
 .../fluss/flink/source/FlinkTableSource.java       |  53 +++++-
 .../apache/fluss/flink/utils/LakeSourceUtils.java  |  21 ++-
 .../source/testutils/FlinkRowAssertionsUtils.java  |   9 +
 .../flink/FlinkUnionReadFromTimestampITCase.java   | 190 +++++++++++++++++++++
 .../lake/paimon/flink/FlinkUnionReadTestBase.java  |  16 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |  30 ++--
 .../lake/paimon/tiering/PaimonTieringITCase.java   |  17 +-
 .../tiering/ReCreateSameTableAfterTieringTest.java |  18 +-
 .../fluss/server/log/remote/RemoteLogManager.java  |   8 +-
 .../fluss/server/replica/ReplicaManager.java       |   2 +-
 10 files changed, 336 insertions(+), 28 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index ef0d04a25..5739381d3 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -32,8 +32,12 @@ import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.LeafPredicate;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -86,7 +90,9 @@ import static 
org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
 import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** Flink table source to scan Fluss data. */
 public class FlinkTableSource
@@ -258,12 +264,15 @@ public class FlinkTableSource
             flussRowType = flussRowType.project(projectedFields);
         }
         OffsetsInitializer offsetsInitializer;
+        boolean enableLakeSource = lakeSource != null;
         switch (startupOptions.startupMode) {
             case EARLIEST:
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
                 offsetsInitializer = OffsetsInitializer.latest();
+                // since it's scan from latest, don't consider lake data
+                enableLakeSource = false;
                 break;
             case FULL:
                 offsetsInitializer = OffsetsInitializer.full();
@@ -271,6 +280,18 @@ public class FlinkTableSource
             case TIMESTAMP:
                 offsetsInitializer =
                         
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                if (hasPrimaryKey()) {
+                    // Currently, for primary key tables, we do not consider 
lake data
+                    // when reading from a given timestamp. This is because we 
will need
+                    // to read the change log of the primary key table.
+                    // TODO: consider supporting it using paimon change log data?
+                    enableLakeSource = false;
+                } else {
+                    if (enableLakeSource) {
+                        enableLakeSource =
+                                pushTimeStampFilterToLakeSource(lakeSource, 
flussRowType);
+                    }
+                }
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -290,7 +311,7 @@ public class FlinkTableSource
                         new RowDataDeserializationSchema(),
                         streaming,
                         partitionFilters,
-                        lakeSource);
+                        enableLakeSource ? lakeSource : null);
 
         if (!streaming) {
             // return a bounded source provide to make planner happy,
@@ -321,6 +342,36 @@ public class FlinkTableSource
         }
     }
 
+    private boolean pushTimeStampFilterToLakeSource(
+            LakeSource<?> lakeSource, RowType flussRowType) {
+        // will push timestamp to lake
+        // we will have three additional system columns, __bucket, __offset, __timestamp
+        // in lake, get the __timestamp index in lake table
+        final int timestampFieldIndex = flussRowType.getFieldCount() + 2;
+        Predicate timestampFilter =
+                new LeafPredicate(
+                        GreaterOrEqual.INSTANCE,
+                        DataTypes.TIMESTAMP_LTZ(),
+                        timestampFieldIndex,
+                        TIMESTAMP_COLUMN_NAME,
+                        Collections.singletonList(
+                                
TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
+        List<Predicate> acceptedPredicates =
+                lakeSource
+                        
.withFilters(Collections.singletonList(timestampFilter))
+                        .acceptedPredicates();
+        if (acceptedPredicates.isEmpty()) {
+            LOG.warn(
+                    "The lake source doesn't accept the filter {}, won't read 
data from lake.",
+                    timestampFilter);
+            return false;
+        }
+        checkState(
+                acceptedPredicates.size() == 1
+                        && acceptedPredicates.get(0).equals(timestampFilter));
+        return true;
+    }
+
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
         LookupNormalizer lookupNormalizer =
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 88e0b5ddf..0f1796a78 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -26,6 +26,11 @@ import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -33,7 +38,14 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
 /** Utils for create lake source. */
 public class LakeSourceUtils {
 
+    public static final Logger LOG = 
LoggerFactory.getLogger(LakeSourceUtils.class);
+
+    /**
+     * Return the lake source of the given table. Return null when the lake 
storage doesn't support
+     * creating a lake source.
+     */
     @SuppressWarnings("unchecked")
+    @Nullable
     public static LakeSource<LakeSplit> createLakeSource(
             TablePath tablePath, Map<String, String> properties) {
         Map<String, String> catalogProperties =
@@ -47,6 +59,13 @@ public class LakeSourceUtils {
         LakeStoragePlugin lakeStoragePlugin =
                 LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
         LakeStorage lakeStorage = 
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
-        return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+        try {
+            return (LakeSource<LakeSplit>) 
lakeStorage.createLakeSource(tablePath);
+        } catch (UnsupportedOperationException e) {
+            LOG.info(
+                    "method createLakeSource throw 
UnsupportedOperationException for datalake format {}, return null as lakeSource 
to disable reading from lake source.",
+                    dataLake);
+            return null;
+        }
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 2dede7ca9..092220244 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.CloseableIterator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -31,6 +32,14 @@ public class FlinkRowAssertionsUtils {
 
     private FlinkRowAssertionsUtils() {}
 
+    public static void assertRowResultsIgnoreOrder(
+            CloseableIterator<Row> actual, List<Row> expectedRows, boolean 
closeIterator) {
+        assertResultsIgnoreOrder(
+                actual,
+                
expectedRows.stream().map(Row::toString).collect(Collectors.toList()),
+                closeIterator);
+    }
+
     public static void assertResultsIgnoreOrder(
             CloseableIterator<Row> iterator, List<String> expected, boolean 
closeIterator) {
         List<String> actual = collectRowsWithTimeout(iterator, 
expected.size(), closeIterator);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
new file mode 100644
index 000000000..efa1a84df
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for Flink union read from a timestamp. */
+class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
+
+    private static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .setClock(CLOCK)
+                    .build();
+
+    private StreamTableEnvironment streamTEnv;
+
+    protected static Configuration initConfig() {
+        Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
+        // set file size to 10b to make log segment roll frequently
+        configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, 
MemorySize.parse("10b"));
+        configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, 
Duration.ofMillis(100));
+        return configuration;
+    }
+
+    @BeforeAll
+    static void beforeAll() {
+        
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        buildStreamTEnv();
+    }
+
+    @Override
+    protected FlussClusterExtension getFlussClusterExtension() {
+        return FLUSS_CLUSTER_EXTENSION;
+    }
+
+    @Test
+    void testUnionReadFromTimestamp() throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            String tableName = "logTable_read_timestamp";
+            TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+            long tableId = createLogTable(tablePath, 1);
+            TableBucket t1Bucket = new TableBucket(tableId, 0);
+
+            List<Row> rows = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                rows.addAll(writeRows(tablePath, 3));
+                // advance the clock by 1s after each round so that each round of writing has
+                // a different timestamp
+                CLOCK.advanceTime(Duration.ofSeconds(1));
+            }
+            assertReplicaStatus(t1Bucket, rows.size());
+
+            Replica t1Replica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
+
+            // wait until only 2 log segments remain in local (by default 2 segments are kept)
+            waitUtil(
+                    () -> t1Replica.getLogTablet().logSegments().size() == 2,
+                    Duration.ofMinutes(1),
+                    "Fail to wait util only 2 segments in local.");
+
+            // advance 10 days to mock remote log ttl
+            CLOCK.advanceTime(Duration.ofDays(10));
+            // wait until the remote log ttl applies; offset 10 should no longer be fetchable
+            waitUtil(
+                    () -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
+                    Duration.ofMinutes(1),
+                    "Fail to wait log offset 10 ttl from remote log.");
+
+            // verify scan from timestamp 0, should read full data
+            assertRowResultsIgnoreOrder(
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'timestamp',\n"
+                                            + "'scan.startup.timestamp' = '0') 
*/")
+                            .collect(),
+                    rows,
+                    true);
+
+            // verify scan from timestamp 2000, shouldn't read the rows written in
+            // the first two rounds
+            CloseableIterator<Row> actualRows =
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'timestamp',\n"
+                                            + "'scan.startup.timestamp' = 
'2000') */")
+                            .collect();
+            List<Row> expectedRows = rows.stream().skip(2 * 
3).collect(Collectors.toList());
+            assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
+
+            // verify scan from earliest
+            assertRowResultsIgnoreOrder(
+                    streamTEnv
+                            .executeSql(
+                                    "select * from "
+                                            + tableName
+                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */")
+                            .collect(),
+                    rows,
+                    true);
+
+        } finally {
+            jobClient.cancel();
+        }
+    }
+
+    private List<Row> writeRows(TablePath tablePath, int rows) throws 
Exception {
+        List<InternalRow> writtenRows = new ArrayList<>();
+        List<Row> flinkRow = new ArrayList<>();
+        for (int i = 0; i < rows; i++) {
+            writtenRows.add(row(i, "v" + i));
+            flinkRow.add(Row.of(i, "v" + i));
+        }
+        writeRows(tablePath, writtenRows, true);
+        return flinkRow;
+    }
+
+    private void buildStreamTEnv() {
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create table environment
+        streamTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+        // create catalog using sql
+        streamTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+        streamTEnv.executeSql("use " + DEFAULT_DB);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 1ad885da9..277593720 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -19,24 +19,33 @@ package org.apache.fluss.lake.paimon.flink;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
 /** Base class for Flink union read test. */
 public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
 
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
     StreamTableEnvironment streamTEnv;
 
     @BeforeAll
     protected static void beforeAll() {
-        FlinkPaimonTieringTestBase.beforeAll();
+        
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
     }
 
     @BeforeEach
@@ -46,6 +55,11 @@ public class FlinkUnionReadTestBase extends 
FlinkPaimonTieringTestBase {
         buildStreamTEnv();
     }
 
+    @Override
+    protected FlussClusterExtension getFlussClusterExtension() {
+        return FLUSS_CLUSTER_EXTENSION;
+    }
+
     private void buildStreamTEnv() {
         String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
         // create table environment
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 39b1cd5f2..5c2ab1f1a 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -53,9 +53,7 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.CloseableIterator;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.nio.file.Files;
 import java.time.Duration;
@@ -75,16 +73,9 @@ import static 
org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test base for sync to paimon by Flink. */
-public class FlinkPaimonTieringTestBase {
+public abstract class FlinkPaimonTieringTestBase {
     protected static final String DEFAULT_DB = "fluss";
 
-    @RegisterExtension
-    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
-            FlussClusterExtension.builder()
-                    .setClusterConf(initConfig())
-                    .setNumOfTabletServers(3)
-                    .build();
-
     protected static final String CATALOG_NAME = "testcatalog";
     protected StreamExecutionEnvironment execEnv;
 
@@ -94,7 +85,7 @@ public class FlinkPaimonTieringTestBase {
     protected static String warehousePath;
     protected static Catalog paimonCatalog;
 
-    private static Configuration initConfig() {
+    protected static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
                 // not to clean snapshots for test purpose
@@ -113,9 +104,8 @@ public class FlinkPaimonTieringTestBase {
         return conf;
     }
 
-    @BeforeAll
-    protected static void beforeAll() {
-        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+    public static void beforeAll(Configuration conf) {
+        clientConf = conf;
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
         paimonCatalog = getPaimonCatalog();
@@ -158,6 +148,8 @@ public class FlinkPaimonTieringTestBase {
         }
     }
 
+    protected abstract FlussClusterExtension getFlussClusterExtension();
+
     protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
             throws Exception {
         admin.createTable(tablePath, tableDescriptor, true).get();
@@ -167,7 +159,7 @@ public class FlinkPaimonTieringTestBase {
     protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
         for (int i = 0; i < bucketNum; i++) {
             TableBucket tableBucket = new TableBucket(tableId, i);
-            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
+            getFlussClusterExtension().waitUntilSnapshotFinished(tableBucket, 
snapshotId);
         }
     }
 
@@ -224,9 +216,9 @@ public class FlinkPaimonTieringTestBase {
                 
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
     }
 
-    public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+    public Map<Long, String> waitUntilPartitions(TablePath tablePath) {
         return waitUntilPartitions(
-                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+                getFlussClusterExtension().getZooKeeperClient(),
                 tablePath,
                 
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
     }
@@ -255,7 +247,7 @@ public class FlinkPaimonTieringTestBase {
     }
 
     protected Replica getLeaderReplica(TableBucket tableBucket) {
-        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+        return getFlussClusterExtension().waitAndGetLeaderReplica(tableBucket);
     }
 
     protected long createLogTable(TablePath tablePath) throws Exception {
@@ -367,7 +359,7 @@ public class FlinkPaimonTieringTestBase {
             Map<TableBucket, Long> expectedLogEndOffset) {
         if (isPartitioned) {
             Map<Long, String> partitionById =
-                    
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+                    
waitUntilPartitions(getFlussClusterExtension().getZooKeeperClient(), tablePath);
             for (Long partitionId : partitionById.keySet()) {
                 for (int i = 0; i < bucketCount; i++) {
                     TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 0a4c3e0c9..301427186 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.types.Tuple2;
 
@@ -37,6 +38,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.CloseableIterator;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -54,6 +56,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** IT case for tiering tables to paimon. */
 class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
 
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
     protected static final String DEFAULT_DB = "fluss";
 
     private static StreamExecutionEnvironment execEnv;
@@ -61,7 +70,8 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
 
     @BeforeAll
     protected static void beforeAll() {
-        FlinkPaimonTieringTestBase.beforeAll();
+        
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+
         execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         execEnv.setParallelism(2);
         execEnv.enableCheckpointing(1000);
@@ -252,4 +262,9 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
                                         .plan());
         return reader.toCloseableIterator();
     }
+
+    @Override
+    protected FlussClusterExtension getFlussClusterExtension() {
+        return FLUSS_CLUSTER_EXTENSION;
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index a3dd5a244..5a0b8ccd9 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -21,11 +21,13 @@ import 
org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Arrays;
 import java.util.List;
@@ -34,13 +36,22 @@ import static org.apache.fluss.testutils.DataTestUtils.row;
 
 /** A Test case for dropping a pktable after tiering and creating one with the 
same tablePath. */
 class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
     protected static final String DEFAULT_DB = "fluss";
 
     private static StreamExecutionEnvironment execEnv;
 
     @BeforeAll
     protected static void beforeAll() {
-        FlinkPaimonTieringTestBase.beforeAll();
+        
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+
         execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
         execEnv.setParallelism(2);
         execEnv.enableCheckpointing(1000);
@@ -82,4 +93,9 @@ class ReCreateSameTableAfterTieringTest extends 
FlinkPaimonTieringTestBase {
         // stop the tiering job
         jobClient.cancel().get();
     }
+
+    @Override
+    protected FlussClusterExtension getFlussClusterExtension() {
+        return FLUSS_CLUSTER_EXTENSION;
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index 1bca4e363..cc23be0be 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -33,7 +33,6 @@ import 
org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.MapUtils;
 import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.SystemClock;
 import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
@@ -81,7 +80,10 @@ public class RemoteLogManager implements Closeable {
     private final Map<TableBucket, RemoteLogTablet> remoteLogs = 
MapUtils.newConcurrentHashMap();
 
     public RemoteLogManager(
-            Configuration conf, ZooKeeperClient zkClient, CoordinatorGateway 
coordinatorGateway)
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorGateway coordinatorGateway,
+            Clock clock)
             throws IOException {
         this(
                 conf,
@@ -91,7 +93,7 @@ public class RemoteLogManager implements Closeable {
                 Executors.newScheduledThreadPool(
                         
conf.getInt(ConfigOptions.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE),
                         new 
ExecutorThreadFactory(RLM_SCHEDULED_THREAD_PREFIX)),
-                SystemClock.getInstance());
+                clock);
     }
 
     @VisibleForTesting
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index e85985767..26b31bdd5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -211,7 +211,7 @@ public class ReplicaManager {
                 completedKvSnapshotCommitter,
                 fatalErrorHandler,
                 serverMetricGroup,
-                new RemoteLogManager(conf, zkClient, coordinatorGateway),
+                new RemoteLogManager(conf, zkClient, coordinatorGateway, 
clock),
                 clock);
     }
 

Reply via email to