This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 53fbc2d8f3a3a73ce03fa50d0ac3fb7216aed8af
Author: JingsongLi <[email protected]>
AuthorDate: Thu Oct 23 15:31:47 2025 +0800

    [python] Introduce schema cache in SchemaManager
---
 .../pypaimon/manifest/manifest_file_manager.py     | 12 +-------
 .../pypaimon/manifest/simple_stats_evolutions.py   |  2 --
 .../pypaimon/read/scanner/full_starting_scanner.py |  6 +---
 paimon-python/pypaimon/read/split_read.py          | 34 ++++++----------------
 paimon-python/pypaimon/read/table_read.py          | 10 ++-----
 paimon-python/pypaimon/schema/schema_manager.py    | 22 +++++++++-----
 6 files changed, 28 insertions(+), 58 deletions(-)
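
For readers skimming the patch, the core of the change is the memoization
below: the per-call-site schema dicts in ManifestFileManager, SplitRead and
FullStartingScanner are deleted, and SchemaManager.get_schema becomes the
single cached lookup. This is a minimal, self-contained sketch of that
pattern; CachedSchemaManager and _load_schema_file are illustrative
stand-ins, not pypaimon API.

    from typing import Dict, Optional

    class CachedSchemaManager:
        """Sketch of a schema manager that memoizes schema file reads."""

        def __init__(self) -> None:
            self._cache: Dict[int, dict] = {}

        def _load_schema_file(self, schema_id: int) -> Optional[dict]:
            # Stand-in for reading the schema file via file_io; returns
            # None when the schema file does not exist.
            return {"id": schema_id, "fields": []}

        def get_schema(self, schema_id: int) -> Optional[dict]:
            # Read each schema file at most once; later lookups hit the cache.
            if schema_id not in self._cache:
                schema = self._load_schema_file(schema_id)
                if schema is None:
                    return None
                self._cache[schema_id] = schema
            return self._cache[schema_id]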

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 927dad4674..567bba1cbd 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -26,7 +26,6 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.schema.table_schema import TableSchema
 from pypaimon.table.row.generic_row import (GenericRowDeserializer,
                                             GenericRowSerializer)
 from pypaimon.table.row.binary_row import BinaryRow
@@ -44,7 +43,6 @@ class ManifestFileManager:
         self.partition_keys_fields = self.table.partition_keys_fields
         self.primary_keys_fields = self.table.primary_keys_fields
         self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
-        self.schema_cache = {}
 
     def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None,
                               drop_stats=True, max_workers=8) -> List[ManifestEntry]:
@@ -88,7 +86,7 @@ class ManifestFileManager:
                 null_counts=key_dict['_NULL_COUNTS'],
             )
 
-            schema_fields = self._get_schema(file_dict['_SCHEMA_ID']).fields
+            schema_fields = self.table.schema_manager.get_schema(file_dict['_SCHEMA_ID']).fields
             fields = self._get_value_stats_fields(file_dict, schema_fields)
             value_dict = dict(file_dict['_VALUE_STATS'])
             value_stats = SimpleStats(
@@ -148,14 +146,6 @@ class ManifestFileManager:
             fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
         return fields
 
-    def _get_schema(self, schema_id: int) -> TableSchema:
-        if schema_id not in self.schema_cache:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_cache[schema_id] = schema
-        return self.schema_cache[schema_id]
-
     def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
         for entry in entries:
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index df417d595b..f7f5946162 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -39,12 +39,10 @@ class SimpleStatsEvolutions:
         if self.table_schema_id == data_schema_id:
             evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
         else:
-
             data_fields = self.schema_fields(data_schema_id)
             index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields)
             index_mapping = index_cast_mapping.get('index_mapping')
             cast_mapping = index_cast_mapping.get('cast_mapping')
-
             evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping)
 
         self.evolutions[data_schema_id] = evolution
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 44223b761a..36dba3bdd1 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -65,11 +65,7 @@ class FullStartingScanner(StartingScanner):
         self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
 
         def schema_fields_func(schema_id: int):
-            if schema_id not in self.manifest_file_manager.schema_cache:
-                schema = self.table.schema_manager.read_schema(schema_id)
-                self.manifest_file_manager.schema_cache[schema_id] = schema
-            return self.manifest_file_manager.schema_cache[schema_id].fields if self.manifest_file_manager.schema_cache[
-                schema_id] else []
+            return self.table.schema_manager.get_schema(schema_id).fields
 
         self.simple_stats_evolutions = SimpleStatsEvolutions(
             schema_fields_func,
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5c75cf6506..fec59e4a04 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,7 +19,7 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple, Any, Dict
+from typing import List, Optional, Tuple, Any
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
@@ -46,7 +46,6 @@ from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon.schema.table_schema import TableSchema
 
 KEY_PREFIX = "_KEY_"
 KEY_FIELD_ID_START = 1000000
@@ -56,8 +55,7 @@ NULL_FIELD_INDEX = -1
 class SplitRead(ABC):
     """Abstract base class for split reading operations."""
 
-    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split,
-                 schema_fields_cache: Dict):
+    def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -71,7 +69,6 @@ class SplitRead(ABC):
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
-        self.schema_fields_cache = schema_fields_cache
 
     def _push_down_predicate(self) -> Any:
         if self.predicate is None:
@@ -89,9 +86,14 @@ class SplitRead(ABC):
         """Create a record reader for the given split."""
 
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, read_fields: List[str]):
-        read_file_fields, file_filter = self._get_schema(file.schema_id, read_fields)
-        if not file_filter:
+        schema = self.table.schema_manager.get_schema(file.schema_id)
+        schema_field_names = set(field.name for field in schema.fields)
+        if self.table.is_primary_key_table:
+            schema_field_names.add('_SEQUENCE_NUMBER')
+            schema_field_names.add('_VALUE_KIND')
+        if self.predicate_fields and self.predicate_fields - schema_field_names:
             return None
+        read_file_fields = [read_field for read_field in read_fields if read_field in schema_field_names]
 
         file_path = file.file_path
         _, extension = os.path.splitext(file_path)
@@ -120,24 +122,6 @@ class SplitRead(ABC):
             return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
                                        self.table.table_schema.fields)
 
-    def _get_schema(self, schema_id: int, read_fields) -> TableSchema:
-        if schema_id not in self.schema_fields_cache[0]:
-            schema = self.table.schema_manager.read_schema(schema_id)
-            if schema is None:
-                raise ValueError(f"Schema {schema_id} not found")
-            self.schema_fields_cache[0][schema_id] = schema
-        schema = self.schema_fields_cache[0][schema_id]
-        fields_key = (schema_id, tuple(read_fields))
-        if fields_key not in self.schema_fields_cache[1]:
-            schema_field_names = set(field.name for field in schema.fields)
-            if self.table.is_primary_key_table:
-                schema_field_names.add('_SEQUENCE_NUMBER')
-                schema_field_names.add('_VALUE_KIND')
-            self.schema_fields_cache[1][fields_key] = (
-                [read_field for read_field in read_fields if read_field in schema_field_names],
-                False if self.predicate_fields and self.predicate_fields - schema_field_names else True)
-        return self.schema_fields_cache[1][fields_key]
-
     @abstractmethod
     def _get_all_data_fields(self):
         """Get all data fields"""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 6cd544e745..cbc80faa28 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -39,7 +39,6 @@ class TableRead:
         self.table: FileStoreTable = table
         self.predicate = predicate
         self.read_type = read_type
-        self.schema_fields_cache = ({}, {})
 
     def to_iterator(self, splits: List[Split]) -> Iterator:
         def _record_generator():
@@ -135,24 +134,21 @@ class TableRead:
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
         else:
             return RawFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
                 read_type=self.read_type,
-                split=split,
-                schema_fields_cache=self.schema_fields_cache
+                split=split
             )
 
     @staticmethod
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index b9c4cdbddc..33c6d4af8b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -31,6 +31,7 @@ class SchemaManager:
         self.file_io = file_io
         self.table_path = table_path
         self.schema_path = table_path / "schema"
+        self.schema_cache = {}
 
     def latest(self) -> Optional['TableSchema']:
         try:
@@ -39,7 +40,7 @@ class SchemaManager:
                 return None
 
             max_version = max(versions)
-            return self.read_schema(max_version)
+            return self.get_schema(max_version)
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e
 
@@ -57,19 +58,24 @@ class SchemaManager:
     def commit(self, new_schema: TableSchema) -> bool:
         schema_path = self._to_schema_path(new_schema.id)
         try:
-            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            result = self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
+            if result:
+                self.schema_cache[new_schema.id] = new_schema
+            return result
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e
 
     def _to_schema_path(self, schema_id: int) -> Path:
         return self.schema_path / f"{self.schema_prefix}{schema_id}"
 
-    def read_schema(self, schema_id: int) -> Optional['TableSchema']:
-        schema_path = self._to_schema_path(schema_id)
-        if not self.file_io.exists(schema_path):
-            return None
-
-        return TableSchema.from_path(self.file_io, schema_path)
+    def get_schema(self, schema_id: int) -> Optional[TableSchema]:
+        if schema_id not in self.schema_cache:
+            schema_path = self._to_schema_path(schema_id)
+            if not self.file_io.exists(schema_path):
+                return None
+            schema = TableSchema.from_path(self.file_io, schema_path)
+            self.schema_cache[schema_id] = schema
+        return self.schema_cache[schema_id]
 
     def _list_versioned_files(self) -> List[int]:
         if not self.file_io.exists(self.schema_path):
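
After this change there is one schema cache per SchemaManager instance:
ManifestFileManager, FullStartingScanner and SplitRead all go through
table.schema_manager.get_schema(...) instead of keeping private dicts, so a
schema file is read from storage at most once per table. A hedged usage
sketch (variable names are illustrative):

    # First call reads the schema file from storage; later calls for the
    # same id, from any reader component, hit the shared in-memory cache.
    schema = table.schema_manager.get_schema(file.schema_id)
    fields = schema.fields if schema is not None else []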
