This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 00e3e789ca [python] Introduce schema cache in SchemaManager
00e3e789ca is described below
commit 00e3e789ca69dbc3670a724ca89d1d3d558fa993
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 15:31:47 2025 +0800
[python] Introduce schema cache in SchemaManager
---
.../pypaimon/manifest/manifest_file_manager.py | 12 +-------
.../pypaimon/manifest/simple_stats_evolutions.py | 2 --
.../pypaimon/read/scanner/full_starting_scanner.py | 6 +---
paimon-python/pypaimon/read/split_read.py | 34 ++++++----------------
paimon-python/pypaimon/read/table_read.py | 10 ++-----
paimon-python/pypaimon/schema/schema_manager.py | 22 +++++++++-----
6 files changed, 28 insertions(+), 58 deletions(-)
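For context on the diffs below: ManifestFileManager and SplitRead each kept a private schema cache keyed by schema id, and this commit removes those in favor of one memoized lookup inside SchemaManager that every caller shares. A minimal sketch of the pattern (hypothetical names; load_fn stands in for the real schema-file read):

    # Hedged sketch of the shared memoization pattern, not Paimon API.
    class CachedSchemaLoader:
        def __init__(self, load_fn):
            self._load_fn = load_fn   # e.g. reads schema JSON from storage
            self._cache = {}          # schema_id -> loaded schema

        def get(self, schema_id):
            # Touch storage only the first time an id is requested.
            if schema_id not in self._cache:
                self._cache[schema_id] = self._load_fn(schema_id)
            return self._cache[schema_id]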
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 927dad4674..567bba1cbd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -26,7 +26,6 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
 from pypaimon.table.row.binary_row import BinaryRow
@@ -44,7 +43,6 @@ class ManifestFileManager:
         self.partition_keys_fields = self.table.partition_keys_fields
         self.primary_keys_fields = self.table.primary_keys_fields
         self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
-        self.schema_cache = {}

     def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None,
                               drop_stats=True, max_workers=8) -> List[ManifestEntry]:
@@ -88,7 +86,7 @@ class ManifestFileManager:
                 null_counts=key_dict['_NULL_COUNTS'],
             )

-            schema_fields = self._get_schema(file_dict['_SCHEMA_ID']).fields
+            schema_fields = self.table.schema_manager.get_schema(file_dict['_SCHEMA_ID']).fields
             fields = self._get_value_stats_fields(file_dict, schema_fields)
             value_dict = dict(file_dict['_VALUE_STATS'])
             value_stats = SimpleStats(
@@ -148,14 +146,6 @@ class ManifestFileManager:
             fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
         return fields

-    def _get_schema(self, schema_id: int) -> TableSchema:
-        if schema_id not in self.schema_cache:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_cache[schema_id] = schema
-        return self.schema_cache[schema_id]
-
     def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
         for entry in entries:
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index df417d595b..f7f5946162 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -39,12 +39,10 @@ class SimpleStatsEvolutions:
         if self.table_schema_id == data_schema_id:
             evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
         else:
-
             data_fields = self.schema_fields(data_schema_id)
             index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields)
             index_mapping = index_cast_mapping.get('index_mapping')
             cast_mapping = index_cast_mapping.get('cast_mapping')
-
             evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping)

         self.evolutions[data_schema_id] = evolution
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 44223b761a..36dba3bdd1 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -65,11 +65,7 @@ class FullStartingScanner(StartingScanner):
         self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'

         def schema_fields_func(schema_id: int):
-            if schema_id not in self.manifest_file_manager.schema_cache:
-                schema = self.table.schema_manager.read_schema(schema_id)
-                self.manifest_file_manager.schema_cache[schema_id] = schema
-            return self.manifest_file_manager.schema_cache[schema_id].fields if self.manifest_file_manager.schema_cache[
-                schema_id] else []
+            return self.table.schema_manager.get_schema(schema_id).fields

         self.simple_stats_evolutions = SimpleStatsEvolutions(
             schema_fields_func,
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5c75cf6506..fec59e4a04 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Any, Dict
+from typing import List, Optional, Tuple, Any

 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -46,7 +46,6 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.schema.table_schema import TableSchema

 KEY_PREFIX = "_KEY_"
 KEY_FIELD_ID_START = 1000000
@@ -56,8 +55,7 @@ NULL_FIELD_INDEX = -1
 class SplitRead(ABC):
     """Abstract base class for split reading operations."""

-    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split,
-                 schema_fields_cache: Dict):
+    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split):
         from pypaimon.table.file_store_table import FileStoreTable

         self.table: FileStoreTable = table
@@ -71,7 +69,6 @@ class SplitRead(ABC):
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
-        self.schema_fields_cache = schema_fields_cache

     def _push_down_predicate(self) -> Any:
         if self.predicate is None:
@@ -89,9 +86,14 @@ class SplitRead(ABC):
         """Create a record reader for the given split."""

     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
-        read_file_fields, file_filter = self._get_schema(file.schema_id, read_fields)
-        if not file_filter:
+        schema = self.table.schema_manager.get_schema(file.schema_id)
+        schema_field_names = set(field.name for field in schema.fields)
+        if self.table.is_primary_key_table:
+            schema_field_names.add('_SEQUENCE_NUMBER')
+            schema_field_names.add('_VALUE_KIND')
+        if self.predicate_fields and self.predicate_fields - schema_field_names:
             return None
+        read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]

         file_path = file.file_path
         _, extension = os.path.splitext(file_path)
@@ -120,24 +122,6 @@ class SplitRead(ABC):
         return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
                                    self.table.table_schema.fields)

-    def _get_schema(self, schema_id: int, read_fields) -> TableSchema:
-        if schema_id not in self.schema_fields_cache[0]:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_fields_cache[0][schema_id] = schema
-        schema = self.schema_fields_cache[0][schema_id]
-        fields_key = (schema_id, tuple(read_fields))
-        if fields_key not in self.schema_fields_cache[1]:
-            schema_field_names = set(field.name for field in schema.fields)
-            if self.table.is_primary_key_table:
-                schema_field_names.add('_SEQUENCE_NUMBER')
-                schema_field_names.add('_VALUE_KIND')
-            self.schema_fields_cache[1][fields_key] = (
-                [read_field for read_field in read_fields if read_field in schema_field_names],
-                False if self.predicate_fields and self.predicate_fields - schema_field_names else True)
-        return self.schema_fields_cache[1][fields_key]
-
     @abstractmethod
     def _get_all_data_fields(self):
         """Get all data fields"""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 6cd544e745..cbc80faa28 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -39,7 +39,6 @@ class TableRead:
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.read_type = read_type
-        self.schema_fields_cache = ({}, {})

     def to_iterator(self, splits: List[Split]) -> Iterator:
         def _record_generator():
@@ -135,24 +134,21 @@ class TableRead:
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         else:
             return RawFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )

     @staticmethod
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index b9c4cdbddc..33c6d4af8b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -31,6 +31,7 @@ class SchemaManager:
         self.file_io = file_io
         self.table_path = table_path
         self.schema_path = table_path / "schema"
+        self.schema_cache = {}

     def latest(self) -> Optional['TableSchema']:
         try:
@@ -39,7 +40,7 @@ class SchemaManager:
                 return None

             max_version = max(versions)
-            return self.read_schema(max_version)
+            return self.get_schema(max_version)
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e

@@ -57,19 +58,24 @@ class SchemaManager:
     def commit(self, new_schema: TableSchema) -> bool:
         schema_path = self._to_schema_path(new_schema.id)
         try:
-            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            result = self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            if result:
+                self.schema_cache[new_schema.id] = new_schema
+            return result
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e

     def _to_schema_path(self, schema_id: int) -> Path:
         return self.schema_path / f"{self.schema_prefix}{schema_id}"

-    def read_schema(self, schema_id: int) -> Optional['TableSchema']:
-        schema_path = self._to_schema_path(schema_id)
-        if not self.file_io.exists(schema_path):
-            return None
-
-        return TableSchema.from_path(self.file_io, schema_path)
+    def get_schema(self, schema_id: int) -> Optional[TableSchema]:
+        if schema_id not in self.schema_cache:
+            schema_path = self._to_schema_path(schema_id)
+            if not self.file_io.exists(schema_path):
+                return None
+            schema = TableSchema.from_path(self.file_io, schema_path)
+            self.schema_cache[schema_id] = schema
+        return self.schema_cache[schema_id]

     def _list_versioned_files(self) -> List[int]:
         if not self.file_io.exists(self.schema_path):
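With this change a SchemaManager instance caches each schema after its first successful read, and commit() seeds the cache so a freshly written schema is served from memory. A hedged usage sketch (constructing the manager and its FileIO is assumed, not shown):

    # Assuming 'manager' is a SchemaManager for an existing table:
    s0 = manager.get_schema(0)   # first call reads the schema file
    s1 = manager.get_schema(0)   # second call returns the cached object
    assert s0 is s1              # same instance, no extra storage round trip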