This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f8d389c5 [lake] TieringEnumerator will always fail to generate splits when re-create table with same name (#1244)
f8d389c5 is described below

commit f8d389c5f3450f4ca3aaa2c13fd788454a0a1d72
Author: naivedogger <[email protected]>
AuthorDate: Thu Jul 3 12:10:53 2025 +0800

    [lake] TieringEnumerator will always fail to generate splits when re-create table with same name (#1244)
---
 .../com/alibaba/fluss/client/admin/FlussAdmin.java |  5 +-
 .../testutils/FlinkPaimonTieringTestBase.java      | 44 +++++++++++
 .../lake/paimon/tiering/PaimonTieringITCase.java   | 23 ------
 .../tiering/ReCreateSameTableAfterTieringTest.java | 85 ++++++++++++++++++++++
 4 files changed, 131 insertions(+), 26 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
index 01a60f42..414e0c25 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
@@ -377,12 +377,11 @@ public class FlussAdmin implements Admin {
             Collection<Integer> buckets,
             OffsetSpec offsetSpec) {
         Long partitionId = null;
-        metadataUpdater.checkAndUpdateTableMetadata(
-                Collections.singleton(physicalTablePath.getTablePath()));
+        
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(),
 null);
         long tableId = 
metadataUpdater.getTableId(physicalTablePath.getTablePath());
         // if partition name is not null, we need to check and update 
partition metadata
         if (physicalTablePath.getPartitionName() != null) {
-            metadataUpdater.checkAndUpdatePartitionMetadata(physicalTablePath);
+            
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
             partitionId = 
metadataUpdater.getPartitionIdOrElseThrow(physicalTablePath);
         }
         Map<Integer, ListOffsetsRequest> requestMap =
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index d82ec2a5..f8a8641c 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -46,7 +46,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.CloseableIterator;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -57,6 +61,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -354,6 +359,20 @@ public class FlinkPaimonTieringTestBase {
         return createTable(tablePath, table1Descriptor);
     }
 
+    /**
+     * Drops {@code tablePath} from Fluss (waiting for completion) and then drops the Paimon table
+     * with the same database and table name.
+     *
+     * <p>The Fluss drop must succeed; a missing Paimon table is tolerated rather than reported.
+     */
+    protected void dropTable(TablePath tablePath) throws Exception {
+        // block until Fluss has dropped the table
+        admin.dropTable(tablePath, false).get();
+
+        Identifier paimonTableId = toPaimonIdentifier(tablePath);
+        try {
+            paimonCatalog.dropTable(paimonTableId, false);
+        } catch (Catalog.TableNotExistException ignored) {
+            // the lake table does not exist, so there is nothing to clean up
+        }
+    }
+
+    /**
+     * Converts a Fluss {@link TablePath} into the Paimon {@link Identifier} that has the same
+     * database name and table name.
+     */
+    private Identifier toPaimonIdentifier(TablePath tablePath) {
+        String databaseName = tablePath.getDatabaseName();
+        String tableName = tablePath.getTableName();
+        return Identifier.create(databaseName, tableName);
+    }
+
     protected void assertReplicaStatus(
             TablePath tablePath,
             long tableId,
@@ -416,4 +435,29 @@ public class FlinkPaimonTieringTestBase {
                 Duration.ofMinutes(2),
                 "bucket " + tb + "not synced");
     }
+
+    /**
+     * Asserts that the Paimon table backing {@code tablePath} yields exactly {@code expectedRows},
+     * in order, comparing column 0 as an int and column 1 as a string.
+     *
+     * <p>The Paimon reader iterator is always closed. The check fails if Paimon returns fewer or
+     * more rows than expected.
+     *
+     * @param tablePath the Fluss table path whose Paimon counterpart is read
+     * @param expectedRows the rows expected in Paimon, in read order
+     */
+    protected void checkDataInPaimonPrimayKeyTable(
+            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
+        // try-with-resources releases the underlying Paimon record reader
+        try (CloseableIterator<org.apache.paimon.data.InternalRow> paimonRows =
+                getPaimonRowCloseableIterator(tablePath)) {
+            assertPaimonRowsMatch(paimonRows, expectedRows);
+        }
+    }
+
+    /** Consumes {@code actualRows} and asserts it yields exactly {@code expectedRows} in order. */
+    private static void assertPaimonRowsMatch(
+            Iterator<org.apache.paimon.data.InternalRow> actualRows,
+            List<InternalRow> expectedRows) {
+        for (InternalRow expectedRow : expectedRows) {
+            // assert instead of letting next() throw NoSuchElementException on a short result
+            assertThat(actualRows.hasNext()).as("Paimon returned fewer rows than expected").isTrue();
+            org.apache.paimon.data.InternalRow row = actualRows.next();
+            assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
+            assertThat(row.getString(1).toString())
+                    .isEqualTo(expectedRow.getString(1).toString());
+        }
+        // no unexpected trailing rows may remain, e.g. stale rows from a dropped table
+        assertThat(actualRows.hasNext()).as("Paimon returned more rows than expected").isFalse();
+    }
+
+    /**
+     * Opens an iterator over all rows of the Paimon table with the same database and table name
+     * as {@code tablePath}, by planning a full scan and reading the planned splits.
+     *
+     * <p>The returned iterator is closeable; callers should close it when done.
+     */
+    protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
+            TablePath tablePath) throws Exception {
+        Identifier identifier = toPaimonIdentifier(tablePath);
+
+        // NOTE(review): the shared catalog field is replaced on every call, presumably so a table
+        // dropped and re-created under the same path is not served from stale catalog state; the
+        // previous catalog instance is not closed here -- confirm this is intended.
+        paimonCatalog = getPaimonCatalog();
+
+        FileStoreTable fileStoreTable = (FileStoreTable) paimonCatalog.getTable(identifier);
+        RecordReader<org.apache.paimon.data.InternalRow> recordReader =
+                fileStoreTable
+                        .newRead()
+                        .createReader(fileStoreTable.newReadBuilder().newScan().plan());
+        return recordReader.toCloseableIterator();
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 071cc602..4309a617 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -189,17 +189,6 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         assertThat(flussRowIterator.hasNext()).isFalse();
     }
 
-    private void checkDataInPaimonPrimayKeyTable(
-            TablePath tablePath, List<InternalRow> expectedRows) throws 
Exception {
-        Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
-                getPaimonRowCloseableIterator(tablePath);
-        for (InternalRow expectedRow : expectedRows) {
-            org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
-            assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
-            
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
-        }
-    }
-
     private void checkDataInPaimonAppendOnlyPartitionedTable(
             TablePath tablePath,
             Map<String, String> partitionSpec,
@@ -221,18 +210,6 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
         assertThat(flussRowIterator.hasNext()).isFalse();
     }
 
-    private CloseableIterator<org.apache.paimon.data.InternalRow> 
getPaimonRowCloseableIterator(
-            TablePath tablePath) throws Exception {
-        Identifier tableIdentifier =
-                Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
-
-        FileStoreTable table = (FileStoreTable) 
paimonCatalog.getTable(tableIdentifier);
-
-        RecordReader<org.apache.paimon.data.InternalRow> reader =
-                
table.newRead().createReader(table.newReadBuilder().newScan().plan());
-        return reader.toCloseableIterator();
-    }
-
     private CloseableIterator<org.apache.paimon.data.InternalRow> 
getPaimonRowCloseableIterator(
             TablePath tablePath, Map<String, String> partitionSpec) throws 
Exception {
         Identifier tableIdentifier =
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
new file mode 100644
index 00000000..3378f650
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/ReCreateSameTableAfterTieringTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.tiering;
+
+import com.alibaba.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/**
+ * Tests dropping a primary-key table after it has been tiered to Paimon, re-creating a table with
+ * the same {@link TablePath}, and checking that the running tiering job tiers the new table.
+ */
+class ReCreateSameTableAfterTieringTest extends FlinkPaimonTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        // this method hides the base-class @BeforeAll, so the base setup must be invoked explicitly
+        FlinkPaimonTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testReCreateSameTable() throws Exception {
+        // create a pk table, write some records and wait until the snapshot is finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable_drop");
+        long t1Id = createPkTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start the tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of the replica after it is synced, then the data in paimon
+            assertReplicaStatus(t1Bucket, 3);
+            checkDataInPaimonPrimayKeyTable(t1, rows);
+
+            // drop the table and create a new one with the same table path
+            dropTable(t1);
+            long t2Id = createPkTable(t1);
+            TableBucket t2Bucket = new TableBucket(t2Id, 0);
+
+            // write some new records
+            List<InternalRow> newRows = Arrays.asList(row(4, "v4"), row(5, "v5"));
+            writeRows(t1, newRows, false);
+            // new table, so the snapshot id should be 0
+            waitUntilSnapshot(t2Id, 1, 0);
+
+            // the re-created table must be tiered too, containing only the new records
+            assertReplicaStatus(t2Bucket, 2);
+            checkDataInPaimonPrimayKeyTable(t1, newRows);
+        } finally {
+            // always stop the tiering job so a failed assertion does not leave it running
+            jobClient.cancel().get();
+        }
+    }
+}

Reply via email to