This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ddff15d59 [Python] Support scan and plan for PyPaimon (#5996)
2ddff15d59 is described below

commit 2ddff15d594d35a77d0d8b6df6563afebb15d670
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jul 31 15:24:58 2025 +0800

    [Python] Support scan and plan for PyPaimon (#5996)
---
 paimon-python/pypaimon/api/__init__.py             |   2 +-
 paimon-python/pypaimon/api/api_resquest.py         |   2 +-
 paimon-python/pypaimon/catalog/catalog_factory.py  |   2 +-
 paimon-python/pypaimon/catalog/catalog_utils.py    |   2 +-
 .../pypaimon/{ => catalog}/rest/__init__.py        |   0
 .../pypaimon/{ => catalog}/rest/rest_catalog.py    |   4 +-
 .../{ => catalog}/rest/rest_token_file_io.py       |   2 +-
 .../pypaimon/{api => common}/identifier.py         |   0
 paimon-python/pypaimon/common/predicate.py         |  77 ++++++
 paimon-python/pypaimon/common/predicate_builder.py | 120 +++++++++
 .../catalog_factory.py => read/__init__.py}        |  21 --
 paimon-python/pypaimon/read/interval_partition.py  | 130 +++++++++
 .../catalog_factory.py => read/partition_info.py}  |  50 ++--
 .../{catalog/catalog_factory.py => read/plan.py}   |  27 +-
 paimon-python/pypaimon/read/read_builder.py        |  75 ++++++
 .../{catalog/catalog_factory.py => read/split.py}  |  43 +--
 .../catalog_factory.py => read/table_read.py}      |  22 +-
 paimon-python/pypaimon/read/table_scan.py          | 291 +++++++++++++++++++++
 .../pypaimon/table/catalog_environment.py          |   2 +-
 paimon-python/pypaimon/table/file_store_table.py   |   2 +-
 paimon-python/pypaimon/tests/api_test.py           |   2 +-
 paimon-python/pypaimon/tests/rest_catalog_test.py  |   2 +-
 22 files changed, 771 insertions(+), 107 deletions(-)

diff --git a/paimon-python/pypaimon/api/__init__.py 
b/paimon-python/pypaimon/api/__init__.py
index 8e05df6c01..509c4d91a2 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -33,7 +33,7 @@ from pypaimon.api.api_resquest import CreateDatabaseRequest, 
AlterDatabaseReques
     CreateTableRequest
 from pypaimon.api.config import CatalogOptions
 from pypaimon.api.client import HttpClient
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.api.typedef import T
 from pypaimon.schema.schema import Schema
 
diff --git a/paimon-python/pypaimon/api/api_resquest.py 
b/paimon-python/pypaimon/api/api_resquest.py
index dfc517ae89..8c1628120e 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_resquest.py
@@ -20,7 +20,7 @@ from abc import ABC
 from dataclasses import dataclass
 from typing import Dict, List
 
-from .identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.common.rest_json import json_field
 from ..schema.schema import Schema
 
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/catalog/catalog_factory.py
index b5ec706ae5..8ae17836d4 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -18,7 +18,7 @@
 from pypaimon import Catalog
 from pypaimon.api import CatalogOptions
 from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 
 
 class CatalogFactory:
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py 
b/paimon-python/pypaimon/catalog/catalog_utils.py
index d9877ce53a..2bae27aa4e 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -19,7 +19,7 @@ from pathlib import Path
 from typing import Callable, Any
 
 from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 
 from pypaimon.catalog.table_metadata import TableMetadata
 from pypaimon.table.catalog_environment import CatalogEnvironment
diff --git a/paimon-python/pypaimon/rest/__init__.py 
b/paimon-python/pypaimon/catalog/rest/__init__.py
similarity index 100%
rename from paimon-python/pypaimon/rest/__init__.py
rename to paimon-python/pypaimon/catalog/rest/__init__.py
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py 
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
similarity index 97%
rename from paimon-python/pypaimon/rest/rest_catalog.py
rename to paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 32b5c5c7ad..ab08685840 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -22,7 +22,7 @@ from pypaimon import Database, Catalog, Schema
 from pypaimon.api import RESTApi, CatalogOptions
 from pypaimon.api.api_response import PagedList, GetTableResponse
 from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.api.options import Options
 
 
