This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b08a8206fe [core] Add manifest cache to the FallbackReadFileStoreTable (#5118)
b08a8206fe is described below
commit b08a8206fee5986f7cf89800d31fee06c537ab97
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Feb 20 15:28:09 2025 +0800
[core] Add manifest cache to the FallbackReadFileStoreTable (#5118)
---
.../paimon/table/FallbackReadFileStoreTable.java | 15 ++++++++
.../apache/paimon/catalog/CachingCatalogTest.java | 40 ++++++++++++++++++++--
2 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index eb405a5d5d..535bd6a270 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -23,6 +23,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
@@ -41,6 +42,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SegmentsCache;
import java.io.IOException;
import java.util.ArrayList;
@@ -101,6 +103,12 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
return new
FallbackReadFileStoreTable(switchWrappedToBranch(branchName), fallback);
}
+ @Override
+ public void setManifestCache(SegmentsCache<Path> manifestCache) {
+ super.setManifestCache(manifestCache);
+ fallback.setManifestCache(manifestCache);
+ }
+
private FileStoreTable switchWrappedToBranch(String branchName) {
Optional<TableSchema> optionalSchema =
wrapped.schemaManager().copyWithBranch(branchName).latest();
@@ -287,6 +295,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
return this;
}
+ @Override
+ public InnerTableScan dropStats() {
+ mainScan.dropStats();
+ fallbackScan.dropStats();
+ return this;
+ }
+
@Override
public TableScan.Plan plan() {
List<DataSplit> splits = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index f23b07c767..c8a43f9013 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
@@ -34,12 +35,14 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.FakeTicker;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -47,9 +50,11 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -384,8 +389,36 @@ class CachingCatalogTest extends CatalogTestBase {
catalog.dropTable(tableIdent, true);
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
- // write
+ // normal table
Table table = catalog.getTable(tableIdent);
+ writeTableForTestManifestCache(table);
+ readTableForTestManifestCache(catalog, tableIdent);
+
+ // fallback branch table
+ tableIdent = new Identifier("db", "fallback_t");
+ Schema fallbackSchema =
+ new Schema(
+ Lists.newArrayList(
+ new DataField(0, "pk", DataTypes.INT()),
+ new DataField(1, "col1", DataTypes.STRING()),
+ new DataField(2, "col2", DataTypes.STRING())),
+ Collections.singletonList("col2"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ catalog.createTable(tableIdent, fallbackSchema, false);
+ table = catalog.getTable(tableIdent);
+ table.createBranch("fallback");
+ catalog.alterTable(
+ tableIdent,
+ SchemaChange.setOption(CoreOptions.SCAN_FALLBACK_BRANCH.key(),
"fallback"),
+ false);
+ table = catalog.getTable(new Identifier("db",
"fallback_t$branch_fallback"));
+ writeTableForTestManifestCache(table);
+ readTableForTestManifestCache(catalog, tableIdent);
+ }
+
+ private static void writeTableForTestManifestCache(Table table) throws
Exception {
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
@@ -393,8 +426,11 @@ class CachingCatalogTest extends CatalogTestBase {
write.write(GenericRow.of(2, fromString("2"), fromString("2")));
commit.commit(write.prepareCommit());
}
+ }
- // repeat read
+ private void readTableForTestManifestCache(Catalog catalog, Identifier
tableIdent)
+ throws Catalog.TableNotExistException, IOException {
+ Table table;
for (int i = 0; i < 5; i++) {
// test copy too
table =
catalog.getTable(tableIdent).copy(Collections.singletonMap("a", "b"));