This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2ddff15d59 [Python] Support scan and plan for PyPaimon (#5996)
2ddff15d59 is described below
commit 2ddff15d594d35a77d0d8b6df6563afebb15d670
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jul 31 15:24:58 2025 +0800
[Python] Support scan and plan for PyPaimon (#5996)
---
paimon-python/pypaimon/api/__init__.py | 2 +-
paimon-python/pypaimon/api/api_resquest.py | 2 +-
paimon-python/pypaimon/catalog/catalog_factory.py | 2 +-
paimon-python/pypaimon/catalog/catalog_utils.py | 2 +-
.../pypaimon/{ => catalog}/rest/__init__.py | 0
.../pypaimon/{ => catalog}/rest/rest_catalog.py | 4 +-
.../{ => catalog}/rest/rest_token_file_io.py | 2 +-
.../pypaimon/{api => common}/identifier.py | 0
paimon-python/pypaimon/common/predicate.py | 77 ++++++
paimon-python/pypaimon/common/predicate_builder.py | 120 +++++++++
.../catalog_factory.py => read/__init__.py} | 21 --
paimon-python/pypaimon/read/interval_partition.py | 130 +++++++++
.../catalog_factory.py => read/partition_info.py} | 50 ++--
.../{catalog/catalog_factory.py => read/plan.py} | 27 +-
paimon-python/pypaimon/read/read_builder.py | 75 ++++++
.../{catalog/catalog_factory.py => read/split.py} | 43 +--
.../catalog_factory.py => read/table_read.py} | 22 +-
paimon-python/pypaimon/read/table_scan.py | 291 +++++++++++++++++++++
.../pypaimon/table/catalog_environment.py | 2 +-
paimon-python/pypaimon/table/file_store_table.py | 2 +-
paimon-python/pypaimon/tests/api_test.py | 2 +-
paimon-python/pypaimon/tests/rest_catalog_test.py | 2 +-
22 files changed, 771 insertions(+), 107 deletions(-)
diff --git a/paimon-python/pypaimon/api/__init__.py
b/paimon-python/pypaimon/api/__init__.py
index 8e05df6c01..509c4d91a2 100644
--- a/paimon-python/pypaimon/api/__init__.py
+++ b/paimon-python/pypaimon/api/__init__.py
@@ -33,7 +33,7 @@ from pypaimon.api.api_resquest import CreateDatabaseRequest,
AlterDatabaseReques
CreateTableRequest
from pypaimon.api.config import CatalogOptions
from pypaimon.api.client import HttpClient
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.api.typedef import T
from pypaimon.schema.schema import Schema
diff --git a/paimon-python/pypaimon/api/api_resquest.py
b/paimon-python/pypaimon/api/api_resquest.py
index dfc517ae89..8c1628120e 100644
--- a/paimon-python/pypaimon/api/api_resquest.py
+++ b/paimon-python/pypaimon/api/api_resquest.py
@@ -20,7 +20,7 @@ from abc import ABC
from dataclasses import dataclass
from typing import Dict, List
-from .identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.common.rest_json import json_field
from ..schema.schema import Schema
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/catalog/catalog_factory.py
index b5ec706ae5..8ae17836d4 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/catalog/catalog_factory.py
@@ -18,7 +18,7 @@
from pypaimon import Catalog
from pypaimon.api import CatalogOptions
from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
class CatalogFactory:
diff --git a/paimon-python/pypaimon/catalog/catalog_utils.py
b/paimon-python/pypaimon/catalog/catalog_utils.py
index d9877ce53a..2bae27aa4e 100644
--- a/paimon-python/pypaimon/catalog/catalog_utils.py
+++ b/paimon-python/pypaimon/catalog/catalog_utils.py
@@ -19,7 +19,7 @@ from pathlib import Path
from typing import Callable, Any
from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.catalog.table_metadata import TableMetadata
from pypaimon.table.catalog_environment import CatalogEnvironment
diff --git a/paimon-python/pypaimon/rest/__init__.py
b/paimon-python/pypaimon/catalog/rest/__init__.py
similarity index 100%
rename from paimon-python/pypaimon/rest/__init__.py
rename to paimon-python/pypaimon/catalog/rest/__init__.py
diff --git a/paimon-python/pypaimon/rest/rest_catalog.py
b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
similarity index 97%
rename from paimon-python/pypaimon/rest/rest_catalog.py
rename to paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 32b5c5c7ad..ab08685840 100644
--- a/paimon-python/pypaimon/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -22,7 +22,7 @@ from pypaimon import Database, Catalog, Schema
from pypaimon.api import RESTApi, CatalogOptions
from pypaimon.api.api_response import PagedList, GetTableResponse
from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.api.options import Options
@@ -30,7 +30,7 @@ from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.catalog_utils import CatalogUtils
from pypaimon.catalog.property_change import PropertyChange
from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.rest.rest_token_file_io import RESTTokenFileIO
+from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
from pypaimon.table.file_store_table import FileStoreTable
diff --git a/paimon-python/pypaimon/rest/rest_token_file_io.py
b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
similarity index 96%
rename from paimon-python/pypaimon/rest/rest_token_file_io.py
rename to paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 5b179b95f0..283225d507 100644
--- a/paimon-python/pypaimon/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -18,7 +18,7 @@ limitations under the License.
from pathlib import Path
from typing import Optional
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.common.file_io import FileIO
diff --git a/paimon-python/pypaimon/api/identifier.py
b/paimon-python/pypaimon/common/identifier.py
similarity index 100%
rename from paimon-python/pypaimon/api/identifier.py
rename to paimon-python/pypaimon/common/identifier.py
diff --git a/paimon-python/pypaimon/common/predicate.py
b/paimon-python/pypaimon/common/predicate.py
new file mode 100644
index 0000000000..82d792290a
--- /dev/null
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from dataclasses import dataclass
+from typing import Any, List, Optional
+
+from pypaimon.table.row.internal_row import InternalRow
+
+
+@dataclass
+class Predicate:
+ method: str
+ index: Optional[int]
+ field: str | None
+ literals: Optional[List[Any]] = None
+
+ def test(self, record: InternalRow) -> bool:
+ if self.method == 'equal':
+ return record.get_field(self.index) == self.literals[0]
+ elif self.method == 'notEqual':
+ return record.get_field(self.index) != self.literals[0]
+ elif self.method == 'lessThan':
+ return record.get_field(self.index) < self.literals[0]
+ elif self.method == 'lessOrEqual':
+ return record.get_field(self.index) <= self.literals[0]
+ elif self.method == 'greaterThan':
+ return record.get_field(self.index) > self.literals[0]
+ elif self.method == 'greaterOrEqual':
+ return record.get_field(self.index) >= self.literals[0]
+ elif self.method == 'isNull':
+ return record.get_field(self.index) is None
+ elif self.method == 'isNotNull':
+ return record.get_field(self.index) is not None
+ elif self.method == 'startsWith':
+ field_value = record.get_field(self.index)
+ if not isinstance(field_value, str):
+ return False
+ return field_value.startswith(self.literals[0])
+ elif self.method == 'endsWith':
+ field_value = record.get_field(self.index)
+ if not isinstance(field_value, str):
+ return False
+ return field_value.endswith(self.literals[0])
+ elif self.method == 'contains':
+ field_value = record.get_field(self.index)
+ if not isinstance(field_value, str):
+ return False
+ return self.literals[0] in field_value
+ elif self.method == 'in':
+ return record.get_field(self.index) in self.literals
+ elif self.method == 'notIn':
+ return record.get_field(self.index) not in self.literals
+ elif self.method == 'between':
+ field_value = record.get_field(self.index)
+ return self.literals[0] <= field_value <= self.literals[1]
+ elif self.method == 'and':
+ return all(p.test(record) for p in self.literals)
+ elif self.method == 'or':
+ t = any(p.test(record) for p in self.literals)
+ return t
+ else:
+ raise ValueError(f"Unsupported predicate method: {self.method}")
diff --git a/paimon-python/pypaimon/common/predicate_builder.py
b/paimon-python/pypaimon/common/predicate_builder.py
new file mode 100644
index 0000000000..14ba39f801
--- /dev/null
+++ b/paimon-python/pypaimon/common/predicate_builder.py
@@ -0,0 +1,120 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Any, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.schema.data_types import DataField
+
+
+class PredicateBuilder:
+ """Implementation of PredicateBuilder using Predicate."""
+
+ def __init__(self, row_field: List[DataField]):
+ self.field_names = [field.name for field in row_field]
+
+ def _get_field_index(self, field: str) -> int:
+ """Get the index of a field in the schema."""
+ try:
+ return self.field_names.index(field)
+ except ValueError:
+ raise ValueError(f'The field {field} is not in field list
{self.field_names}.')
+
+ def _build_predicate(self, method: str, field: str, literals:
Optional[List[Any]] = None) -> Predicate:
+ """Build a predicate with the given method, field, and literals."""
+ index = self._get_field_index(field)
+ return Predicate(
+ method=method,
+ index=index,
+ field=field,
+ literals=literals
+ )
+
+ def equal(self, field: str, literal: Any) -> Predicate:
+ """Create an equality predicate."""
+ return self._build_predicate('equal', field, [literal])
+
+ def not_equal(self, field: str, literal: Any) -> Predicate:
+ """Create a not-equal predicate."""
+ return self._build_predicate('notEqual', field, [literal])
+
+ def less_than(self, field: str, literal: Any) -> Predicate:
+ """Create a less-than predicate."""
+ return self._build_predicate('lessThan', field, [literal])
+
+ def less_or_equal(self, field: str, literal: Any) -> Predicate:
+ """Create a less-or-equal predicate."""
+ return self._build_predicate('lessOrEqual', field, [literal])
+
+ def greater_than(self, field: str, literal: Any) -> Predicate:
+ """Create a greater-than predicate."""
+ return self._build_predicate('greaterThan', field, [literal])
+
+ def greater_or_equal(self, field: str, literal: Any) -> Predicate:
+ """Create a greater-or-equal predicate."""
+ return self._build_predicate('greaterOrEqual', field, [literal])
+
+ def is_null(self, field: str) -> Predicate:
+ """Create an is-null predicate."""
+ return self._build_predicate('isNull', field)
+
+ def is_not_null(self, field: str) -> Predicate:
+ """Create an is-not-null predicate."""
+ return self._build_predicate('isNotNull', field)
+
+ def startswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """Create a starts-with predicate."""
+ return self._build_predicate('startsWith', field, [pattern_literal])
+
+ def endswith(self, field: str, pattern_literal: Any) -> Predicate:
+ """Create an ends-with predicate."""
+ return self._build_predicate('endsWith', field, [pattern_literal])
+
+ def contains(self, field: str, pattern_literal: Any) -> Predicate:
+ """Create a contains predicate."""
+ return self._build_predicate('contains', field, [pattern_literal])
+
+ def is_in(self, field: str, literals: List[Any]) -> Predicate:
+ """Create an in predicate."""
+ return self._build_predicate('in', field, literals)
+
+ def is_not_in(self, field: str, literals: List[Any]) -> Predicate:
+ """Create a not-in predicate."""
+ return self._build_predicate('notIn', field, literals)
+
+ def between(self, field: str, included_lower_bound: Any,
included_upper_bound: Any) -> Predicate:
+ """Create a between predicate."""
+ return self._build_predicate('between', field, [included_lower_bound,
included_upper_bound])
+
+ def and_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """Create an AND predicate from multiple predicates."""
+ return Predicate(
+ method='and',
+ index=None,
+ field=None,
+ literals=predicates
+ )
+
+ def or_predicates(self, predicates: List[Predicate]) -> Predicate:
+ """Create an OR predicate from multiple predicates."""
+ return Predicate(
+ method='or',
+ index=None,
+ field=None,
+ literals=predicates
+ )
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/read/__init__.py
similarity index 54%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/__init__.py
index b5ec706ae5..65b48d4d79 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/__init__.py
@@ -15,24 +15,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
-
-
-class CatalogFactory:
-
- CATALOG_REGISTRY = {
- "filesystem": FileSystemCatalog,
- "rest": RESTCatalog,
- }
-
- @staticmethod
- def create(catalog_options: dict) -> Catalog:
- identifier = catalog_options.get(CatalogOptions.METASTORE,
"filesystem")
- catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
- if catalog_class is None:
- raise ValueError(f"Unknown catalog identifier: {identifier}. "
- f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
diff --git a/paimon-python/pypaimon/read/interval_partition.py
b/paimon-python/pypaimon/read/interval_partition.py
new file mode 100644
index 0000000000..7261383196
--- /dev/null
+++ b/paimon-python/pypaimon/read/interval_partition.py
@@ -0,0 +1,130 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import heapq
+from dataclasses import dataclass
+from functools import cmp_to_key
+from typing import List, Callable
+
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+@dataclass
+class SortedRun:
+ """
+ A SortedRun is a list of files sorted by their keys.
+ The key intervals [minKey, maxKey] of these files do not overlap.
+ """
+ files: List[DataFileMeta]
+
+
+class IntervalPartition:
+ """
+ Algorithm to partition several data files into the minimum number of
SortedRuns.
+ """
+
+ def __init__(self, input_files: List[DataFileMeta]):
+ self.files = input_files.copy()
+ self.key_comparator = default_key_comparator
+ self.files.sort(key=cmp_to_key(self._compare_files))
+
+ def partition(self) -> List[List[SortedRun]]:
+ result = []
+ section: List[DataFileMeta] = []
+ bound = None
+
+ for meta in self.files:
+ if section and self.key_comparator(meta.min_key, bound) > 0:
+ result.append(self._partition_section(section))
+ section.clear()
+ bound = None
+ section.append(meta)
+ if bound is None or self.key_comparator(meta.max_key, bound) > 0:
+ bound = meta.max_key
+
+ if section:
+ result.append(self._partition_section(section))
+ return result
+
+ def _partition_section(self, metas: List[DataFileMeta]) -> List[SortedRun]:
+ heap: List[HeapRun] = []
+ first_run = [metas[0]]
+ heapq.heappush(heap, HeapRun(first_run, self.key_comparator))
+ for i in range(1, len(metas)):
+ meta = metas[i]
+
+ earliest_finishing_run = heap[0]
+ last_max_key = earliest_finishing_run.run[-1].max_key
+ if self.key_comparator(meta.min_key, last_max_key) > 0:
+ top = heapq.heappop(heap)
+ top.run.append(meta)
+ heapq.heappush(heap, top)
+ else:
+ new_run = [meta]
+ heapq.heappush(heap, HeapRun(new_run, self.key_comparator))
+
+ return [SortedRun(files=h.run) for h in heap]
+
+ def _compare_files(self, f1: DataFileMeta, f2: DataFileMeta) -> int:
+ min_key_cmp = self.key_comparator(f1.min_key, f2.min_key)
+ if min_key_cmp != 0:
+ return min_key_cmp
+ return self.key_comparator(f1.max_key, f2.max_key)
+
+
+@dataclass
+class HeapRun:
+ run: List[DataFileMeta]
+ comparator: Callable[[BinaryRow, BinaryRow], int]
+
+ def __lt__(self, other) -> bool:
+ my_last_max = self.run[-1].max_key
+ other_last_max = other.run[-1].max_key
+ return self.comparator(my_last_max, other_last_max) < 0
+
+
+def default_key_comparator(key1: BinaryRow, key2: BinaryRow) -> int:
+ if not key1 or not key1.values:
+ if not key2 or not key2.values:
+ return 0
+ return -1
+ if not key2 or not key2.values:
+ return 1
+
+ min_field_count = min(len(key1.values), len(key2.values))
+ for i in range(min_field_count):
+ val1 = key1.values[i]
+ val2 = key2.values[i]
+ if val1 is None and val2 is None:
+ continue
+ if val1 is None:
+ return -1
+ if val2 is None:
+ return 1
+ if val1 < val2:
+ return -1
+ elif val1 > val2:
+ return 1
+
+ if len(key1.values) < len(key2.values):
+ return -1
+ elif len(key1.values) > len(key2.values):
+ return 1
+ else:
+ return 0
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/read/partition_info.py
similarity index 52%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/partition_info.py
index b5ec706ae5..e07b37a619 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/partition_info.py
@@ -15,24 +15,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
-
-
-class CatalogFactory:
-
- CATALOG_REGISTRY = {
- "filesystem": FileSystemCatalog,
- "rest": RESTCatalog,
- }
-
- @staticmethod
- def create(catalog_options: dict) -> Catalog:
- identifier = catalog_options.get(CatalogOptions.METASTORE,
"filesystem")
- catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
- if catalog_class is None:
- raise ValueError(f"Unknown catalog identifier: {identifier}. "
- f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
+
+from typing import Any, List
+
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.binary_row import BinaryRow
+
+
+class PartitionInfo:
+ """
+    Partition information describing how partition values map into the outer row.
+ """
+
+ def __init__(self, mapping: List[int], partition: BinaryRow):
+ self.mapping = mapping
+ self.partition_values = partition.values
+ self.partition_fields = partition.fields
+
+ def size(self) -> int:
+ return len(self.mapping) - 1
+
+ def is_partition_row(self, pos: int) -> bool:
+ return self.mapping[pos] < 0
+
+ def get_real_index(self, pos: int) -> int:
+ return abs(self.mapping[pos]) - 1
+
+ def get_partition_value(self, pos: int) -> (Any, DataField):
+ real_index = self.get_real_index(pos)
+ return self.partition_values[real_index],
self.partition_fields[real_index]
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/read/plan.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/plan.py
index b5ec706ae5..c3aeaa8a54 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -15,24 +15,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
+from dataclasses import dataclass
+from typing import List
-class CatalogFactory:
+from pypaimon.read.split import Split
- CATALOG_REGISTRY = {
- "filesystem": FileSystemCatalog,
- "rest": RESTCatalog,
- }
- @staticmethod
- def create(catalog_options: dict) -> Catalog:
- identifier = catalog_options.get(CatalogOptions.METASTORE,
"filesystem")
- catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
- if catalog_class is None:
- raise ValueError(f"Unknown catalog identifier: {identifier}. "
- f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
+@dataclass
+class Plan:
+ """Implementation of Plan for native Python reading."""
+ _splits: List[Split]
+
+ def splits(self) -> List[Split]:
+ return self._splits
diff --git a/paimon-python/pypaimon/read/read_builder.py
b/paimon-python/pypaimon/read/read_builder.py
new file mode 100644
index 0000000000..604eff9ff1
--- /dev/null
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.read.table_read import TableRead
+from pypaimon.read.table_scan import TableScan
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.file_store_table import FileStoreTable
+
+
+class ReadBuilder:
+ """Implementation of ReadBuilder for native Python reading."""
+
+ def __init__(self, table: FileStoreTable):
+ self.table = table
+ self._predicate: Optional[Predicate] = None
+ self._projection: Optional[List[str]] = None
+ self._limit: Optional[int] = None
+
+ def with_filter(self, predicate: Predicate) -> 'ReadBuilder':
+ self._predicate = predicate
+ return self
+
+ def with_projection(self, projection: List[str]) -> 'ReadBuilder':
+ self._projection = projection
+ return self
+
+ def with_limit(self, limit: int) -> 'ReadBuilder':
+ self._limit = limit
+ return self
+
+ def new_scan(self) -> TableScan:
+ return TableScan(
+ table=self.table,
+ predicate=self._predicate,
+ limit=self._limit,
+ read_type=self.read_type()
+ )
+
+ def new_read(self) -> TableRead:
+ return TableRead(
+ table=self.table,
+ predicate=self._predicate,
+ read_type=self.read_type()
+ )
+
+ def new_predicate_builder(self) -> PredicateBuilder:
+ return PredicateBuilder(self.read_type())
+
+ def read_type(self) -> List[DataField]:
+ table_fields = self.table.fields
+
+ if not self._projection:
+ return table_fields
+ else:
+ field_map = {field.name: field for field in self.table.fields}
+ return [field_map[name] for name in self._projection if name in
field_map]
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/read/split.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/split.py
index b5ec706ae5..f3bfdbab6b 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -15,24 +15,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
+from dataclasses import dataclass
+from typing import List
-class CatalogFactory:
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.row.binary_row import BinaryRow
- CATALOG_REGISTRY = {
- "filesystem": FileSystemCatalog,
- "rest": RESTCatalog,
- }
- @staticmethod
- def create(catalog_options: dict) -> Catalog:
- identifier = catalog_options.get(CatalogOptions.METASTORE,
"filesystem")
- catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
- if catalog_class is None:
- raise ValueError(f"Unknown catalog identifier: {identifier}. "
- f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
+@dataclass
+class Split:
+ """Implementation of Split for native Python reading."""
+ files: List[DataFileMeta]
+ partition: BinaryRow
+ bucket: int
+ _file_paths: List[str]
+ _row_count: int
+ _file_size: int
+ raw_convertible: bool = False
+
+ @property
+ def row_count(self) -> int:
+ return self._row_count
+
+ @property
+ def file_size(self) -> int:
+ return self._file_size
+
+ @property
+ def file_paths(self) -> List[str]:
+ return self._file_paths
diff --git a/paimon-python/pypaimon/catalog/catalog_factory.py
b/paimon-python/pypaimon/read/table_read.py
similarity index 55%
copy from paimon-python/pypaimon/catalog/catalog_factory.py
copy to paimon-python/pypaimon/read/table_read.py
index b5ec706ae5..7050b24d93 100644
--- a/paimon-python/pypaimon/catalog/catalog_factory.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,24 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pypaimon import Catalog
-from pypaimon.api import CatalogOptions
-from pypaimon.catalog.filesystem_catalog import FileSystemCatalog
-from pypaimon.rest.rest_catalog import RESTCatalog
-
-class CatalogFactory:
-
- CATALOG_REGISTRY = {
- "filesystem": FileSystemCatalog,
- "rest": RESTCatalog,
- }
-
- @staticmethod
- def create(catalog_options: dict) -> Catalog:
- identifier = catalog_options.get(CatalogOptions.METASTORE,
"filesystem")
- catalog_class = CatalogFactory.CATALOG_REGISTRY.get(identifier)
- if catalog_class is None:
- raise ValueError(f"Unknown catalog identifier: {identifier}. "
- f"Available types:
{list(CatalogFactory.CATALOG_REGISTRY.keys())}")
- return catalog_class(catalog_options)
+class TableRead:
+ """Implementation of TableRead for native Python reading."""
diff --git a/paimon-python/pypaimon/read/table_scan.py
b/paimon-python/pypaimon/read/table_scan.py
new file mode 100644
index 0000000000..fb56d70a82
--- /dev/null
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -0,0 +1,291 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from collections import defaultdict
+from typing import List, Optional, Callable
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.interval_partition import IntervalPartition, SortedRun
+from pypaimon.read.plan import Plan
+from pypaimon.read.split import Split
+from pypaimon.schema.data_types import DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.file_store_table import FileStoreTable
+
+
class TableScan:
    """Implementation of TableScan for native Python reading.

    Plans the set of splits to read from the latest snapshot: collects ADD
    manifest entries, filters them by partition predicate, groups them by
    (partition, bucket), packs files into splits of roughly
    ``target_split_size`` bytes, and optionally truncates by a row limit.
    """

    # Maps a predicate method to (stored op symbol); these conditions carry a
    # single literal under key 'value'.
    _SINGLE_VALUE_OPS = {
        'equal': '=',
        'greaterThan': '>',
        'greaterOrEqual': '>=',
        'lessThan': '<',
        'lessOrEqual': '<=',
    }
    # These conditions carry a literal list under key 'values'.
    _MULTI_VALUE_OPS = {
        'in': 'in',
        'notIn': 'notIn',
    }

    def __init__(self, table: FileStoreTable, predicate: Optional[Predicate], limit: Optional[int],
                 read_type: List[DataField]):
        """Create a scan over *table*.

        Args:
            table: the FileStoreTable to scan.
            predicate: optional filter; partition-key conditions are pushed down.
            limit: optional max row count used to truncate the split list.
            read_type: projected fields to read.
        """
        self.table = table
        # NOTE: the original code assigned `self.predicate` twice; once is enough.
        self.predicate = predicate
        self.limit = limit
        self.read_type = read_type

        self.snapshot_manager = SnapshotManager(table)
        self.manifest_list_manager = ManifestListManager(table)
        self.manifest_file_manager = ManifestFileManager(table)

        self.partition_conditions = self._extract_partition_conditions()
        # Split packing thresholds, in bytes: target total size per split, and
        # the minimum weight charged per file (cost of opening a small file).
        self.target_split_size = 128 * 1024 * 1024
        self.open_file_cost = 4 * 1024 * 1024

    def plan(self) -> Plan:
        """Build and return the read Plan (list of splits) for the latest snapshot."""
        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
        if not latest_snapshot:
            return Plan([])
        manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)

        file_entries = []
        for manifest_file_path in manifest_files:
            manifest_entries = self.manifest_file_manager.read(manifest_file_path)
            for entry in manifest_entries:
                if entry.kind == 0:  # kind 0 == ADD; other kinds are skipped
                    file_entries.append(entry)

        if self.predicate:
            file_entries = self._filter_by_predicate(file_entries)

        # Group entries by (partition, bucket): a split never spans buckets.
        partitioned_split = defaultdict(list)
        for entry in file_entries:
            partitioned_split[(tuple(entry.partition.values), entry.bucket)].append(entry)

        splits = []
        for entries in partitioned_split.values():
            if self.table.is_primary_key_table:
                splits += self._create_primary_key_splits(entries)
            else:
                splits += self._create_append_only_splits(entries)

        splits = self._apply_push_down_limit(splits)

        return Plan(splits)

    def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
        """Truncate *splits* once enough raw-convertible rows cover the limit.

        Only raw-convertible splits are kept and counted (their row counts are
        exact without merging); returns as soon as the limit is reached.
        """
        if self.limit is None:
            return splits
        scanned_row_count = 0
        limited_splits = []

        for split in splits:
            if split.raw_convertible:
                limited_splits.append(split)
                scanned_row_count += split.row_count
                if scanned_row_count >= self.limit:
                    return limited_splits

        return limited_splits

    def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]:
        """Drop entries whose partition or stats provably fail the predicate."""
        if not self.predicate:
            return file_entries

        filtered_files = []
        for file_entry in file_entries:
            if self.partition_conditions and not self._filter_by_partition(file_entry):
                continue
            if not self._filter_by_stats(file_entry):
                continue
            filtered_files.append(file_entry)

        return filtered_files

    def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
        """Return False if the entry's partition fails any extracted condition.

        Equality and membership compare as strings; range operators compare the
        raw partition value.
        """
        # TODO: refactor with a better solution
        partition_dict = file_entry.partition.to_dict()
        for field_name, condition in self.partition_conditions.items():
            partition_value = partition_dict[field_name]
            op = condition['op']
            if op == '=':
                if str(partition_value) != str(condition['value']):
                    return False
            elif op == 'in':
                if str(partition_value) not in [str(v) for v in condition['values']]:
                    return False
            elif op == 'notIn':
                if str(partition_value) in [str(v) for v in condition['values']]:
                    return False
            # Bug fix: the range branches below previously read
            # condition['values'], which is only set for in/notIn; range
            # conditions store their literal under 'value' and raised KeyError.
            elif op == '>':
                if partition_value <= condition['value']:
                    return False
            elif op == '>=':
                if partition_value < condition['value']:
                    return False
            elif op == '<':
                if partition_value >= condition['value']:
                    return False
            elif op == '<=':
                if partition_value > condition['value']:
                    return False
        return True

    def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
        """Stats-based filtering; currently a pass-through."""
        # TODO: real support for filtering by stat
        return True

    def _extract_partition_conditions(self) -> dict:
        """Collect pushed-down partition-key conditions from the predicate."""
        if not self.predicate or not self.table.partition_keys:
            return {}

        conditions = {}
        self._extract_conditions_from_predicate(self.predicate, conditions, self.table.partition_keys)
        return conditions

    def _extract_conditions_from_predicate(self, predicate: 'Predicate', conditions: dict,
                                           partition_keys: List[str]):
        """Recursively record partition-field conditions into *conditions*.

        'and' predicates are flattened; 'or' predicates are only extracted when
        every branch targets a partition key (NOTE(review): branches are then
        merged with AND semantics, which over-filters — TODO confirm intent).
        Later conditions on the same field overwrite earlier ones.
        """
        if predicate.method == 'and':
            for sub_predicate in predicate.literals:
                self._extract_conditions_from_predicate(sub_predicate, conditions, partition_keys)
            return
        elif predicate.method == 'or':
            if all(sub.field in partition_keys for sub in predicate.literals):
                for sub_predicate in predicate.literals:
                    self._extract_conditions_from_predicate(sub_predicate, conditions, partition_keys)
            return

        if predicate.field not in partition_keys:
            return

        if predicate.method in self._SINGLE_VALUE_OPS:
            conditions[predicate.field] = {
                'op': self._SINGLE_VALUE_OPS[predicate.method],
                'value': predicate.literals[0] if predicate.literals else None,
            }
        elif predicate.method in self._MULTI_VALUE_OPS:
            conditions[predicate.field] = {
                'op': self._MULTI_VALUE_OPS[predicate.method],
                'values': predicate.literals if predicate.literals else [],
            }

    def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
        """Pack append-only files (already ordered) into size-bounded splits."""
        if not file_entries:
            return []

        data_files: List[DataFileMeta] = [e.file for e in file_entries]

        def weight_func(f: DataFileMeta) -> int:
            # Charge at least the open-file cost so many tiny files don't
            # collapse into one oversized split.
            return max(f.file_size, self.open_file_cost)

        packed_files: List[List[DataFileMeta]] = _pack_for_ordered(data_files, weight_func, self.target_split_size)
        return self._build_split_from_pack(packed_files, file_entries, False)

    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
        """Pack primary-key files into splits, keeping overlapping sections together."""
        if not file_entries:
            return []

        data_files: List[DataFileMeta] = [e.file for e in file_entries]
        # Partition files into sections of key-overlapping sorted runs; files
        # within one section must be merged together at read time.
        partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
        sections: List[List[DataFileMeta]] = [
            [file for s in sl for file in s.files]
            for sl in partition_sort_runs
        ]

        def weight_func(fl: List[DataFileMeta]) -> int:
            return max(sum(f.file_size for f in fl), self.open_file_cost)

        packed_files: List[List[List[DataFileMeta]]] = _pack_for_ordered(sections, weight_func, self.target_split_size)
        flatten_packed_files: List[List[DataFileMeta]] = [
            [file for sub_pack in pack for file in sub_pack]
            for pack in packed_files
        ]
        return self._build_split_from_pack(flatten_packed_files, file_entries, True)

    def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool) -> List['Split']:
        """Turn packed file groups into Split objects.

        All entries share one (partition, bucket) — the caller groups by that
        key — so the first entry's partition/bucket applies to every split.
        """
        splits = []
        for file_group in packed_files:
            # A primary-key pack is raw-convertible only when it holds a single
            # file (no merge needed); append-only packs always are.
            raw_convertible = not for_primary_key_split or len(file_group) == 1

            file_paths = []
            total_file_size = 0
            total_record_count = 0

            for data_file in file_group:
                data_file.set_file_path(self.table.table_path, file_entries[0].partition,
                                        file_entries[0].bucket)
                file_paths.append(data_file.file_path)
                total_file_size += data_file.file_size
                total_record_count += data_file.row_count

            if file_paths:
                split = Split(
                    files=file_group,
                    partition=file_entries[0].partition,
                    bucket=file_entries[0].bucket,
                    _file_paths=file_paths,
                    _row_count=total_record_count,
                    _file_size=total_file_size,
                    raw_convertible=raw_convertible
                )
                splits.append(split)
        return splits
+
+
def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) -> List[List]:
    """Greedily pack ordered *items* into bins of at most *target_weight*.

    Items are never reordered. A new bin is started when adding the next item
    would exceed the target and the current bin is non-empty, so a single item
    heavier than the target still gets its own bin.

    Args:
        items: ordered items to pack.
        weight_func: maps an item to its integer weight.
        target_weight: soft upper bound on a bin's total weight.

    Returns:
        List of bins, each a list of consecutive items.
    """
    packed = []
    bin_items = []
    bin_weight = 0

    for item in items:
        weight = weight_func(item)
        if bin_weight + weight > target_weight and len(bin_items) > 0:
            packed.append(bin_items)
            # Bug fix: the original called bin_items.clear(), which emptied the
            # list just appended to `packed` (same object). Rebind instead.
            bin_items = []
            bin_weight = 0

        bin_weight += weight
        bin_items.append(item)

    if len(bin_items) > 0:
        packed.append(bin_items)

    return packed
diff --git a/paimon-python/pypaimon/table/catalog_environment.py
b/paimon-python/pypaimon/table/catalog_environment.py
index 3150e2d78e..e694792ff4 100644
--- a/paimon-python/pypaimon/table/catalog_environment.py
+++ b/paimon-python/pypaimon/table/catalog_environment.py
@@ -17,7 +17,7 @@ limitations under the License.
"""
from typing import Optional
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.catalog.catalog_loader import CatalogLoader
diff --git a/paimon-python/pypaimon/table/file_store_table.py
b/paimon-python/pypaimon/table/file_store_table.py
index 9f230d3463..e1b8322fe8 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -20,7 +20,7 @@ from pathlib import Path
from pypaimon import Table
from pypaimon.api.core_options import CoreOptions
-from pypaimon.api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.schema.table_schema import TableSchema
from pypaimon.common.file_io import FileIO
from pypaimon.schema.schema_manager import SchemaManager
diff --git a/paimon-python/pypaimon/tests/api_test.py
b/paimon-python/pypaimon/tests/api_test.py
index b84bc405b5..d4b21afc99 100644
--- a/paimon-python/pypaimon/tests/api_test.py
+++ b/paimon-python/pypaimon/tests/api_test.py
@@ -24,7 +24,7 @@ from .rest_server import RESTCatalogServer
from ..api.api_response import (ConfigResponse)
from ..api import RESTApi
from ..api.auth import BearTokenAuthProvider
-from ..api.identifier import Identifier
+from pypaimon.common.identifier import Identifier
from pypaimon.common.rest_json import JSON
from pypaimon.schema.table_schema import TableSchema
from ..api.token_loader import DLFTokenLoaderFactory, DLFToken
diff --git a/paimon-python/pypaimon/tests/rest_catalog_test.py
b/paimon-python/pypaimon/tests/rest_catalog_test.py
index be5989a171..6fedb916e9 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_test.py
@@ -25,7 +25,7 @@ from pypaimon.api.auth import BearTokenAuthProvider
from pypaimon.api.options import Options
from pypaimon.catalog.catalog_context import CatalogContext
from pypaimon.catalog.table_metadata import TableMetadata
-from pypaimon.rest.rest_catalog import RESTCatalog
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
from pypaimon.schema.data_types import DataField, ArrayType, AtomicType,
MapType
from pypaimon.schema.table_schema import TableSchema
from pypaimon.tests.rest_server import RESTCatalogServer