This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5f9a8fb65d [core] Add cache for snapshots in CachingCatalog (#4565)
5f9a8fb65d is described below
commit 5f9a8fb65d079fb572f80dc682728bc1e048d46b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 22 19:11:22 2024 +0800
[core] Add cache for snapshots in CachingCatalog (#4565)
---
.../generated/catalog_configuration.html | 6 +++
.../org/apache/paimon/options/CatalogOptions.java | 7 +++
.../java/org/apache/paimon/AbstractFileStore.java | 10 +++-
.../src/main/java/org/apache/paimon/FileStore.java | 4 ++
.../src/main/java/org/apache/paimon/Snapshot.java | 58 ++++++++++++----------
.../org/apache/paimon/catalog/CachingCatalog.java | 35 ++++++++++---
.../apache/paimon/operation/OrphanFilesClean.java | 3 +-
.../paimon/privilege/PrivilegedFileStore.java | 8 +++
.../org/apache/paimon/schema/SchemaManager.java | 15 +-----
.../java/org/apache/paimon/schema/TableSchema.java | 39 ++++++++++-----
.../paimon/table/AbstractFileStoreTable.java | 10 +++-
.../java/org/apache/paimon/table/DataTable.java | 3 ++
.../paimon/table/DelegatedFileStoreTable.java | 13 +++++
.../paimon/table/FallbackReadFileStoreTable.java | 3 +-
.../org/apache/paimon/table/FileStoreTable.java | 5 ++
.../table/system/AggregationFieldsTable.java | 7 +--
.../apache/paimon/table/system/AuditLogTable.java | 6 +++
.../paimon/table/system/CompactBucketsTable.java | 6 +++
.../paimon/table/system/FileMonitorTable.java | 6 +++
.../org/apache/paimon/table/system/FilesTable.java | 3 +-
.../apache/paimon/table/system/OptionsTable.java | 24 ++++-----
.../paimon/table/system/ReadOptimizedTable.java | 6 +++
.../apache/paimon/table/system/SchemasTable.java | 57 ++++-----------------
.../apache/paimon/table/system/SnapshotsTable.java | 6 +--
.../src/main/java/org/apache/paimon/tag/Tag.java | 48 +++++++++---------
.../org/apache/paimon/utils/BranchManager.java | 4 +-
.../org/apache/paimon/utils/SnapshotManager.java | 54 ++++++++++++++------
.../java/org/apache/paimon/utils/TagManager.java | 8 +--
.../apache/paimon/catalog/CachingCatalogTest.java | 25 +++++++++-
.../paimon/catalog/TestableCachingCatalog.java | 9 +++-
.../paimon/table/AppendOnlyFileDataTableTest.java | 2 +-
.../AppendOnlyTableColumnTypeFileDataTest.java | 2 +-
.../AppendOnlyTableColumnTypeFileMetaTest.java | 2 +-
.../table/AppendOnlyTableFileMetaFilterTest.java | 2 +-
.../paimon/table/FileStoreTableTestBase.java | 4 +-
.../table/PrimaryKeyColumnTypeFileDataTest.java | 2 +-
.../paimon/table/PrimaryKeyFileDataTableTest.java | 2 +-
.../paimon/table/PrimaryKeyFileMetaFilterTest.java | 2 +-
.../PrimaryKeyTableColumnTypeFileMetaTest.java | 2 +-
.../apache/paimon/utils/SnapshotManagerTest.java | 4 +-
40 files changed, 316 insertions(+), 196 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 3686fa20c6..6706d5c421 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -68,6 +68,12 @@ under the License.
<td>Long</td>
<td>Controls the max number for which partitions in the catalog are cached.</td>
</tr>
+ <tr>
+ <td><h5>cache.snapshot.max-num-per-table</h5></td>
+ <td style="word-wrap: break-word;">20</td>
+ <td>Integer</td>
+ <td>Controls the max number for snapshots per table in the catalog are cached.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 6ad9f3350a..f69af2d599 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -123,6 +123,13 @@ public class CatalogOptions {
.noDefaultValue()
.withDescription("Controls the maximum memory to cache manifest content.");
+ public static final ConfigOption<Integer> CACHE_SNAPSHOT_MAX_NUM_PER_TABLE =
+ key("cache.snapshot.max-num-per-table")
+ .intType()
+ .defaultValue(20)
+ .withDescription(
+ "Controls the max number for snapshots per table in the catalog are cached.");
+
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 14665961a8..ae4552aa71 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -54,6 +54,8 @@ import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import javax.annotation.Nullable;
import java.time.Duration;
@@ -79,6 +81,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
@Nullable private final SegmentsCache<Path> writeManifestCache;
@Nullable private SegmentsCache<Path> readManifestCache;
+ @Nullable private Cache<Path, Snapshot> snapshotCache;
protected AbstractFileStore(
FileIO fileIO,
@@ -116,7 +119,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
@Override
public SnapshotManager snapshotManager() {
- return new SnapshotManager(fileIO, options.path(), options.branch());
+ return new SnapshotManager(fileIO, options.path(), options.branch(),
snapshotCache);
}
@Override
@@ -340,4 +343,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
public void setManifestCache(SegmentsCache<Path> manifestCache) {
this.readManifestCache = manifestCache;
}
+
+ @Override
+ public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+ this.snapshotCache = cache;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index f9bf4c8440..e50d4ada13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -44,6 +44,8 @@ import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import javax.annotation.Nullable;
import java.util.List;
@@ -107,4 +109,6 @@ public interface FileStore<T> {
List<TagCallback> createTagCallbacks();
void setManifestCache(SegmentsCache<Path> manifestCache);
+
+ void setSnapshotCache(Cache<Path, Snapshot> cache);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 3b8d2fa15b..baee7bad95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -29,9 +29,6 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgn
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
@@ -65,7 +62,6 @@ import java.util.Objects;
@Public
@JsonIgnoreProperties(ignoreUnknown = true)
public class Snapshot {
- private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class);
public static final long FIRST_SNAPSHOT_ID = 1;
@@ -355,28 +351,6 @@ public class Snapshot {
return JsonSerdeUtil.toJson(this);
}
- public static Snapshot fromJson(String json) {
- return JsonSerdeUtil.fromJson(json, Snapshot.class);
- }
-
- public static Snapshot fromPath(FileIO fileIO, Path path) {
- try {
- return Snapshot.fromJson(fileIO.readFileUtf8(path));
- } catch (FileNotFoundException e) {
- String errorMessage =
- String.format(
- "Snapshot file %s does not exist. "
- + "It might have been expired by other jobs operating on this table. "
- + "In this case, you can avoid concurrent modification issues by configuring "
- + "write-only = true and use a dedicated compaction job, or configuring "
- + "different expiration thresholds for different jobs.",
- path);
- throw new RuntimeException(errorMessage, e);
- } catch (IOException e) {
- throw new RuntimeException("Fails to read snapshot from path " + path, e);
- }
- }
-
@Override
public int hashCode() {
return Objects.hash(
@@ -437,4 +411,36 @@ public class Snapshot {
/** Collect statistics. */
ANALYZE
}
+
+ // =================== Utils for reading =========================
+
+ public static Snapshot fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, Snapshot.class);
+ }
+
+ public static Snapshot fromPath(FileIO fileIO, Path path) {
+ try {
+ return tryFromPath(fileIO, path);
+ } catch (FileNotFoundException e) {
+ String errorMessage =
+ String.format(
+ "Snapshot file %s does not exist. "
+ + "It might have been expired by other jobs operating on this table. "
+ + "In this case, you can avoid concurrent modification issues by configuring "
+ + "write-only = true and use a dedicated compaction job, or configuring "
+ + "different expiration thresholds for different jobs.",
+ path);
+ throw new RuntimeException(errorMessage, e);
+ }
+ }
+
+ public static Snapshot tryFromPath(FileIO fileIO, Path path) throws
FileNotFoundException {
+ try {
+ return Snapshot.fromJson(fileIO.readFileUtf8(path));
+ } catch (FileNotFoundException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new RuntimeException("Fails to read snapshot from path " + path, e);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 003f0edb4f..1912ad6062 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -55,6 +55,7 @@ import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
/** A {@link Catalog} to cache databases and tables and manifests. */
@@ -62,6 +63,9 @@ public class CachingCatalog extends DelegateCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(CachingCatalog.class);
+ private final Duration expirationInterval;
+ private final int snapshotMaxNumPerTable;
+
protected final Cache<String, Database> databaseCache;
protected final Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
@@ -75,7 +79,8 @@ public class CachingCatalog extends DelegateCatalog {
CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
- CACHE_PARTITION_MAX_NUM.defaultValue());
+ CACHE_PARTITION_MAX_NUM.defaultValue(),
+ CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue());
}
public CachingCatalog(
@@ -83,13 +88,15 @@ public class CachingCatalog extends DelegateCatalog {
Duration expirationInterval,
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
- long cachedPartitionMaxNum) {
+ long cachedPartitionMaxNum,
+ int snapshotMaxNumPerTable) {
this(
wrapped,
expirationInterval,
manifestMaxMemory,
manifestCacheThreshold,
cachedPartitionMaxNum,
+ snapshotMaxNumPerTable,
Ticker.systemTicker());
}
@@ -99,6 +106,7 @@ public class CachingCatalog extends DelegateCatalog {
MemorySize manifestMaxMemory,
long manifestCacheThreshold,
long cachedPartitionMaxNum,
+ int snapshotMaxNumPerTable,
Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -106,6 +114,9 @@ public class CachingCatalog extends DelegateCatalog {
"When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled.");
}
+ this.expirationInterval = expirationInterval;
+ this.snapshotMaxNumPerTable = snapshotMaxNumPerTable;
+
this.databaseCache =
Caffeine.newBuilder()
.softValues()
@@ -121,6 +132,7 @@ public class CachingCatalog extends DelegateCatalog {
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
+ this.manifestCache = SegmentsCache.create(manifestMaxMemory,
manifestCacheThreshold);
this.partitionCache =
cachedPartitionMaxNum == 0
? null
@@ -134,7 +146,6 @@ public class CachingCatalog extends DelegateCatalog {
.maximumWeight(cachedPartitionMaxNum)
.ticker(ticker)
.build();
- this.manifestCache = SegmentsCache.create(manifestMaxMemory,
manifestCacheThreshold);
}
public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -155,7 +166,8 @@ public class CachingCatalog extends DelegateCatalog {
options.get(CACHE_EXPIRATION_INTERVAL_MS),
manifestMaxMemory,
manifestThreshold,
- options.get(CACHE_PARTITION_MAX_NUM));
+ options.get(CACHE_PARTITION_MAX_NUM),
+ options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE));
}
@Override
@@ -244,9 +256,20 @@ public class CachingCatalog extends DelegateCatalog {
}
private void putTableCache(Identifier identifier, Table table) {
- if (manifestCache != null && table instanceof FileStoreTable) {
- ((FileStoreTable) table).setManifestCache(manifestCache);
+ if (table instanceof FileStoreTable) {
+ FileStoreTable storeTable = (FileStoreTable) table;
+ storeTable.setSnapshotCache(
+ Caffeine.newBuilder()
+ .softValues()
+ .expireAfterAccess(expirationInterval)
+ .maximumSize(snapshotMaxNumPerTable)
+ .executor(Runnable::run)
+ .build());
+ if (manifestCache != null) {
+ storeTable.setManifestCache(manifestCache);
+ }
}
+
tableCache.put(identifier, table);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5698908cb9..869100d9cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -105,7 +105,8 @@ public abstract class OrphanFilesClean implements
Serializable {
List<String> abnormalBranches = new ArrayList<>();
for (String branch : branches) {
- if (!new SchemaManager(table.fileIO(), table.location(),
branch).latest().isPresent()) {
+ SchemaManager schemaManager =
table.schemaManager().copyWithBranch(branch);
+ if (!schemaManager.latest().isPresent()) {
abnormalBranches.add(branch);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 59243a5356..3ee0d5fa9b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -20,6 +20,7 @@ package org.apache.paimon.privilege;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
@@ -47,6 +48,8 @@ import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import javax.annotation.Nullable;
import java.util.List;
@@ -210,4 +213,9 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
public void setManifestCache(SegmentsCache<Path> manifestCache) {
wrapped.setManifestCache(manifestCache);
}
+
+ @Override
+ public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+ wrapped.setSnapshotCache(cache);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a84348810b..d827ffd0fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,7 +45,6 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
@@ -769,11 +768,7 @@ public class SchemaManager implements Serializable {
/** Read schema for schema id. */
public TableSchema schema(long id) {
- try {
- return
JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)),
TableSchema.class);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ return TableSchema.fromPath(fileIO, toSchemaPath(id));
}
/** Check if a schema exists. */
@@ -789,14 +784,6 @@ public class SchemaManager implements Serializable {
}
}
- public static TableSchema fromPath(FileIO fileIO, Path path) {
- try {
- return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path),
TableSchema.class);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
private String branchPath() {
return BranchManager.branchPath(tableRoot, branch);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index b5bdeccf10..a0a149d1ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -29,8 +29,10 @@ import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -296,19 +298,6 @@ public class TableSchema implements Serializable {
timeMillis);
}
- public static TableSchema fromJson(String json) {
- return JsonSerdeUtil.fromJson(json, TableSchema.class);
- }
-
- public static TableSchema fromPath(FileIO fileIO, Path path) {
- try {
- String json = fileIO.readFileUtf8(path);
- return TableSchema.fromJson(json);
- } catch (IOException e) {
- throw new RuntimeException("Fails to read schema from path " +
path, e);
- }
- }
-
@Override
public String toString() {
return JsonSerdeUtil.toJson(this);
@@ -341,4 +330,28 @@ public class TableSchema implements Serializable {
public static List<DataField> newFields(RowType rowType) {
return rowType.getFields();
}
+
+ // =================== Utils for reading =========================
+
+ public static TableSchema fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, TableSchema.class);
+ }
+
+ public static TableSchema fromPath(FileIO fileIO, Path path) {
+ try {
+ return tryFromPath(fileIO, path);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static TableSchema tryFromPath(FileIO fileIO, Path path) throws
FileNotFoundException {
+ try {
+ return fromJson(fileIO.readFileUtf8(path));
+ } catch (FileNotFoundException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 07c0e88645..4180ff11c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,6 +68,8 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -123,6 +125,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
store().setManifestCache(manifestCache);
}
+ @Override
+ public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+ store().setSnapshotCache(cache);
+ }
+
@Override
public OptionalLong latestSnapshotId() {
Long snapshot = store().snapshotManager().latestSnapshotId();
@@ -340,7 +347,8 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
: new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
}
- protected SchemaManager schemaManager() {
+ @Override
+ public SchemaManager schemaManager() {
return new SchemaManager(fileIO(), path, currentBranch());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index e330db0e04..7979daccf7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
@@ -39,6 +40,8 @@ public interface DataTable extends InnerTable {
SnapshotManager snapshotManager();
+ SchemaManager schemaManager();
+
TagManager tagManager();
BranchManager branchManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 2b369e5005..624476b5b4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.query.LocalTableQuery;
@@ -44,6 +45,8 @@ import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
@@ -92,6 +95,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
return wrapped.snapshotManager();
}
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
@@ -117,6 +125,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
wrapped.setManifestCache(manifestCache);
}
+ @Override
+ public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+ wrapped.setSnapshotCache(cache);
+ }
+
@Override
public TableSchema schema() {
return wrapped.schema();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f1a60b9713..e3e290f060 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -28,7 +28,6 @@ import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
@@ -103,7 +102,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
private FileStoreTable switchWrappedToBranch(String branchName) {
Optional<TableSchema> optionalSchema =
- new SchemaManager(wrapped.fileIO(), wrapped.location(),
branchName).latest();
+ wrapped.schemaManager().copyWithBranch(branchName).latest();
Preconditions.checkArgument(
optionalSchema.isPresent(), "Branch " + branchName + " does not exist");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 01227dd354..d37e57e4e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;
import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -30,6 +31,8 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.SegmentsCache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -42,6 +45,8 @@ public interface FileStoreTable extends DataTable {
void setManifestCache(SegmentsCache<Path> manifestCache);
+ void setSnapshotCache(Cache<Path, Snapshot> cache);
+
@Override
default RowType rowType() {
return schema().logicalRowType();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 10a046ca70..8c0eed4d6b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -18,7 +18,6 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
@@ -78,14 +76,12 @@ public class AggregationFieldsTable implements
ReadonlyTable {
private final FileIO fileIO;
private final Path location;
- private final String branch;
private final FileStoreTable dataTable;
public AggregationFieldsTable(FileStoreTable dataTable) {
this.fileIO = dataTable.fileIO();
this.location = dataTable.location();
- this.branch = CoreOptions.branch(dataTable.schema().options());
this.dataTable = dataTable;
}
@@ -192,8 +188,7 @@ public class AggregationFieldsTable implements
ReadonlyTable {
if (!(split instanceof AggregationSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- Path location = ((AggregationSplit) split).location;
- TableSchema schemas = new SchemaManager(fileIO, location,
branch).latest().get();
+ TableSchema schemas = dataTable.schemaManager().latest().get();
Iterator<InternalRow> rows = createInternalRowIterator(schemas);
if (readType != null) {
rows =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b0cbe0772b..1cb967f8d1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,6 +39,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateReplaceVisitor;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
@@ -187,6 +188,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return wrapped.snapshotManager();
}
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
index ff40c9502e..31cecbfb15 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
@@ -145,6 +146,11 @@ public class CompactBucketsTable implements DataTable,
ReadonlyTable {
return wrapped.snapshotManager();
}
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index fc1bb2a5b1..522335aaa6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
@@ -131,6 +132,11 @@ public class FileMonitorTable implements DataTable,
ReadonlyTable {
return wrapped.snapshotManager();
}
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 0232fc2d2d..6dcbb322d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -143,8 +143,7 @@ public class FilesTable implements ReadonlyTable {
@Override
public InnerTableRead newRead() {
- return new FilesRead(
- new SchemaManager(storeTable.fileIO(), storeTable.location()),
storeTable);
+ return new FilesRead(storeTable.schemaManager(), storeTable);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index c7dec03343..ed20896646 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -18,7 +18,6 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
@@ -44,7 +42,6 @@ import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -70,14 +67,12 @@ public class OptionsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
- private final String branch;
private final FileStoreTable dataTable;
public OptionsTable(FileStoreTable dataTable) {
this.fileIO = dataTable.fileIO();
this.location = dataTable.location();
- this.branch = CoreOptions.branch(dataTable.schema().options());
this.dataTable = dataTable;
}
@@ -178,14 +173,20 @@ public class OptionsTable implements ReadonlyTable {
}
@Override
- public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof OptionsSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- Path location = ((OptionsSplit) split).location;
Iterator<InternalRow> rows =
Iterators.transform(
- options(fileIO, location,
branch).entrySet().iterator(), this::toRow);
+ dataTable
+ .schemaManager()
+ .latest()
+ .orElseThrow(() -> new
RuntimeException("Table not exists."))
+ .options()
+ .entrySet()
+ .iterator(),
+ this::toRow);
if (readType != null) {
rows =
Iterators.transform(
@@ -203,11 +204,4 @@ public class OptionsTable implements ReadonlyTable {
BinaryString.fromString(option.getValue()));
}
}
-
- private static Map<String, String> options(FileIO fileIO, Path location,
String branchName) {
- return new SchemaManager(fileIO, location, branchName)
- .latest()
- .orElseThrow(() -> new RuntimeException("Table not exists."))
- .options();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index deb149791c..5308005053 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
@@ -165,6 +166,11 @@ public class ReadOptimizedTable implements DataTable,
ReadonlyTable {
return wrapped.snapshotManager();
}
+ @Override
+ public SchemaManager schemaManager() {
+ return wrapped.schemaManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index d0df75b34f..3cb0ff4783 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -18,13 +18,11 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.And;
import org.apache.paimon.predicate.CompoundPredicate;
@@ -61,8 +59,6 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-import javax.annotation.Nullable;
-
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -73,7 +69,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -98,16 +93,12 @@ public class SchemasTable implements ReadonlyTable {
new DataField(5, "comment",
SerializationUtils.newStringType(true)),
new DataField(6, "update_time", new
TimestampType(false, 3))));
- private final FileIO fileIO;
private final Path location;
- private final String branch;
private final FileStoreTable dataTable;
public SchemasTable(FileStoreTable dataTable) {
- this.fileIO = dataTable.fileIO();
this.location = dataTable.location();
- this.branch = CoreOptions.branch(dataTable.schema().options());
this.dataTable = dataTable;
}
@@ -133,7 +124,7 @@ public class SchemasTable implements ReadonlyTable {
@Override
public InnerTableRead newRead() {
- return new SchemasRead(fileIO);
+ return new SchemasRead();
}
@Override
@@ -141,24 +132,16 @@ public class SchemasTable implements ReadonlyTable {
return new SchemasTable(dataTable.copy(dynamicOptions));
}
- private class SchemasScan extends ReadOnceTableScan {
- private @Nullable LeafPredicate schemaId;
+ private static class SchemasScan extends ReadOnceTableScan {
@Override
- public InnerTableScan withFilter(Predicate predicate) {
- if (predicate == null) {
- return this;
- }
-
- Map<String, LeafPredicate> leafPredicates =
- predicate.visit(LeafPredicateExtractor.INSTANCE);
- schemaId = leafPredicates.get("schema_id");
- return this;
+ public Plan innerPlan() {
+ return () -> Collections.singletonList(new SchemasSplit());
}
@Override
- public Plan innerPlan() {
- return () -> Collections.singletonList(new SchemasSplit(location,
schemaId));
+ public InnerTableScan withFilter(Predicate predicate) {
+ return this;
}
}
@@ -167,47 +150,29 @@ public class SchemasTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
- private final Path location;
-
- private final @Nullable LeafPredicate schemaId;
-
- private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) {
- this.location = location;
- this.schemaId = schemaId;
- }
-
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SchemasSplit that = (SchemasSplit) o;
- return Objects.equals(location, that.location)
- && Objects.equals(schemaId, that.schemaId);
+ return o != null && getClass() == o.getClass();
}
@Override
public int hashCode() {
- return Objects.hash(location, schemaId);
+ return 0;
}
}
/** {@link TableRead} implementation for {@link SchemasTable}. */
private class SchemasRead implements InnerTableRead {
- private final FileIO fileIO;
private RowType readType;
private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();
private final List<Long> schemaIds = new ArrayList<>();
- public SchemasRead(FileIO fileIO) {
- this.fileIO = fileIO;
- }
-
@Override
public InnerTableRead withFilter(Predicate predicate) {
if (predicate == null) {
@@ -287,9 +252,7 @@ public class SchemasTable implements ReadonlyTable {
if (!(split instanceof SchemasSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- SchemasSplit schemasSplit = (SchemasSplit) split;
- Path location = schemasSplit.location;
- SchemaManager manager = new SchemaManager(fileIO, location,
branch);
+ SchemaManager manager = dataTable.schemaManager();
Collection<TableSchema> tableSchemas;
if (!schemaIds.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 10e5b691ac..2af13ee937 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -18,7 +18,6 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -111,14 +110,12 @@ public class SnapshotsTable implements ReadonlyTable {
private final FileIO fileIO;
private final Path location;
- private final String branch;
private final FileStoreTable dataTable;
public SnapshotsTable(FileStoreTable dataTable) {
this.fileIO = dataTable.fileIO();
this.location = dataTable.location();
- this.branch = CoreOptions.branch(dataTable.schema().options());
this.dataTable = dataTable;
}
@@ -289,9 +286,8 @@ public class SnapshotsTable implements ReadonlyTable {
if (!(split instanceof SnapshotsSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- SnapshotManager snapshotManager =
- new SnapshotManager(fileIO, ((SnapshotsSplit)
split).location, branch);
+ SnapshotManager snapshotManager = dataTable.snapshotManager();
Iterator<Snapshot> snapshots;
if (!snapshotIds.isEmpty()) {
snapshots = snapshotManager.snapshotsWithId(snapshotIds);
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index f1ac879d33..53641a2eb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
@@ -113,29 +114,6 @@ public class Tag extends Snapshot {
return JsonSerdeUtil.toJson(this);
}
- public static Tag fromJson(String json) {
- return JsonSerdeUtil.fromJson(json, Tag.class);
- }
-
- public static Tag fromPath(FileIO fileIO, Path path) {
- try {
- String json = fileIO.readFileUtf8(path);
- return Tag.fromJson(json);
- } catch (IOException e) {
- throw new RuntimeException("Fails to read tag from path " + path,
e);
- }
- }
-
- @Nullable
- public static Tag safelyFromPath(FileIO fileIO, Path path) throws
IOException {
- try {
- String json = fileIO.readFileUtf8(path);
- return Tag.fromJson(json);
- } catch (FileNotFoundException e) {
- return null;
- }
- }
-
public static Tag fromSnapshotAndTagTtl(
Snapshot snapshot, Duration tagTimeRetained, LocalDateTime
tagCreateTime) {
return new Tag(
@@ -201,4 +179,28 @@ public class Tag extends Snapshot {
return Objects.equals(tagCreateTime, that.tagCreateTime)
&& Objects.equals(tagTimeRetained, that.tagTimeRetained);
}
+
+ // =================== Utils for reading =========================
+
+ public static Tag fromJson(String json) {
+ return JsonSerdeUtil.fromJson(json, Tag.class);
+ }
+
+ public static Tag fromPath(FileIO fileIO, Path path) {
+ try {
+ return tryFromPath(fileIO, path);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Tag tryFromPath(FileIO fileIO, Path path) throws
FileNotFoundException {
+ try {
+ return fromJson(fileIO.readFileUtf8(path));
+ } catch (FileNotFoundException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index bc353bb10d..2ea5f542f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.tag.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public class BranchManager {
List<Path> deleteSchemaPaths = schemaManager.schemaPaths(id -> id
>= earliestSchemaId);
List<Path> deleteTagPaths =
tagManager.tagPaths(
- path -> Snapshot.fromPath(fileIO, path).id() >=
earliestSnapshotId);
+ path -> Tag.fromPath(fileIO, path).id() >=
earliestSnapshotId);
List<Path> deletePaths =
Stream.of(deleteSnapshotPaths, deleteSchemaPaths,
deleteTagPaths)
@@ -201,6 +202,7 @@ public class BranchManager {
tagManager.copyWithBranch(branchName).tagDirectory(),
tagManager.tagDirectory(),
true);
+ snapshotManager.invalidateCache();
} catch (IOException e) {
throw new RuntimeException(
String.format(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5902d4c84c..9a120042ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,16 +76,26 @@ public class SnapshotManager implements Serializable {
private final FileIO fileIO;
private final Path tablePath;
private final String branch;
+ @Nullable private final Cache<Path, Snapshot> cache;
public SnapshotManager(FileIO fileIO, Path tablePath) {
this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
}
/** Specify the default branch for data writing. */
- public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) {
+ public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String
branchName) {
+ this(fileIO, tablePath, branchName, null);
+ }
+
+ public SnapshotManager(
+ FileIO fileIO,
+ Path tablePath,
+ @Nullable String branchName,
+ @Nullable Cache<Path, Snapshot> cache) {
this.fileIO = fileIO;
this.tablePath = tablePath;
this.branch = BranchManager.normalizeBranch(branchName);
+ this.cache = cache;
}
public SnapshotManager copyWithBranch(String branchName) {
@@ -120,20 +132,34 @@ public class SnapshotManager implements Serializable {
return new Path(branchPath(tablePath, branch) + "/snapshot");
}
+ public void invalidateCache() {
+ if (cache != null) {
+ cache.invalidateAll();
+ }
+ }
+
public Snapshot snapshot(long snapshotId) {
- Path snapshotPath = snapshotPath(snapshotId);
- return Snapshot.fromPath(fileIO, snapshotPath);
+ Path path = snapshotPath(snapshotId);
+ Snapshot snapshot = cache == null ? null : cache.getIfPresent(path);
+ if (snapshot == null) {
+ snapshot = Snapshot.fromPath(fileIO, path);
+ if (cache != null) {
+ cache.put(path, snapshot);
+ }
+ }
+ return snapshot;
}
public Snapshot tryGetSnapshot(long snapshotId) throws
FileNotFoundException {
- try {
- Path snapshotPath = snapshotPath(snapshotId);
- return Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath));
- } catch (FileNotFoundException fileNotFoundException) {
- throw fileNotFoundException;
- } catch (IOException ioException) {
- throw new RuntimeException(ioException);
+ Path path = snapshotPath(snapshotId);
+ Snapshot snapshot = cache == null ? null : cache.getIfPresent(path);
+ if (snapshot == null) {
+ snapshot = Snapshot.tryFromPath(fileIO, path);
+ if (cache != null) {
+ cache.put(path, snapshot);
+ }
}
+ return snapshot;
}
public Changelog changelog(long snapshotId) {
@@ -486,11 +512,9 @@ public class SnapshotManager implements Serializable {
collectSnapshots(
path -> {
try {
-
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
- } catch (IOException e) {
- if (!(e instanceof FileNotFoundException)) {
- throw new RuntimeException(e);
- }
+ // do not pollute the cache
+ snapshots.add(Snapshot.tryFromPath(fileIO, path));
+ } catch (FileNotFoundException ignored) {
}
},
paths);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 65963aafdf..1e05a100d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -353,7 +353,7 @@ public class TagManager {
// If the tag file is not found, it might be deleted by
// other processes, so just skip this tag
try {
- Snapshot snapshot =
Snapshot.fromJson(fileIO.readFileUtf8(path));
+ Snapshot snapshot = Tag.tryFromPath(fileIO,
path).trimToSnapshot();
tags.computeIfAbsent(snapshot, s -> new
ArrayList<>()).add(tagName);
} catch (FileNotFoundException ignored) {
}
@@ -371,9 +371,9 @@ public class TagManager {
List<Pair<Tag, String>> tags = new ArrayList<>();
for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());
- Tag tag = Tag.safelyFromPath(fileIO, path);
- if (tag != null) {
- tags.add(Pair.of(tag, tagName));
+ try {
+ tags.add(Pair.of(Tag.tryFromPath(fileIO, path), tagName));
+ } catch (FileNotFoundException ignored) {
}
}
return tags;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 65ed5ce0b7..e4f0a1510b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
@@ -332,6 +333,27 @@ class CachingCatalogTest extends CatalogTestBase {
.toArray(Identifier[]::new);
}
+ @Test
+ public void testSnapshotCache() throws Exception {
+ TestableCachingCatalog wrappedCatalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ wrappedCatalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = wrappedCatalog.getTable(tableIdent);
+
+ // write
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, fromString("1"), fromString("1")));
+ write.write(GenericRow.of(2, fromString("2"), fromString("2")));
+ commit.commit(write.prepareCommit());
+ }
+
+ Snapshot snapshot = table.snapshot(1);
+ assertThat(snapshot).isSameAs(table.snapshot(1));
+ }
+
@Test
public void testManifestCache() throws Exception {
innerTestManifestCache(Long.MAX_VALUE);
@@ -346,7 +368,8 @@ class CachingCatalogTest extends CatalogTestBase {
Duration.ofSeconds(10),
MemorySize.ofMebiBytes(1),
manifestCacheThreshold,
- 0L);
+ 0L,
+ 10);
Identifier tableIdent = new Identifier("db", "tbl");
catalog.dropTable(tableIdent, true);
catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 4c70a0232c..1d4a9b0e8a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -38,7 +38,14 @@ public class TestableCachingCatalog extends CachingCatalog {
private final Duration cacheExpirationInterval;
public TestableCachingCatalog(Catalog catalog, Duration
expirationInterval, Ticker ticker) {
- super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE,
Long.MAX_VALUE, ticker);
+ super(
+ catalog,
+ expirationInterval,
+ MemorySize.ZERO,
+ Long.MAX_VALUE,
+ Long.MAX_VALUE,
+ Integer.MAX_VALUE,
+ ticker);
this.cacheExpirationInterval = expirationInterval;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
index 93854e7661..9ce3db0b1a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
@@ -33,7 +33,7 @@ public class AppendOnlyFileDataTableTest extends
FileDataFilterTestBase {
return new AppendOnlyFileStoreTable(
FileIOFinder.find(tablePath), tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
index b4c16cef20..64d0c728d1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
@@ -37,7 +37,7 @@ public class AppendOnlyTableColumnTypeFileDataTest extends
ColumnTypeFileDataTes
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new AppendOnlyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
index f398d28cc5..300483a9f3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
@@ -40,7 +40,7 @@ public class AppendOnlyTableColumnTypeFileMetaTest extends
ColumnTypeFileMetaTes
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new AppendOnlyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
index f65546af75..85ed802997 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
@@ -40,7 +40,7 @@ public class AppendOnlyTableFileMetaFilterTest extends
FileMetaFilterTestBase {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new AppendOnlyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 4d8408955d..75e284a68c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1193,7 +1193,7 @@ public abstract class FileStoreTableTestBase {
SchemaManager schemaManager =
new SchemaManager(new TraceableFileIO(), tablePath,
"test-branch");
TableSchema branchSchema =
- SchemaManager.fromPath(new TraceableFileIO(),
schemaManager.toSchemaPath(0));
+ TableSchema.fromPath(new TraceableFileIO(),
schemaManager.toSchemaPath(0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();
}
@@ -1344,7 +1344,7 @@ public abstract class FileStoreTableTestBase {
// verify schema in branch1 and main branch is same
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(),
tablePath);
TableSchema branchSchema =
- SchemaManager.fromPath(
+ TableSchema.fromPath(
new TraceableFileIO(),
schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0));
TableSchema schema0 = schemaManager.schema(0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index 8ba25c6617..64bb5f21ab 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -91,7 +91,7 @@ public class PrimaryKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTestBase
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
index 1be3219754..ba98138044 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
@@ -247,7 +247,7 @@ public class PrimaryKeyFileDataTableTest extends
FileDataFilterTestBase {
return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
index 618e8691c6..88928fe991 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
@@ -146,7 +146,7 @@ public class PrimaryKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 489c1ba052..32a4138be5 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -54,7 +54,7 @@ public class PrimaryKeyTableColumnTypeFileMetaTest extends
ColumnTypeFileMetaTes
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
- protected SchemaManager schemaManager() {
+ public SchemaManager schemaManager() {
return schemaManager;
}
};
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 6b7b28263a..26480cf411 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -281,8 +281,8 @@ public class SnapshotManagerTest {
@Test
public void testTraversalSnapshotsFromLatestSafely() throws IOException,
InterruptedException {
FileIO localFileIO = LocalFileIO.create();
- SnapshotManager snapshotManager =
- new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ Path path = new Path(tempDir.toString());
+ SnapshotManager snapshotManager = new SnapshotManager(localFileIO,
path);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot =