This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b08a8206fe [core] Add manifest cache to the FallbackReadFileStoreTable (#5118)
b08a8206fe is described below

commit b08a8206fee5986f7cf89800d31fee06c537ab97
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Feb 20 15:28:09 2025 +0800

    [core] Add manifest cache to the FallbackReadFileStoreTable (#5118)
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 15 ++++++++
 .../apache/paimon/catalog/CachingCatalogTest.java  | 40 ++++++++++++++++++++--
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index eb405a5d5d..535bd6a270 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
@@ -41,6 +42,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SegmentsCache;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -101,6 +103,12 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         return new FallbackReadFileStoreTable(switchWrappedToBranch(branchName), fallback);
     }
 
+    @Override
+    public void setManifestCache(SegmentsCache<Path> manifestCache) {
+        super.setManifestCache(manifestCache);
+        fallback.setManifestCache(manifestCache);
+    }
+
     private FileStoreTable switchWrappedToBranch(String branchName) {
         Optional<TableSchema> optionalSchema =
                 wrapped.schemaManager().copyWithBranch(branchName).latest();
@@ -287,6 +295,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan dropStats() {
+            mainScan.dropStats();
+            fallbackScan.dropStats();
+            return this;
+        }
+
         @Override
         public TableScan.Plan plan() {
             List<DataSplit> splits = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index f23b07c767..c8a43f9013 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
@@ -34,12 +35,14 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.FakeTicker;
 
 import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -47,9 +50,11 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -384,8 +389,36 @@ class CachingCatalogTest extends CatalogTestBase {
         catalog.dropTable(tableIdent, true);
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
 
-        // write
+        // normal table
         Table table = catalog.getTable(tableIdent);
+        writeTableForTestManifestCache(table);
+        readTableForTestManifestCache(catalog, tableIdent);
+
+        // fallback branch table
+        tableIdent = new Identifier("db", "fallback_t");
+        Schema fallbackSchema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "pk", DataTypes.INT()),
+                                new DataField(1, "col1", DataTypes.STRING()),
+                                new DataField(2, "col2", DataTypes.STRING())),
+                        Collections.singletonList("col2"),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        catalog.createTable(tableIdent, fallbackSchema, false);
+        table = catalog.getTable(tableIdent);
+        table.createBranch("fallback");
+        catalog.alterTable(
+                tableIdent,
+                SchemaChange.setOption(CoreOptions.SCAN_FALLBACK_BRANCH.key(), "fallback"),
+                false);
+        table = catalog.getTable(new Identifier("db", "fallback_t$branch_fallback"));
+        writeTableForTestManifestCache(table);
+        readTableForTestManifestCache(catalog, tableIdent);
+    }
+
+    private static void writeTableForTestManifestCache(Table table) throws Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite write = writeBuilder.newWrite();
                 BatchTableCommit commit = writeBuilder.newCommit()) {
@@ -393,8 +426,11 @@ class CachingCatalogTest extends CatalogTestBase {
             write.write(GenericRow.of(2, fromString("2"), fromString("2")));
             commit.commit(write.prepareCommit());
         }
+    }
 
-        // repeat read
+    private void readTableForTestManifestCache(Catalog catalog, Identifier tableIdent)
+            throws Catalog.TableNotExistException, IOException {
+        Table table;
         for (int i = 0; i < 5; i++) {
             // test copy too
             table = catalog.getTable(tableIdent).copy(Collections.singletonMap("a", "b"));

Reply via email to