This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 53fbc2d8f3a3a73ce03fa50d0ac3fb7216aed8af
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 15:31:47 2025 +0800

    [python] Introduce schema cache in SchemaManager
---
 .../pypaimon/manifest/manifest_file_manager.py     | 12 +-------
 .../pypaimon/manifest/simple_stats_evolutions.py   |  2 --
 .../pypaimon/read/scanner/full_starting_scanner.py |  6 +---
 paimon-python/pypaimon/read/split_read.py          | 34 ++++++----------------
 paimon-python/pypaimon/read/table_read.py          | 10 ++-----
 paimon-python/pypaimon/schema/schema_manager.py    | 22 +++++++++-----
 6 files changed, 28 insertions(+), 58 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 927dad4674..567bba1cbd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -26,7 +26,6 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                       ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
 from pypaimon.table.row.binary_row import BinaryRow
@@ -44,7 +43,6 @@ class ManifestFileManager:
         self.partition_keys_fields = self.table.partition_keys_fields
         self.primary_keys_fields = self.table.primary_keys_fields
         self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
-        self.schema_cache = {}

     def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None,
                               drop_stats=True, max_workers=8) -> List[ManifestEntry]:
@@ -88,7 +86,7 @@
             null_counts=key_dict['_NULL_COUNTS'],
         )

-        schema_fields = self._get_schema(file_dict['_SCHEMA_ID']).fields
+        schema_fields = self.table.schema_manager.get_schema(file_dict['_SCHEMA_ID']).fields
         fields = self._get_value_stats_fields(file_dict, schema_fields)
         value_dict = dict(file_dict['_VALUE_STATS'])
         value_stats = SimpleStats(
@@ -148,14 +146,6 @@
             fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
         return fields

-    def _get_schema(self, schema_id: int) -> TableSchema:
-        if schema_id not in self.schema_cache:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_cache[schema_id] = schema
-        return self.schema_cache[schema_id]
-
     def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
         for entry in entries:
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index df417d595b..f7f5946162 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -39,12 +39,10 @@
         if self.table_schema_id == data_schema_id:
             evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
         else:
-
             data_fields = self.schema_fields(data_schema_id)
             index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields)
             index_mapping = index_cast_mapping.get('index_mapping')
             cast_mapping = index_cast_mapping.get('cast_mapping')
-
             evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping)

         self.evolutions[data_schema_id] = evolution
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 44223b761a..36dba3bdd1 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -65,11 +65,7 @@ class FullStartingScanner(StartingScanner):
         self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'

         def schema_fields_func(schema_id: int):
-            if schema_id not in self.manifest_file_manager.schema_cache:
-                schema = self.table.schema_manager.read_schema(schema_id)
-                self.manifest_file_manager.schema_cache[schema_id] = schema
-            return self.manifest_file_manager.schema_cache[schema_id].fields if self.manifest_file_manager.schema_cache[
-                schema_id] else []
+            return self.table.schema_manager.get_schema(schema_id).fields

         self.simple_stats_evolutions = SimpleStatsEvolutions(
             schema_fields_func,
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5c75cf6506..fec59e4a04 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Any, Dict
+from typing import List, Optional, Tuple, Any

 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -46,7 +46,6 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.schema.table_schema import TableSchema

 KEY_PREFIX = "_KEY_"
 KEY_FIELD_ID_START = 1000000
@@ -56,8 +55,7 @@ NULL_FIELD_INDEX = -1
 class SplitRead(ABC):
     """Abstract base class for split reading operations."""

-    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split,
-                 schema_fields_cache: Dict):
+    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split):
         from pypaimon.table.file_store_table import FileStoreTable

         self.table: FileStoreTable = table
@@ -71,7 +69,6 @@
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
-        self.schema_fields_cache = schema_fields_cache

     def _push_down_predicate(self) -> Any:
         if self.predicate is None:
@@ -89,9 +86,14 @@
         """Create a record reader for the given split."""

     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
-        read_file_fields, file_filter = self._get_schema(file.schema_id, read_fields)
-        if not file_filter:
+        schema = self.table.schema_manager.get_schema(file.schema_id)
+        schema_field_names = set(field.name for field in schema.fields)
+        if self.table.is_primary_key_table:
+            schema_field_names.add('_SEQUENCE_NUMBER')
+            schema_field_names.add('_VALUE_KIND')
+        if self.predicate_fields and self.predicate_fields - schema_field_names:
             return None
+        read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]

         file_path = file.file_path
         _, extension = os.path.splitext(file_path)
@@ -120,24 +122,6 @@
         return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
                                    self.table.table_schema.fields)

-    def _get_schema(self, schema_id: int, read_fields) -> TableSchema:
-        if schema_id not in self.schema_fields_cache[0]:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_fields_cache[0][schema_id] = schema
-        schema = self.schema_fields_cache[0][schema_id]
-        fields_key = (schema_id, tuple(read_fields))
-        if fields_key not in self.schema_fields_cache[1]:
-            schema_field_names = set(field.name for field in schema.fields)
-            if self.table.is_primary_key_table:
-                schema_field_names.add('_SEQUENCE_NUMBER')
-                schema_field_names.add('_VALUE_KIND')
-            self.schema_fields_cache[1][fields_key] = (
-                [read_field for read_field in read_fields if read_field in schema_field_names],
-                False if self.predicate_fields and self.predicate_fields - schema_field_names else True)
-        return self.schema_fields_cache[1][fields_key]
-
     @abstractmethod
     def _get_all_data_fields(self):
         """Get all data fields"""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 6cd544e745..cbc80faa28 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -39,7 +39,6 @@
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.read_type = read_type
-        self.schema_fields_cache = ({}, {})

     def to_iterator(self, splits: List[Split]) -> Iterator:
         def _record_generator():
@@ -135,24 +134,21 @@
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         else:
             return RawFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )

     @staticmethod
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index b9c4cdbddc..33c6d4af8b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -31,6 +31,7 @@
         self.file_io = file_io
         self.table_path = table_path
         self.schema_path = table_path / "schema"
+        self.schema_cache = {}

     def latest(self) -> Optional['TableSchema']:
         try:
@@ -39,7 +40,7 @@
                 return None

             max_version = max(versions)
-            return self.read_schema(max_version)
+            return self.get_schema(max_version)
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e

@@ -57,19 +58,24 @@
     def commit(self, new_schema: TableSchema) -> bool:
         schema_path = self._to_schema_path(new_schema.id)
         try:
-            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            result = self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            if result:
+                self.schema_cache[new_schema.id] = new_schema
+            return result
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e

     def _to_schema_path(self, schema_id: int) -> Path:
         return self.schema_path / f"{self.schema_prefix}{schema_id}"

-    def read_schema(self, schema_id: int) -> Optional['TableSchema']:
-        schema_path = self._to_schema_path(schema_id)
-        if not self.file_io.exists(schema_path):
-            return None
-
-        return TableSchema.from_path(self.file_io, schema_path)
+    def get_schema(self, schema_id: int) -> Optional[TableSchema]:
+        if schema_id not in self.schema_cache:
+            schema_path = self._to_schema_path(schema_id)
+            if not self.file_io.exists(schema_path):
+                return None
+            schema = TableSchema.from_path(self.file_io, schema_path)
+            self.schema_cache[schema_id] = schema
+        return self.schema_cache[schema_id]

     def _list_versioned_files(self) -> List[int]:
         if not self.file_io.exists(self.schema_path):
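A minimal usage sketch of the new cache (illustrative only, not part of the commit; it assumes an existing FileStoreTable instance named table): readers such as ManifestFileManager, FullStartingScanner and SplitRead now resolve schemas through the shared SchemaManager, so repeated lookups of the same schema id are served from the in-memory schema_cache instead of re-reading schema files.

    # Hypothetical caller; only SchemaManager.get_schema and schema_cache come from this commit.
    schema_manager = table.schema_manager

    first = schema_manager.get_schema(0)    # first access reads the schema file via file_io
    second = schema_manager.get_schema(0)   # served from schema_manager.schema_cache
    assert first is second                  # same TableSchema object, no second file read

    missing = schema_manager.get_schema(999)  # unknown id: returns None and is not cached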