@@ -30,7 +30,7 @@ from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.catalog_utils import CatalogUtils
 from pypaimon.catalog.property_change import PropertyChange
 from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
 from pypaimon.table.file_store_table import FileStoreTable
 
 
diff --git a/paimon-python/pypaimon/rest/rest_token_file_io.py 
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
similarity index 96%
rename from paimon-python/pypaimon/rest/rest_token_file_io.py
rename to paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 5b179b95f0..283225d507 100644
--- a/paimon-python/pypaimon/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -18,7 +18,7 @@ limitations under the License.
 from pathlib import Path
 from typing import Optional
 
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.common.file_io import FileIO
 
 
diff --git a/paimon-python/pypaimon/api/identifier.py 
b/paimon-python/pypaimon/common/identifier.py
similarity index 100%
rename from paimon-python/pypaimon/api/identifier.py
rename to paimon-python/pypaimon/common/identifier.py
diff --git a/paimon-python/pypaimon/common/predicate.py 
b/paimon-python/pypaimon/common/predicate.py
new file mode 100644
index 0000000000..82d792290a
--- /dev/null
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from typing import Any, List, Optional
+
+from pypaimon.table.row.internal_row import InternalRow
+
+
+@dataclass
+class Predicate:
+    method: str
+    index: Optional[int]
+    field: str | None
+    literals: Optional[List[Any]] = None
+
+    def test(self, record: InternalRow) -> bool:
+        if self.method == 'equal':
+            return record.get_field(self.index) == self.literals[0]
+        elif self.method == 'notEqual':
+            return record.get_field(self.index) != self.literals[0]
+        elif self.method == 'lessThan':
+            return record.get_field(self.index) < self.literals[0]
+        elif self.method == 'lessOrEqual':
+            return record.get_field(self.index) <= self.literals[0]
+        elif self.method == 'greaterThan':
+            return record.get_field(self.index) > self.literals[0]
+        elif self.method == 'greaterOrEqual':
+            return record.get_field(self.index) >= self.literals[0]
+        elif self.method == 'isNull':
+            return record.get_field(self.index) is None
+        elif self.method == 'isNotNull':
+            return record.get_field(self.index) is not None
+        elif self.method == 'startsWith':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return field_value.startswith(self.literals[0])
+        elif self.method == 'endsWith':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return field_value.endswith(self.literals[0])
+        elif self.method == 'contains':
+            field_value = record.get_field(self.index)
+            if not isinstance(field_value, str):
+                return False
+            return self.literals[0] in field_value
+        elif self.method == 'in':
+            return record.get_field(self.index) in self.literals
+        elif self.method == 'notIn':
+            return record.get_field(self.index) not in self.literals
+        elif self.method == 'between':
+            field_value = record.get_field(self.index)
+            return self.literals[0] <= field_value <= self.literals[1]
+        elif self.method == 'and':
+            return all(p.test(record) for p in self.literals)
+        elif self.method == 'or':
+            t = any(p.test(record) for p in self.literals)
+            return t
+        else:
+            raise ValueError(f"Unsupported predicate method: {self.method}")
diff --git a/paimon-python/pypaimon/common/predicate_builder.py 
b/paimon-python/pypaimon/common/predicate_builder.py
new file mode 100644
index 0000000000..14ba39f801
--- /dev/null
+++ b/paimon-python/pypaimon/common/predicate_builder.py
@@ -0,0 +1,120 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Any, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.schema.data_types import DataField
+
+
+class PredicateBuilder:
+    """Implementation of PredicateBuilder using Predicate."""
+
+    def __init__(self, row_field: List[DataField]):
+        self.field_names = [field.name for field in row_field]
+
+    def _get_field_index(self, field: str) -> int:
+        """Get the index of a field in the schema."""
+        try:
+            return self.field_names.index(field)
+        except ValueError:
+            raise ValueError(f'The field {field} is not in field list 
{self.field_names}.')
+
+    def _build_predicate(self, method: str, field: str, literals: 
Optional[List[Any]] = None) -> Predicate:
+        """Build a predicate with the given method, field, and literals."""
+        index = self._get_field_index(field)
+        return Predicate(
+            method=method,
+            index=index,
+            field=field,
+            literals=literals
+        )
+
+    def equal(self, field: str, literal: Any) -> Predicate:
+        """Create an equality predicate."""
+        return self._build_predicate('equal', field, [literal])
+
+    def not_equal(self, field: str, literal: Any) -> Predicate:
+        """Create a not-equal predicate."""
+        return self._build_predicate('notEqual', field, [literal])
+
+    def less_than(self, field: str, literal: Any) -> Predicate:
+        """Create a less-than predicate."""
+        return self._build_predicate('lessThan', field, [literal])
+
+    def less_or_equal(self, field: str, literal: Any) -> Predicate:
+        """Create a less-or-equal predicate."""
+        return self._build_predicate('lessOrEqual', field, [literal])
+
+    def greater_than(self, field: str, literal: Any) -> Predicate:
+        """Create a greater-than predicate."""
+        return self._build_predicate('greaterThan', field, [literal])
+
+    def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+        """Create a greater-or-equal predicate."""
+        return self._build_predicate('greaterOrEqual', field, [literal])
+
+    def is_null(self, field: str) -> Predicate:
+        """Create an is-null predicate."""
+        return self._build_predicate('isNull', field)
+
+    def is_not_null(self, field: str) -> Predicate:
+        """Create an is-not-null predicate."""
+        return self._build_predicate('isNotNull', field)
+
+    def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """Create a starts-with predicate."""
+        return self._build_predicate('startsWith', field, [pattern_literal])
+
+    def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+        """Create an ends-with predicate."""
+        return self._build_predicate('endsWith', field, [pattern_literal])
+
+    def contains(self, field: str, pattern_literal: Any) -> Predicate:
+        """Create a contains predicate."""
+        return self._build_predicate('contains', field, [pattern_literal])
+
+    def is_in(self, field: str, literals: List[Any]) -> Predicate:
+        """Create an in predicate."""
+        return self._build_predicate('in', field, literals)
+
+    def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+        """Create a not-in predicate."""
+        return self._build_predicate('notIn', field, literals)
+
+    def between(self, field: str, included_lower_bound: Any, 
included_upper_bound: Any) -> Predicate:
+        """Create a between predicate."""
+        return self._build_predicate('between', field, [included_lower_bound, 
included_upper_bound])
+
+    def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """Create an AND predicate from multiple predicates."""
+        return Predicate(
+            method='and',
+            index=None,
+            field=None,
+            literals=predicates
+        )
+
+    def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+        """Create an OR predicate from multiple predicates."""
+        return Predicate(
+            method='or',
+            index=None,
+            field=None,
+            literals=predicates
+        )
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/read/__init__.py
similarity index 54%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/__init__.py
index b5ec706ae5..65b48d4d79 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/__init__.py
@@ -15,24 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
-
-
-class CatalogFactory:
-
-    CATALOG_REGISTRY = {
-        "filesystem": FileSystemCatalog,
-        "rest": RESTCatalog,
-    }
-
-    @staticmethod
-    def create(catalog_options: dict) -> Catalog:
-        identifier = catalog_options.get(CatalogOptions.METASTORE, 
"filesystem")
-        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
-        if catalog_class is None:
-            raise ValueError(f"Unknown catalog identifier: {identifier}. "
-                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
diff --git a/paimon-python/pypaimon/read/interval_partition.py 
b/paimon-python/pypaimon/read/interval_partition.py
new file mode 100644
index 0000000000..7261383196
--- /dev/null
+++ b/paimon-python/pypaimon/read/interval_partition.py
@@ -0,0 +1,130 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import heapq
+from dataclasses import dataclass
+from functools import cmp_to_key
+from typing import List, Callable
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+@dataclass
+class SortedRun:
+    """
+    A SortedRun is a list of files sorted by their keys.
+    The key intervals [minKey, maxKey] of these files do not overlap.
+    """
+    files: List[DataFileMeta]
+
+
+class IntervalPartition:
+    """
+    Algorithm to partition several data files into the minimum number of 
SortedRuns.
+    """
+
+    def __init__(self, input_files: List[DataFileMeta]):
+        self.files = input_files.copy()
+        self.key_comparator = default_key_comparator
+        self.files.sort(key=cmp_to_key(self._compare_files))
+
+    def partition(self) -> List[List[SortedRun]]:
+        result = []
+        section: List[DataFileMeta] = []
+        bound = None
+
+        for meta in self.files:
+            if section and self.key_comparator(meta.min_key, bound) > 0:
+                result.append(self._partition_section(section))
+                section.clear()
+                bound = None
+            section.append(meta)
+            if bound is None or self.key_comparator(meta.max_key, bound) > 0:
+                bound = meta.max_key
+
+        if section:
+            result.append(self._partition_section(section))
+        return result
+
+    def _partition_section(self, metas: List[DataFileMeta]) -> List[SortedRun]:
+        heap: List[HeapRun] = []
+        first_run = [metas[0]]
+        heapq.heappush(heap, HeapRun(first_run, self.key_comparator))
+        for i in range(1, len(metas)):
+            meta = metas[i]
+
+            earliest_finishing_run = heap[0]
+            last_max_key = earliest_finishing_run.run[-1].max_key
+            if self.key_comparator(meta.min_key, last_max_key) > 0:
+                top = heapq.heappop(heap)
+                top.run.append(meta)
+                heapq.heappush(heap, top)
+            else:
+                new_run = [meta]
+                heapq.heappush(heap, HeapRun(new_run, self.key_comparator))
+
+        return [SortedRun(files=h.run) for h in heap]
+
+    def _compare_files(self, f1: DataFileMeta, f2: DataFileMeta) -> int:
+        min_key_cmp = self.key_comparator(f1.min_key, f2.min_key)
+        if min_key_cmp != 0:
+            return min_key_cmp
+        return self.key_comparator(f1.max_key, f2.max_key)
+
+
+@dataclass
+class HeapRun:
+    run: List[DataFileMeta]
+    comparator: Callable[[BinaryRow, BinaryRow], int]
+
+    def __lt__(self, other) -> bool:
+        my_last_max = self.run[-1].max_key
+        other_last_max = other.run[-1].max_key
+        return self.comparator(my_last_max, other_last_max) < 0
+
+
+def default_key_comparator(key1: BinaryRow, key2: BinaryRow) -> int:
+    if not key1 or not key1.values:
+        if not key2 or not key2.values:
+            return 0
+        return -1
+    if not key2 or not key2.values:
+        return 1
+
+    min_field_count = min(len(key1.values), len(key2.values))
+    for i in range(min_field_count):
+        val1 = key1.values[i]
+        val2 = key2.values[i]
+        if val1 is None and val2 is None:
+            continue
+        if val1 is None:
+            return -1
+        if val2 is None:
+            return 1
+        if val1 < val2:
+            return -1
+        elif val1 > val2:
+            return 1
+
+    if len(key1.values) < len(key2.values):
+        return -1
+    elif len(key1.values) > len(key2.values):
+        return 1
+    else:
+        return 0
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/read/partition_info.py
similarity index 52%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/partition_info.py
index b5ec706ae5..e07b37a619 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/partition_info.py
@@ -15,24 +15,32 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
-
-
-class CatalogFactory:
-
-    CATALOG_REGISTRY = {
-        "filesystem": FileSystemCatalog,
-        "rest": RESTCatalog,
-    }
-
-    @staticmethod
-    def create(catalog_options: dict) -> Catalog:
-        identifier = catalog_options.get(CatalogOptions.METASTORE, 
"filesystem")
-        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
-        if catalog_class is None:
-            raise ValueError(f"Unknown catalog identifier: {identifier}. "
-                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
+
+from typing import Any, List
+
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+class PartitionInfo:
+    """
+    Partition information about how the row mapping of outer row.
+    """
+
+    def __init__(self, mapping: List[int], partition: BinaryRow):
+        self.mapping = mapping
+        self.partition_values = partition.values
+        self.partition_fields = partition.fields
+
+    def size(self) -> int:
+        return len(self.mapping) - 1
+
+    def is_partition_row(self, pos: int) -> bool:
+        return self.mapping[pos] < 0
+
+    def get_real_index(self, pos: int) -> int:
+        return abs(self.mapping[pos]) - 1
+
+    def get_partition_value(self, pos: int) -> (Any, DataField):
+        real_index = self.get_real_index(pos)
+        return self.partition_values[real_index], 
self.partition_fields[real_index]
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/read/plan.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/plan.py
index b5ec706ae5..c3aeaa8a54 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -15,24 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
 
+from dataclasses import dataclass
+from typing import List
 
-class CatalogFactory:
+from pypaimon.read.split import Split
 
-    CATALOG_REGISTRY = {
-        "filesystem": FileSystemCatalog,
-        "rest": RESTCatalog,
-    }
 
-    @staticmethod
-    def create(catalog_options: dict) -> Catalog:
-        identifier = catalog_options.get(CatalogOptions.METASTORE, 
"filesystem")
-        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
-        if catalog_class is None:
-            raise ValueError(f"Unknown catalog identifier: {identifier}. "
-                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
+@dataclass
+class Plan:
+    """Implementation of Plan for native Python reading."""
+    _splits: List[Split]
+
+    def splits(self) -> List[Split]:
+        return self._splits
diff --git a/paimon-python/pypaimon/read/read_builder.py 
b/paimon-python/pypaimon/read/read_builder.py
new file mode 100644
index 0000000000..604eff9ff1
--- /dev/null
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.read.table_read import TableRead
+from pypaimon.read.table_scan import TableScan
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class ReadBuilder:
+    """Implementation of ReadBuilder for native Python reading."""
+
+    def __init__(self, table: FileStoreTable):
+        self.table = table
+        self._predicate: Optional[Predicate] = None
+        self._projection: Optional[List[str]] = None
+        self._limit: Optional[int] = None
+
+    def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
+        self._predicate = predicate
+        return self
+
+    def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+        self._projection = projection
+        return self
+
+    def with_limit(self, limit: int) -> 'ReadBuilder':
+        self._limit = limit
+        return self
+
+    def new_scan(self) -> TableScan:
+        return TableScan(
+            table=self.table,
+            predicate=self._predicate,
+            limit=self._limit,
+            read_type=self.read_type()
+        )
+
+    def new_read(self) -> TableRead:
+        return TableRead(
+            table=self.table,
+            predicate=self._predicate,
+            read_type=self.read_type()
+        )
+
+    def new_predicate_builder(self) -> PredicateBuilder:
+        return PredicateBuilder(self.read_type())
+
+    def read_type(self) -> List[DataField]:
+        table_fields = self.table.fields
+
+        if not self._projection:
+            return table_fields
+        else:
+            field_map = {field.name: field for field in self.table.fields}
+            return [field_map[name] for name in self._projection if name in 
field_map]
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/read/split.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/split.py
index b5ec706ae5..f3bfdbab6b 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -15,24 +15,33 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
 
+from dataclasses import dataclass
+from typing import List
 
-class CatalogFactory:
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.row.binary_row import BinaryRow
 
-    CATALOG_REGISTRY = {
-        "filesystem": FileSystemCatalog,
-        "rest": RESTCatalog,
-    }
 
-    @staticmethod
-    def create(catalog_options: dict) -> Catalog:
-        identifier = catalog_options.get(CatalogOptions.METASTORE, 
"filesystem")
-        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
-        if catalog_class is None:
-            raise ValueError(f"Unknown catalog identifier: {identifier}. "
-                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
+@dataclass
+class Split:
+    """Implementation of Split for native Python reading."""
+    files: List[DataFileMeta]
+    partition: BinaryRow
+    bucket: int
+    _file_paths: List[str]
+    _row_count: int
+    _file_size: int
+    raw_convertible: bool = False
+
+    @property
+    def row_count(self) -> int:
+        return self._row_count
+
+    @property
+    def file_size(self) -> int:
+        return self._file_size
+
+    @property
+    def file_paths(self) -> List[str]:
+        return self._file_paths
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py 
b/paimon-python/pypaimon/read/table_read.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/table_read.py
index b5ec706ae5..7050b24d93 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,24 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
 
-
-class CatalogFactory:
-
-    CATALOG_REGISTRY = {
-        "filesystem": FileSystemCatalog,
-        "rest": RESTCatalog,
-    }
-
-    @staticmethod
-    def create(catalog_options: dict) -> Catalog:
-        identifier = catalog_options.get(CatalogOptions.METASTORE, 
"filesystem")
-        catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
-        if catalog_class is None:
-            raise ValueError(f"Unknown catalog identifier: {identifier}. "
-                             f"Available types: 
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
-        return catalog_class(catalog_options)
+class TableRead:
+    """Implementation of TableRead for native Python reading."""
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
new file mode 100644
index 0000000000..fb56d70a82
--- /dev/null
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -0,0 +1,291 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from collections import defaultdict
+from typing import List, Optional, Callable
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.interval_partition import IntervalPartition, SortedRun
+from pypaimon.read.plan import Plan
+from pypaimon.read.split import Split
+from pypaimon.schema.data_types import DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class TableScan:
+    """Implementation of TableScan for native Python reading."""
+
+    def __init__(self, table: FileStoreTable, predicate: Optional[Predicate], 
limit: Optional[int],
+                 read_type: List[DataField]):
+        self.table = table
+        self.predicate = predicate
+        self.predicate = predicate
+        self.limit = limit
+        self.read_type = read_type
+
+        self.snapshot_manager = SnapshotManager(table)
+        self.manifest_list_manager = ManifestListManager(table)
+        self.manifest_file_manager = ManifestFileManager(table)
+
+        self.partition_conditions = self._extract_partition_conditions()
+        self.target_split_size = 128 * 1024 * 1024
+        self.open_file_cost = 4 * 1024 * 1024
+
+    def plan(self) -> Plan:
+        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+        if not latest_snapshot:
+            return Plan([])
+        manifest_files = 
self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+
+        file_entries = []
+        for manifest_file_path in manifest_files:
+            manifest_entries = 
self.manifest_file_manager.read(manifest_file_path)
+            for entry in manifest_entries:
+                if entry.kind == 0:
+                    file_entries.append(entry)
+
+        if self.predicate:
+            file_entries = self._filter_by_predicate(file_entries)
+
+        partitioned_split = defaultdict(list)
+        for entry in file_entries:
+            partitioned_split[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
+
+        splits = []
+        for key, values in partitioned_split.items():
+            if self.table.is_primary_key_table:
+                splits += self._create_primary_key_splits(values)
+            else:
+                splits += self._create_append_only_splits(values)
+
+        splits = self._apply_push_down_limit(splits)
+
+        return Plan(splits)
+
+    def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
+        if self.limit is None:
+            return splits
+        scanned_row_count = 0
+        limited_splits = []
+
+        for split in splits:
+            if split.raw_convertible:
+                limited_splits.append(split)
+                scanned_row_count += split.row_count
+                if scanned_row_count >= self.limit:
+                    return limited_splits
+
+        return limited_splits
+
+    def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> 
List[ManifestEntry]:
+        if not self.predicate:
+            return file_entries
+
+        filtered_files = []
+        for file_entry in file_entries:
+            if self.partition_conditions and not 
self._filter_by_partition(file_entry):
+                continue
+            if not self._filter_by_stats(file_entry):
+                continue
+            filtered_files.append(file_entry)
+
+        return filtered_files
+
+    def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
+        # TODO: refactor with a better solution
+        partition_dict = file_entry.partition.to_dict()
+        for field_name, condition in self.partition_conditions.items():
+            partition_value = partition_dict[field_name]
+            if condition['op'] == '=':
+                if str(partition_value) != str(condition['value']):
+                    return False
+            elif condition['op'] == 'in':
+                if str(partition_value) not in [str(v) for v in 
condition['values']]:
+                    return False
+            elif condition['op'] == 'notIn':
+                if str(partition_value) in [str(v) for v in 
condition['values']]:
+                    return False
+            elif condition['op'] == '>':
+                if partition_value <= condition['values']:
+                    return False
+            elif condition['op'] == '>=':
+                if partition_value < condition['values']:
+                    return False
+            elif condition['op'] == '<':
+                if partition_value >= condition['values']:
+                    return False
+            elif condition['op'] == '<=':
+                if partition_value > condition['values']:
+                    return False
+        return True
+
+    def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
+        # TODO: real support for filtering by stat
+        return True
+
+    def _extract_partition_conditions(self) -> dict:
+        if not self.predicate or not self.table.partition_keys:
+            return {}
+
+        conditions = {}
+        self._extract_conditions_from_predicate(self.predicate, conditions, 
self.table.partition_keys)
+        return conditions
+
+    def _extract_conditions_from_predicate(self, predicate: 'Predicate', 
conditions: dict,
+                                           partition_keys: List[str]):
+        if predicate.method == 'and':
+            for sub_predicate in predicate.literals:
+                self._extract_conditions_from_predicate(sub_predicate, 
conditions, partition_keys)
+            return
+        elif predicate.method == 'or':
+            all_partition_conditions = True
+            for sub_predicate in predicate.literals:
+                if sub_predicate.field not in partition_keys:
+                    all_partition_conditions = False
+                    break
+            if all_partition_conditions:
+                for sub_predicate in predicate.literals:
+                    self._extract_conditions_from_predicate(sub_predicate, 
conditions, partition_keys)
+            return
+
+        if predicate.field in partition_keys:
+            if predicate.method == 'equal':
+                conditions[predicate.field] = {
+                    'op': '=',
+                    'value': predicate.literals[0] if predicate.literals else 
None
+                }
+            elif predicate.method == 'in':
+                conditions[predicate.field] = {
+                    'op': 'in',
+                    'values': predicate.literals if predicate.literals else []
+                }
+            elif predicate.method == 'notIn':
+                conditions[predicate.field] = {
+                    'op': 'notIn',
+                    'values': predicate.literals if predicate.literals else []
+                }
+            elif predicate.method == 'greaterThan':
+                conditions[predicate.field] = {
+                    'op': '>',
+                    'value': predicate.literals[0] if predicate.literals else 
None
+                }
+            elif predicate.method == 'greaterOrEqual':
+                conditions[predicate.field] = {
+                    'op': '>=',
+                    'value': predicate.literals[0] if predicate.literals else 
None
+                }
+            elif predicate.method == 'lessThan':
+                conditions[predicate.field] = {
+                    'op': '<',
+                    'value': predicate.literals[0] if predicate.literals else 
None
+                }
+            elif predicate.method == 'lessOrEqual':
+                conditions[predicate.field] = {
+                    'op': '<=',
+                    'value': predicate.literals[0] if predicate.literals else 
None
+                }
+
+    def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
+        if not file_entries:
+            return []
+
+        data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+        def weight_func(f: DataFileMeta) -> int:
+            return max(f.file_size, self.open_file_cost)
+
+        packed_files: List[List[DataFileMeta]] = _pack_for_ordered(data_files, 
weight_func, self.target_split_size)
+        return self._build_split_from_pack(packed_files, file_entries, False)
+
+    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
+        if not file_entries:
+            return []
+
+        data_files: List[DataFileMeta] = [e.file for e in file_entries]
+        partition_sort_runs: List[List[SortedRun]] = 
IntervalPartition(data_files).partition()
+        sections: List[List[DataFileMeta]] = [
+            [file for s in sl for file in s.files]
+            for sl in partition_sort_runs
+        ]
+
+        def weight_func(fl: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in fl), self.open_file_cost)
+
+        packed_files: List[List[List[DataFileMeta]]] = 
_pack_for_ordered(sections, weight_func, self.target_split_size)
+        flatten_packed_files: List[List[DataFileMeta]] = [
+            [file for sub_pack in pack for file in sub_pack]
+            for pack in packed_files
+        ]
+        return self._build_split_from_pack(flatten_packed_files, file_entries, 
True)
+
+    def _build_split_from_pack(self, packed_files, file_entries, 
for_primary_key_split: bool) -> List['Split']:
+        splits = []
+        for file_group in packed_files:
+            raw_convertible = True
+            if for_primary_key_split:
+                raw_convertible = len(file_group) == 1
+
+            file_paths = []
+            total_file_size = 0
+            total_record_count = 0
+
+            for data_file in file_group:
+                data_file.set_file_path(self.table.table_path, 
file_entries[0].partition,
+                                        file_entries[0].bucket)
+                file_paths.append(data_file.file_path)
+                total_file_size += data_file.file_size
+                total_record_count += data_file.row_count
+
+            if file_paths:
+                split = Split(
+                    files=file_group,
+                    partition=file_entries[0].partition,
+                    bucket=file_entries[0].bucket,
+                    _file_paths=file_paths,
+                    _row_count=total_record_count,
+                    _file_size=total_file_size,
+                    raw_convertible=raw_convertible
+                )
+                splits.append(split)
+        return splits
+
+
+def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) 
-> List[List]:
+    packed = []
+    bin_items = []
+    bin_weight = 0
+
+    for item in items:
+        weight = weight_func(item)
+        if bin_weight + weight > target_weight and len(bin_items) > 0:
+            packed.append(bin_items)
+            bin_items.clear()
+            bin_weight = 0
+
+        bin_weight += weight
+        bin_items.append(item)
+
+    if len(bin_items) > 0:
+        packed.append(bin_items)
+
+    return packed
diff --git a/paimon-python/pypaimon/table/catalog_environment.py 
b/paimon-python/pypaimon/table/catalog_environment.py
index 3150e2d78e..e694792ff4 100644
--- a/paimon-python/pypaimon/table/catalog_environment.py
+++ b/paimon-python/pypaimon/table/catalog_environment.py
@@ -17,7 +17,7 @@ limitations under the License.
 """
 from typing import Optional
 
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.catalog.catalog_loader import CatalogLoader
 
 
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index 9f230d3463..e1b8322fe8 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -20,7 +20,7 @@ from pathlib import Path
 
 from pypaimon import Table
 from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.common.file_io import FileIO
 from pypaimon.schema.schema_manager import SchemaManager
diff --git a/paimon-python/pypaimon/tests/api_test.py 
b/paimon-python/pypaimon/tests/api_test.py
index b84bc405b5..d4b21afc99 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -24,7 +24,7 @@ from .rest_server import RESTCatalogServer
 from ..api.api_response import (ConfigResponse)
 from ..api import RESTApi
 from ..api.auth import BearTokenAuthProvider
-from ..api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
 from pypaimon.common.rest_json import JSON
 from pypaimon.schema.table_schema import TableSchema
 from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
diff --git a/paimon-python/pypaimon/tests/rest_catalog_test.py 
b/paimon-python/pypaimon/tests/rest_catalog_test.py
index be5989a171..6fedb916e9 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_test.py
@@ -25,7 +25,7 @@ from pypaimon.api.auth import BearTokenAuthProvider
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.schema.data_types import DataField, ArrayType, AtomicType, 
MapType
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.tests.rest_server import RESTCatalogServer


Reply via email to