This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f8d389c5 [lake] TieringEnumerator will always fail to generate splits
when re-create table with same name (#1244)
f8d389c5 is described below
commit f8d389c5f3450f4ca3aaa2c13fd788454a0a1d72
Author: naivedogger <[email protected]>
AuthorDate: Thu Jul 3 12:10:53 2025 +0800
[lake] TieringEnumerator will always fail to generate splits when re-create
table with same name (#1244)
---
.../com/alibaba/fluss/client/admin/FlussAdmin.java | 5 +-
.../testutils/FlinkPaimonTieringTestBase.java | 44 +++++++++++
.../lake/paimon/tiering/PaimonTieringITCase.java | 23 ------
.../tiering/ReCreateSameTableAfterTieringTest.java | 85 ++++++++++++++++++++++
4 files changed, 131 insertions(+), 26 deletions(-)
diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
index 01a60f42..414e0c25 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
@@ -377,12 +377,11 @@ public class FlussAdmin implements Admin {
Collection<Integer> buckets,
OffsetSpec offsetSpec) {
Long partitionId = null;
- metadataUpdater.checkAndUpdateTableMetadata(
- Collections.singleton(physicalTablePath.getTablePath()));
+
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(),
null);
long tableId =
metadataUpdater.getTableId(physicalTablePath.getTablePath());
// if partition name is not null, we need to check and update partition metadata
if (physicalTablePath.getPartitionName() != null) {
- metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
+
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
partitionId =
metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
}
Map<Integer, ListOffsetsRequest> requestMap =
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index d82ec2a5..f8a8641c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -46,7 +46,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.CloseableIterator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -57,6 +61,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -354,6 +359,20 @@ public class FlinkPaimonTieringTestBase {
return createTable(tablePath, table1Descriptor);
}
+ protected void dropTable(TablePath tablePath) throws Exception {
+ admin.dropTable(tablePath, false).get();
+ Identifier tableIdentifier = toPaimonIdentifier(tablePath);
+ try {
+ paimonCatalog.dropTable(tableIdentifier, false);
+ } catch (Catalog.TableNotExistException e) {
+ // do nothing, table not exists
+ }
+ }
+
+ private Identifier toPaimonIdentifier(TablePath tablePath) {
+ return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
protected void assertReplicaStatus(
TablePath tablePath,
long tableId,
@@ -416,4 +435,29 @@ public class FlinkPaimonTieringTestBase {
Duration.ofMinutes(2),
"bucket " + tb + "not synced");
}
+
+ protected void checkDataInPaimonPrimayKeyTable(
+ TablePath tablePath, List<InternalRow> expectedRows) throws
Exception {
+ Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
+ getPaimonRowCloseableIterator(tablePath);
+ for (InternalRow expectedRow : expectedRows) {
+ org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
+ assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
+
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
+ }
+ }
+
+ protected CloseableIterator<org.apache.paimon.data.InternalRow>
getPaimonRowCloseableIterator(
+ TablePath tablePath) throws Exception {
+ Identifier tableIdentifier =
+ Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
+
+ paimonCatalog = getPaimonCatalog();
+
+ FileStoreTable table = (FileStoreTable)
paimonCatalog.getTable(tableIdentifier);
+
+ RecordReader<org.apache.paimon.data.InternalRow> reader =
+
table.newRead().createReader(table.newReadBuilder().newScan().plan());
+ return reader.toCloseableIterator();
+ }
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 071cc602..4309a617 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -189,17 +189,6 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
assertThat(flussRowIterator.hasNext()).isFalse();
}
- private void checkDataInPaimonPrimayKeyTable(
- TablePath tablePath, List<InternalRow> expectedRows) throws
Exception {
- Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
- getPaimonRowCloseableIterator(tablePath);
- for (InternalRow expectedRow : expectedRows) {
- org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
- assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
-
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
- }
- }
-
private void checkDataInPaimonAppendOnlyPartitionedTable(
TablePath tablePath,
Map<String, String> partitionSpec,
@@ -221,18 +210,6 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
assertThat(flussRowIterator.hasNext()).isFalse();
}
- private CloseableIterator<org.apache.paimon.data.InternalRow>
getPaimonRowCloseableIterator(
- TablePath tablePath) throws Exception {
- Identifier tableIdentifier =
- Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
-
- FileStoreTable table = (FileStoreTable)
paimonCatalog.getTable(tableIdentifier);
-
- RecordReader<org.apache.paimon.data.InternalRow> reader =
-
table.newRead().createReader(table.newReadBuilder().newScan().plan());
- return reader.toCloseableIterator();
- }
-
private CloseableIterator<org.apache.paimon.data.InternalRow>
getPaimonRowCloseableIterator(
TablePath tablePath, Map<String, String> partitionSpec) throws
Exception {
Identifier tableIdentifier =
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
new file mode 100644
index 00000000..3378f650
--- /dev/null
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.tiering;
+
+import com.alibaba.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/** A Test case for dropping a pktable after tiering and creating one with the same tablePath. */
+class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+
+ private static StreamExecutionEnvironment execEnv;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkPaimonTieringTestBase.beforeAll();
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+ execEnv.enableCheckpointing(1000);
+ }
+
+ @Test
+ void testReCreateSameTable() throws Exception {
+ // create a pk table, write some records and wait until snapshot finished
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable_drop");
+ long t1Id = createPkTable(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ // write records
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"),
row(3, "v3"));
+ writeRows(t1, rows, false);
+ waitUntilSnapshot(t1Id, 1, 0);
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 3);
+ // check data in paimon
+ checkDataInPaimonPrimayKeyTable(t1, rows);
+
+ // then drop the table
+ dropTable(t1);
+ // and create a new table with the same table path
+ long t2Id = createPkTable(t1);
+ TableBucket t2Bucket = new TableBucket(t2Id, 0);
+ // write some new records
+ List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"));
+ writeRows(t1, newRows, false);
+ // new table, so the snapshot id should be 0
+ waitUntilSnapshot(t2Id, 1, 0);
+ // check the status of replica after synced
+ assertReplicaStatus(t2Bucket, 2);
+ // check data in paimon
+ checkDataInPaimonPrimayKeyTable(t1, newRows);
+
+ // stop the tiering job
+ jobClient.cancel().get();
+ }
+}