This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1f9de7085 [flink] Union read in stream mode should support read from
given timestamp (#1598)
1f9de7085 is described below
commit 1f9de708519cd72314790b8934976e382ef3b179
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Aug 27 18:10:56 2025 +0800
[flink] Union read in stream mode should support read from given timestamp
(#1598)
---
.../fluss/flink/source/FlinkTableSource.java | 53 +++++-
.../apache/fluss/flink/utils/LakeSourceUtils.java | 21 ++-
.../source/testutils/FlinkRowAssertionsUtils.java | 9 +
.../flink/FlinkUnionReadFromTimestampITCase.java | 190 +++++++++++++++++++++
.../lake/paimon/flink/FlinkUnionReadTestBase.java | 16 +-
.../testutils/FlinkPaimonTieringTestBase.java | 30 ++--
.../lake/paimon/tiering/PaimonTieringITCase.java | 17 +-
.../tiering/ReCreateSameTableAfterTieringTest.java | 18 +-
.../fluss/server/log/remote/RemoteLogManager.java | 8 +-
.../fluss/server/replica/ReplicaManager.java | 2 +-
10 files changed, 336 insertions(+), 28 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index ef0d04a25..5739381d3 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -32,8 +32,12 @@ import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.GreaterOrEqual;
+import org.apache.fluss.predicate.LeafPredicate;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.flink.annotation.VisibleForTesting;
@@ -86,7 +90,9 @@ import static
org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
import static
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
import static
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLUSS_INTERNAL_VALUE;
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
/** Flink table source to scan Fluss data. */
public class FlinkTableSource
@@ -258,12 +264,15 @@ public class FlinkTableSource
flussRowType = flussRowType.project(projectedFields);
}
OffsetsInitializer offsetsInitializer;
+ boolean enableLakeSource = lakeSource != null;
switch (startupOptions.startupMode) {
case EARLIEST:
offsetsInitializer = OffsetsInitializer.earliest();
break;
case LATEST:
offsetsInitializer = OffsetsInitializer.latest();
+                // since it scans from latest, don't consider lake data
+ enableLakeSource = false;
break;
case FULL:
offsetsInitializer = OffsetsInitializer.full();
@@ -271,6 +280,18 @@ public class FlinkTableSource
case TIMESTAMP:
offsetsInitializer =
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+ if (hasPrimaryKey()) {
+ // Currently, for primary key tables, we do not consider
lake data
+ // when reading from a given timestamp. This is because we
will need
+ // to read the change log of primary key table.
+                    // TODO: consider supporting it using paimon changelog data?
+ enableLakeSource = false;
+ } else {
+ if (enableLakeSource) {
+ enableLakeSource =
+ pushTimeStampFilterToLakeSource(lakeSource,
flussRowType);
+ }
+ }
break;
default:
throw new IllegalArgumentException(
@@ -290,7 +311,7 @@ public class FlinkTableSource
new RowDataDeserializationSchema(),
streaming,
partitionFilters,
- lakeSource);
+ enableLakeSource ? lakeSource : null);
if (!streaming) {
// return a bounded source provide to make planner happy,
@@ -321,6 +342,36 @@ public class FlinkTableSource
}
}
+ private boolean pushTimeStampFilterToLakeSource(
+ LakeSource<?> lakeSource, RowType flussRowType) {
+ // will push timestamp to lake
+ // we will have three additional system columns, __bucket, __offset,
__timestamp
+ // in lake, get the __timestamp index in lake table
+ final int timestampFieldIndex = flussRowType.getFieldCount() + 2;
+ Predicate timestampFilter =
+ new LeafPredicate(
+ GreaterOrEqual.INSTANCE,
+ DataTypes.TIMESTAMP_LTZ(),
+ timestampFieldIndex,
+ TIMESTAMP_COLUMN_NAME,
+ Collections.singletonList(
+
TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
+ List<Predicate> acceptedPredicates =
+ lakeSource
+
.withFilters(Collections.singletonList(timestampFilter))
+ .acceptedPredicates();
+ if (acceptedPredicates.isEmpty()) {
+ LOG.warn(
+ "The lake source doesn't accept the filter {}, won't read
data from lake.",
+ timestampFilter);
+ return false;
+ }
+ checkState(
+ acceptedPredicates.size() == 1
+ && acceptedPredicates.get(0).equals(timestampFilter));
+ return true;
+ }
+
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
LookupNormalizer lookupNormalizer =
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 88e0b5ddf..0f1796a78 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -26,6 +26,11 @@ import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TablePath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
import java.util.Map;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -33,7 +38,14 @@ import static
org.apache.fluss.utils.Preconditions.checkNotNull;
/** Utils for create lake source. */
public class LakeSourceUtils {
+ public static final Logger LOG =
LoggerFactory.getLogger(LakeSourceUtils.class);
+
+ /**
+     * Return the lake source of the given table. Return null when the lake
storage doesn't support
+     * creating a lake source.
+ */
@SuppressWarnings("unchecked")
+ @Nullable
public static LakeSource<LakeSplit> createLakeSource(
TablePath tablePath, Map<String, String> properties) {
Map<String, String> catalogProperties =
@@ -47,6 +59,13 @@ public class LakeSourceUtils {
LakeStoragePlugin lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
LakeStorage lakeStorage =
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
- return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+ try {
+ return (LakeSource<LakeSplit>)
lakeStorage.createLakeSource(tablePath);
+ } catch (UnsupportedOperationException e) {
+ LOG.info(
+ "method createLakeSource throw
UnsupportedOperationException for datalake format {}, return null as lakeSource
to disable reading from lake source.",
+ dataLake);
+ return null;
+ }
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 2dede7ca9..092220244 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.CloseableIterator;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,6 +32,14 @@ public class FlinkRowAssertionsUtils {
private FlinkRowAssertionsUtils() {}
+ public static void assertRowResultsIgnoreOrder(
+ CloseableIterator<Row> actual, List<Row> expectedRows, boolean
closeIterator) {
+ assertResultsIgnoreOrder(
+ actual,
+
expectedRows.stream().map(Row::toString).collect(Collectors.toList()),
+ closeIterator);
+ }
+
public static void assertResultsIgnoreOrder(
CloseableIterator<Row> iterator, List<String> expected, boolean
closeIterator) {
List<String> actual = collectRowsWithTimeout(iterator,
expected.size(), closeIterator);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
new file mode 100644
index 000000000..efa1a84df
--- /dev/null
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.paimon.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.utils.clock.ManualClock;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for Flink union read from a timestamp. */
+class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
+
+ private static final ManualClock CLOCK = new ManualClock();
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .setClock(CLOCK)
+ .build();
+
+ private StreamTableEnvironment streamTEnv;
+
+ protected static Configuration initConfig() {
+ Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
+ // set file size to 10b to make log segment roll frequently
+ configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE,
MemorySize.parse("10b"));
+ configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ofMillis(100));
+ return configuration;
+ }
+
+ @BeforeAll
+ static void beforeAll() {
+
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ super.beforeEach();
+ buildStreamTEnv();
+ }
+
+ @Override
+ protected FlussClusterExtension getFlussClusterExtension() {
+ return FLUSS_CLUSTER_EXTENSION;
+ }
+
+ @Test
+ void testUnionReadFromTimestamp() throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ String tableName = "logTable_read_timestamp";
+ TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+ long tableId = createLogTable(tablePath, 1);
+ TableBucket t1Bucket = new TableBucket(tableId, 0);
+
+ List<Row> rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ rows.addAll(writeRows(tablePath, 3));
+ // each round advance 1s to make sure each round of writing has
+ // different timestamp
+ CLOCK.advanceTime(Duration.ofSeconds(1));
+ }
+ assertReplicaStatus(t1Bucket, rows.size());
+
+ Replica t1Replica =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
+
+            // wait until only 2 log segments remain in local (by default, 2
segments are kept in local)
+ waitUtil(
+ () -> t1Replica.getLogTablet().logSegments().size() == 2,
+ Duration.ofMinutes(1),
+ "Fail to wait util only 2 segments in local.");
+
+ // advance 10 days to mock remote log ttl
+ CLOCK.advanceTime(Duration.ofDays(10));
+            // wait until the remote log TTL expires; fetching offset 10 from
the remote log should then fail
+ waitUtil(
+ () -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
+ Duration.ofMinutes(1),
+ "Fail to wait log offset 10 ttl from remote log.");
+
+ // verify scan from timestamp 0, should read full data
+ assertRowResultsIgnoreOrder(
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.startup.mode' = 'timestamp',\n"
+ + "'scan.startup.timestamp' = '0')
*/")
+ .collect(),
+ rows,
+ true);
+
+            // verify scan from timestamp 2000, shouldn't read the rows
written in the first two
+            // rounds
+ CloseableIterator<Row> actualRows =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.startup.mode' = 'timestamp',\n"
+ + "'scan.startup.timestamp' =
'2000') */")
+ .collect();
+ List<Row> expectedRows = rows.stream().skip(2 *
3).collect(Collectors.toList());
+ assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
+
+ // verify scan from earliest
+ assertRowResultsIgnoreOrder(
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.startup.mode' = 'earliest') */")
+ .collect(),
+ rows,
+ true);
+
+ } finally {
+ jobClient.cancel();
+ }
+ }
+
+ private List<Row> writeRows(TablePath tablePath, int rows) throws
Exception {
+ List<InternalRow> writtenRows = new ArrayList<>();
+ List<Row> flinkRow = new ArrayList<>();
+ for (int i = 0; i < rows; i++) {
+ writtenRows.add(row(i, "v" + i));
+ flinkRow.add(Row.of(i, "v" + i));
+ }
+ writeRows(tablePath, writtenRows, true);
+ return flinkRow;
+ }
+
+ private void buildStreamTEnv() {
+ String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ streamTEnv = StreamTableEnvironment.create(execEnv,
EnvironmentSettings.inStreamingMode());
+        // create catalog using SQL
+ streamTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' =
'%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
+ streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+ streamTEnv.executeSql("use " + DEFAULT_DB);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 1ad885da9..277593720 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -19,24 +19,33 @@ package org.apache.fluss.lake.paimon.flink;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
/** Base class for Flink union read test. */
public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
StreamTableEnvironment streamTEnv;
@BeforeAll
protected static void beforeAll() {
- FlinkPaimonTieringTestBase.beforeAll();
+
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
}
@BeforeEach
@@ -46,6 +55,11 @@ public class FlinkUnionReadTestBase extends
FlinkPaimonTieringTestBase {
buildStreamTEnv();
}
+ @Override
+ protected FlussClusterExtension getFlussClusterExtension() {
+ return FLUSS_CLUSTER_EXTENSION;
+ }
+
private void buildStreamTEnv() {
String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
// create table environment
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 39b1cd5f2..5c2ab1f1a 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -53,9 +53,7 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.RegisterExtension;
import java.nio.file.Files;
import java.time.Duration;
@@ -75,16 +73,9 @@ import static
org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.assertj.core.api.Assertions.assertThat;
/** Test base for sync to paimon by Flink. */
-public class FlinkPaimonTieringTestBase {
+public abstract class FlinkPaimonTieringTestBase {
protected static final String DEFAULT_DB = "fluss";
- @RegisterExtension
- public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
- FlussClusterExtension.builder()
- .setClusterConf(initConfig())
- .setNumOfTabletServers(3)
- .build();
-
protected static final String CATALOG_NAME = "testcatalog";
protected StreamExecutionEnvironment execEnv;
@@ -94,7 +85,7 @@ public class FlinkPaimonTieringTestBase {
protected static String warehousePath;
protected static Catalog paimonCatalog;
- private static Configuration initConfig() {
+ protected static Configuration initConfig() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
// not to clean snapshots for test purpose
@@ -113,9 +104,8 @@ public class FlinkPaimonTieringTestBase {
return conf;
}
- @BeforeAll
- protected static void beforeAll() {
- clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ public static void beforeAll(Configuration conf) {
+ clientConf = conf;
conn = ConnectionFactory.createConnection(clientConf);
admin = conn.getAdmin();
paimonCatalog = getPaimonCatalog();
@@ -158,6 +148,8 @@ public class FlinkPaimonTieringTestBase {
}
}
+ protected abstract FlussClusterExtension getFlussClusterExtension();
+
protected long createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws Exception {
admin.createTable(tablePath, tableDescriptor, true).get();
@@ -167,7 +159,7 @@ public class FlinkPaimonTieringTestBase {
protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
for (int i = 0; i < bucketNum; i++) {
TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
+ getFlussClusterExtension().waitUntilSnapshotFinished(tableBucket,
snapshotId);
}
}
@@ -224,9 +216,9 @@ public class FlinkPaimonTieringTestBase {
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
}
- public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+ public Map<Long, String> waitUntilPartitions(TablePath tablePath) {
return waitUntilPartitions(
- FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+ getFlussClusterExtension().getZooKeeperClient(),
tablePath,
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
}
@@ -255,7 +247,7 @@ public class FlinkPaimonTieringTestBase {
}
protected Replica getLeaderReplica(TableBucket tableBucket) {
- return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ return getFlussClusterExtension().waitAndGetLeaderReplica(tableBucket);
}
protected long createLogTable(TablePath tablePath) throws Exception {
@@ -367,7 +359,7 @@ public class FlinkPaimonTieringTestBase {
Map<TableBucket, Long> expectedLogEndOffset) {
if (isPartitioned) {
Map<Long, String> partitionById =
-
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+
waitUntilPartitions(getFlussClusterExtension().getZooKeeperClient(), tablePath);
for (Long partitionId : partitionById.keySet()) {
for (int i = 0; i < bucketCount; i++) {
TableBucket tableBucket = new TableBucket(tableId,
partitionId, i);
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 0a4c3e0c9..301427186 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.types.Tuple2;
@@ -37,6 +38,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Duration;
import java.util.ArrayList;
@@ -54,6 +56,13 @@ import static org.assertj.core.api.Assertions.assertThat;
/** IT case for tiering tables to paimon. */
class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
protected static final String DEFAULT_DB = "fluss";
private static StreamExecutionEnvironment execEnv;
@@ -61,7 +70,8 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
@BeforeAll
protected static void beforeAll() {
- FlinkPaimonTieringTestBase.beforeAll();
+
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2);
execEnv.enableCheckpointing(1000);
@@ -252,4 +262,9 @@ class PaimonTieringITCase extends
FlinkPaimonTieringTestBase {
.plan());
return reader.toCloseableIterator();
}
+
+ @Override
+ protected FlussClusterExtension getFlussClusterExtension() {
+ return FLUSS_CLUSTER_EXTENSION;
+ }
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
index a3dd5a244..5a0b8ccd9 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -21,11 +21,13 @@ import
org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Arrays;
import java.util.List;
@@ -34,13 +36,22 @@ import static org.apache.fluss.testutils.DataTestUtils.row;
/** A Test case for dropping a pktable after tiering and creating one with the
same tablePath. */
class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
protected static final String DEFAULT_DB = "fluss";
private static StreamExecutionEnvironment execEnv;
@BeforeAll
protected static void beforeAll() {
- FlinkPaimonTieringTestBase.beforeAll();
+
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+
execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.setParallelism(2);
execEnv.enableCheckpointing(1000);
@@ -82,4 +93,9 @@ class ReCreateSameTableAfterTieringTest extends
FlinkPaimonTieringTestBase {
// stop the tiering job
jobClient.cancel().get();
}
+
+ @Override
+ protected FlussClusterExtension getFlussClusterExtension() {
+ return FLUSS_CLUSTER_EXTENSION;
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index 1bca4e363..cc23be0be 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -33,7 +33,6 @@ import
org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.MapUtils;
import org.apache.fluss.utils.clock.Clock;
-import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
@@ -81,7 +80,10 @@ public class RemoteLogManager implements Closeable {
private final Map<TableBucket, RemoteLogTablet> remoteLogs =
MapUtils.newConcurrentHashMap();
public RemoteLogManager(
- Configuration conf, ZooKeeperClient zkClient, CoordinatorGateway
coordinatorGateway)
+ Configuration conf,
+ ZooKeeperClient zkClient,
+ CoordinatorGateway coordinatorGateway,
+ Clock clock)
throws IOException {
this(
conf,
@@ -91,7 +93,7 @@ public class RemoteLogManager implements Closeable {
Executors.newScheduledThreadPool(
conf.getInt(ConfigOptions.REMOTE_LOG_MANAGER_THREAD_POOL_SIZE),
new
ExecutorThreadFactory(RLM_SCHEDULED_THREAD_PREFIX)),
- SystemClock.getInstance());
+ clock);
}
@VisibleForTesting
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index e85985767..26b31bdd5 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -211,7 +211,7 @@ public class ReplicaManager {
completedKvSnapshotCommitter,
fatalErrorHandler,
serverMetricGroup,
- new RemoteLogManager(conf, zkClient, coordinatorGateway),
+ new RemoteLogManager(conf, zkClient, coordinatorGateway,
clock),
clock);
}