This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 28fabdd3 feat: Add support for rest scan planning (#2864)
28fabdd3 is described below

commit 28fabdd32d09d559e7fbed9b2883edeb5ac5e873
Author: geruh <[email protected]>
AuthorDate: Fri Jan 16 07:51:49 2026 -0800

    feat: Add support for rest scan planning (#2864)
    
    related to #2775
    
    # Rationale for this change
    
    Adds **synchronous** client-side support for REST server-side scan
    planning, allowing scans to be planned by the server when the REST
    catalog supports it.
    
    This PR cherry-picks and builds on two WIP PRs:
      - REST models (#2861)
      - Endpoints (#2848)
    
    Currently, server-side planning is enabled by setting
    rest-scan-planning-enabled=true in the catalog properties; see the
    sketch below.
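
    A minimal, hedged configuration sketch (the catalog name and URI are
    placeholders mirroring the integration test setup, not part of this
    change):

    ```python
    from pyiceberg.catalog import load_catalog

    # Hypothetical local REST endpoint; the last property is the one this
    # change introduces.
    catalog = load_catalog(
        "local",
        **{
            "type": "rest",
            "uri": "http://localhost:8181",
            "rest-scan-planning-enabled": "true",
        },
    )
    # catalog.supports_server_side_planning() becomes True once the server
    # also advertises the plan endpoints in its /v1/config response.
    ```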
    
    TODO: spec handling
    
    ## Are these changes tested?
    
    Integration tests were added, along with manual testing.
    
    ## Are there any user-facing changes?
    
    Yes: scans can now be planned server-side when a REST catalog supports and enables scan planning.
---
 pyiceberg/catalog/__init__.py                      |   4 +
 pyiceberg/catalog/rest/__init__.py                 | 126 +++++-
 pyiceberg/exceptions.py                            |   4 +
 pyiceberg/manifest.py                              |  22 ++
 pyiceberg/table/__init__.py                        | 126 +++++-
 tests/catalog/test_rest.py                         |  20 +-
 tests/catalog/test_scan_planning_models.py         | 422 +++++++++++++--------
 .../test_rest_scan_planning_integration.py         | 346 +++++++++++++++++
 8 files changed, 898 insertions(+), 172 deletions(-)

diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py
index d1d83c6d..67ca805c 100644
--- a/pyiceberg/catalog/__init__.py
+++ b/pyiceberg/catalog/__init__.py
@@ -722,6 +722,10 @@ class Catalog(ABC):
 
         return ".".join(segment.strip() for segment in tuple_identifier)
 
+    def supports_server_side_planning(self) -> bool:
+        """Check if the catalog supports server-side scan planning."""
+        return False
+
     @staticmethod
     def identifier_to_database(
         identifier: str | Identifier, err: type[ValueError] | type[NoSuchNamespaceError] = ValueError
diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py
index 533c4313..84188e58 100644
--- a/pyiceberg/catalog/rest/__init__.py
+++ b/pyiceberg/catalog/rest/__init__.py
@@ -14,6 +14,7 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
+from collections import deque
 from enum import Enum
 from typing import (
     TYPE_CHECKING,
@@ -21,7 +22,7 @@ from typing import (
     Union,
 )
 
-from pydantic import ConfigDict, Field, field_validator
+from pydantic import ConfigDict, Field, TypeAdapter, field_validator
 from requests import HTTPError, Session
 from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
 
@@ -36,6 +37,16 @@ from pyiceberg.catalog import (
 )
 from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
 from pyiceberg.catalog.rest.response import _handle_non_200_response
+from pyiceberg.catalog.rest.scan_planning import (
+    FetchScanTasksRequest,
+    PlanCancelled,
+    PlanCompleted,
+    PlanFailed,
+    PlanningResponse,
+    PlanSubmitted,
+    PlanTableScanRequest,
+    ScanTasks,
+)
 from pyiceberg.exceptions import (
     AuthorizationExpiredError,
     CommitFailedException,
@@ -44,6 +55,7 @@ from pyiceberg.exceptions import (
     NamespaceNotEmptyError,
     NoSuchIdentifierError,
     NoSuchNamespaceError,
+    NoSuchPlanTaskError,
     NoSuchTableError,
     NoSuchViewError,
     TableAlreadyExistsError,
@@ -56,6 +68,7 @@ from pyiceberg.table import (
     CommitTableRequest,
     CommitTableResponse,
     CreateTableTransaction,
+    FileScanTask,
     StagedTable,
     Table,
     TableIdentifier,
@@ -316,6 +329,9 @@ class ListViewsResponse(IcebergBaseModel):
     identifiers: list[ListViewResponseEntry] = Field()
 
 
+_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
+
+
 class RestCatalog(Catalog):
     uri: str
     _session: Session
@@ -375,15 +391,113 @@ class RestCatalog(Catalog):
 
         return session
 
-    def is_rest_scan_planning_enabled(self) -> bool:
-        """Check if rest server-side scan planning is enabled.
+    def supports_server_side_planning(self) -> bool:
+        """Check if the catalog supports server-side scan planning."""
+        return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool(
+            self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
+        )
+
+    @retry(**_RETRY_ARGS)
+    def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
+        """Submit a scan plan request to the REST server.
+
+        Args:
+            identifier: Table identifier.
+            request: The scan plan request parameters.
 
         Returns:
-            True if enabled, False otherwise.
+            PlanningResponse: the result of the scan plan request, representing the plan status.
+
+        Raises:
+            NoSuchTableError: If a table with the given identifier does not exist.
         """
-        return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool(
-            self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
+        self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)
+        response = self._session.post(
+            self.url(Endpoints.plan_table_scan, prefixed=True, **self._split_identifier_for_path(identifier)),
+            data=request.model_dump_json(by_alias=True, exclude_none=True).encode(UTF8),
         )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            _handle_non_200_response(exc, {404: NoSuchTableError})
+
+        return _PLANNING_RESPONSE_ADAPTER.validate_json(response.text)
+
+    @retry(**_RETRY_ARGS)
+    def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
+        """Fetch additional scan tasks using a plan task token.
+
+        Args:
+            identifier: Table identifier.
+            plan_task: The plan task token from a previous response.
+
+        Returns:
+            ScanTasks containing file scan tasks and possibly more plan-task tokens.
+
+        Raises:
+            NoSuchPlanTaskError: If no plan task exists for the given identifier and token.
+        """
+        self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)
+        request = FetchScanTasksRequest(plan_task=plan_task)
+        response = self._session.post(
+            self.url(Endpoints.fetch_scan_tasks, prefixed=True, **self._split_identifier_for_path(identifier)),
+            data=request.model_dump_json(by_alias=True).encode(UTF8),
+        )
+        try:
+            response.raise_for_status()
+        except HTTPError as exc:
+            _handle_non_200_response(exc, {404: NoSuchPlanTaskError})
+
+        return ScanTasks.model_validate_json(response.text)
+
+    def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
+        """Plan a table scan and return FileScanTasks.
+
+        Handles the full scan planning lifecycle including pagination.
+
+        Args:
+            identifier: Table identifier.
+            request: The scan plan request parameters.
+
+        Returns:
+            List of FileScanTask objects ready for execution.
+
+        Raises:
+            RuntimeError: If planning fails, is cancelled, or returns an unexpected response.
+            NotImplementedError: If async planning is required but not yet supported.
+        """
+        response = self._plan_table_scan(identifier, request)
+
+        if isinstance(response, PlanFailed):
+            error_msg = response.error.message if response.error else "unknown error"
+            raise RuntimeError(f"Received status: failed: {error_msg}")
+
+        if isinstance(response, PlanCancelled):
+            raise RuntimeError("Received status: cancelled")
+
+        if isinstance(response, PlanSubmitted):
+            # TODO: implement polling for async planning
+            raise NotImplementedError(f"Async scan planning not yet supported 
for planId: {response.plan_id}")
+
+        if not isinstance(response, PlanCompleted):
+            raise RuntimeError(f"Invalid planStatus for response: 
{type(response).__name__}")
+
+        tasks: list[FileScanTask] = []
+
+        # Collect tasks from initial response
+        for task in response.file_scan_tasks:
+            tasks.append(FileScanTask.from_rest_response(task, response.delete_files))
+
+        # Fetch and collect from additional batches
+        pending_tasks = deque(response.plan_tasks)
+        while pending_tasks:
+            plan_task = pending_tasks.popleft()
+            batch = self._fetch_scan_tasks(identifier, plan_task)
+            for task in batch.file_scan_tasks:
+                tasks.append(FileScanTask.from_rest_response(task, batch.delete_files))
+            pending_tasks.extend(batch.plan_tasks)
+
+        return tasks
 
     def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
         """Create the LegacyOAuth2AuthManager by fetching required properties.
diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py
index c80f104e..e755c730 100644
--- a/pyiceberg/exceptions.py
+++ b/pyiceberg/exceptions.py
@@ -52,6 +52,10 @@ class NoSuchNamespaceError(Exception):
     """Raised when a referenced name-space is not found."""
 
 
+class NoSuchPlanTaskError(Exception):
+    """Raised when a scan plan task is not found."""
+
+
 class RESTError(Exception):
     """Raises when there is an unknown response from the REST Catalog."""
 
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index ca288388..0afa1666 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -69,6 +69,28 @@ class DataFileContent(int, Enum):
         """Return the string representation of the DataFileContent class."""
         return f"DataFileContent.{self.name}"
 
+    @staticmethod
+    def from_rest_type(content_type: str) -> DataFileContent:
+        """Convert REST API content type string to DataFileContent.
+
+        Args:
+            content_type: REST API content type.
+
+        Returns:
+            The corresponding DataFileContent enum value.
+
+        Raises:
+            ValueError: If the content type is unknown.
+        """
+        mapping = {
+            "data": DataFileContent.DATA,
+            "position-deletes": DataFileContent.POSITION_DELETES,
+            "equality-deletes": DataFileContent.EQUALITY_DELETES,
+        }
+        if content_type not in mapping:
+            raise ValueError(f"Invalid file content value: {content_type}")
+        return mapping[content_type]
+
 
 class ManifestContent(int, Enum):
     DATA = 0
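
Aside (not part of the patch): the mapping above can be exercised directly;
any string outside the three REST content values raises a ValueError:

```python
from pyiceberg.manifest import DataFileContent

# REST content strings map onto the existing manifest enum.
assert DataFileContent.from_rest_type("data") is DataFileContent.DATA
assert DataFileContent.from_rest_type("position-deletes") is DataFileContent.POSITION_DELETES
assert DataFileContent.from_rest_type("equality-deletes") is DataFileContent.EQUALITY_DELETES
```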
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 9fdf5a70..b30a1426 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -145,6 +145,11 @@ if TYPE_CHECKING:
     from pyiceberg_core.datafusion import IcebergDataFusionTable
 
     from pyiceberg.catalog import Catalog
+    from pyiceberg.catalog.rest.scan_planning import (
+        RESTContentFile,
+        RESTDeleteFile,
+        RESTFileScanTask,
+    )
 
 ALWAYS_TRUE = AlwaysTrue()
 DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -1194,6 +1199,8 @@ class Table:
             snapshot_id=snapshot_id,
             options=options,
             limit=limit,
+            catalog=self.catalog,
+            table_identifier=self._identifier,
         )
 
     @property
@@ -1725,6 +1732,8 @@ class TableScan(ABC):
     snapshot_id: int | None
     options: Properties
     limit: int | None
+    catalog: Catalog | None
+    table_identifier: Identifier | None
 
     def __init__(
         self,
@@ -1736,6 +1745,8 @@ class TableScan(ABC):
         snapshot_id: int | None = None,
         options: Properties = EMPTY_DICT,
         limit: int | None = None,
+        catalog: Catalog | None = None,
+        table_identifier: Identifier | None = None,
     ):
         self.table_metadata = table_metadata
         self.io = io
@@ -1745,6 +1756,8 @@ class TableScan(ABC):
         self.snapshot_id = snapshot_id
         self.options = options
         self.limit = limit
+        self.catalog = catalog
+        self.table_identifier = table_identifier
 
     def snapshot(self) -> Snapshot | None:
         if self.snapshot_id:
@@ -1839,6 +1852,74 @@ class FileScanTask(ScanTask):
         self.delete_files = delete_files or set()
         self.residual = residual
 
+    @staticmethod
+    def from_rest_response(
+        rest_task: RESTFileScanTask,
+        delete_files: list[RESTDeleteFile],
+    ) -> FileScanTask:
+        """Convert a RESTFileScanTask to a FileScanTask.
+
+        Args:
+            rest_task: The REST file scan task.
+            delete_files: The list of delete files from the ScanTasks response.
+
+        Returns:
+            A FileScanTask with the converted data and delete files.
+
+        Raises:
+            NotImplementedError: If equality delete files are encountered.
+        """
+        from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile
+
+        data_file = _rest_file_to_data_file(rest_task.data_file)
+
+        resolved_deletes: set[DataFile] = set()
+        if rest_task.delete_file_references:
+            for idx in rest_task.delete_file_references:
+                delete_file = delete_files[idx]
+                if isinstance(delete_file, RESTEqualityDeleteFile):
+                    raise NotImplementedError(f"PyIceberg does not yet support 
equality deletes: {delete_file.file_path}")
+                resolved_deletes.add(_rest_file_to_data_file(delete_file))
+
+        return FileScanTask(
+            data_file=data_file,
+            delete_files=resolved_deletes,
+            residual=rest_task.residual_filter if rest_task.residual_filter else ALWAYS_TRUE,
+        )
+
+
+def _rest_file_to_data_file(rest_file: RESTContentFile) -> DataFile:
+    """Convert a REST content file to a manifest DataFile."""
+    from pyiceberg.catalog.rest.scan_planning import RESTDataFile
+
+    if isinstance(rest_file, RESTDataFile):
+        column_sizes = rest_file.column_sizes.to_dict() if rest_file.column_sizes else None
+        value_counts = rest_file.value_counts.to_dict() if rest_file.value_counts else None
+        null_value_counts = rest_file.null_value_counts.to_dict() if rest_file.null_value_counts else None
+        nan_value_counts = rest_file.nan_value_counts.to_dict() if rest_file.nan_value_counts else None
+    else:
+        column_sizes = None
+        value_counts = None
+        null_value_counts = None
+        nan_value_counts = None
+
+    data_file = DataFile.from_args(
+        content=DataFileContent.from_rest_type(rest_file.content),
+        file_path=rest_file.file_path,
+        file_format=rest_file.file_format,
+        partition=Record(*rest_file.partition) if rest_file.partition else Record(),
+        record_count=rest_file.record_count,
+        file_size_in_bytes=rest_file.file_size_in_bytes,
+        column_sizes=column_sizes,
+        value_counts=value_counts,
+        null_value_counts=null_value_counts,
+        nan_value_counts=nan_value_counts,
+        split_offsets=rest_file.split_offsets,
+        sort_order_id=rest_file.sort_order_id,
+    )
+    data_file.spec_id = rest_file.spec_id
+    return data_file
+
 
 def _open_manifest(
     io: FileIO,
@@ -2011,12 +2092,33 @@ class DataScan(TableScan):
             ],
         )
 
-    def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs.
+    def _should_use_server_side_planning(self) -> bool:
+        """Check if server-side scan planning should be used for this scan."""
+        if not self.catalog:
+            return False
+        return self.catalog.supports_server_side_planning()
+
+    def _plan_files_server_side(self) -> Iterable[FileScanTask]:
+        """Plan files using REST server-side scan planning."""
+        from pyiceberg.catalog.rest import RestCatalog
+        from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest
+
+        if not isinstance(self.catalog, RestCatalog):
+            raise TypeError("REST scan planning requires a RestCatalog")
+        if self.table_identifier is None:
+            raise ValueError("REST scan planning requires a table identifier")
+
+        request = PlanTableScanRequest(
+            snapshot_id=self.snapshot_id,
+            select=list(self.selected_fields) if self.selected_fields != ("*",) else None,
+            filter=self.row_filter if self.row_filter != ALWAYS_TRUE else None,
+            case_sensitive=self.case_sensitive,
+        )
 
-        Returns:
-            List of FileScanTasks that contain both data and delete files.
-        """
+        return self.catalog.plan_scan(self.table_identifier, request)
+
+    def _plan_files_local(self) -> Iterable[FileScanTask]:
+        """Plan files locally by reading manifests."""
         data_entries: list[ManifestEntry] = []
         positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
 
@@ -2047,6 +2149,20 @@ class DataScan(TableScan):
             for data_entry in data_entries
         ]
 
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
+
+        If the table comes from a REST catalog with scan planning enabled,
+        this will use server-side scan planning. Otherwise, it falls back
+        to local planning.
+
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        if self._should_use_server_side_planning():
+            return self._plan_files_server_side()
+        return self._plan_files_local()
+
     def to_arrow(self) -> pa.Table:
         """Read an Arrow table eagerly from this DataScan.
 
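
Aside (not part of the patch): with the dispatch above, callers change
nothing; plan_files() and everything built on it pick the server-side path
automatically. A hedged end-to-end sketch (names mirror the integration test
configuration and are placeholders):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical local REST endpoint and table name, for illustration only.
catalog = load_catalog(
    "local",
    **{"type": "rest", "uri": "http://localhost:8181", "rest-scan-planning-enabled": "true"},
)
table = catalog.load_table("default.test_rest_scan")

# plan_files() dispatches to RestCatalog.plan_scan() when the catalog reports
# supports_server_side_planning(); otherwise it plans locally from manifests.
tasks = list(table.scan().plan_files())
print(len(tasks), "file scan tasks")
```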
diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py
index 37c373cc..2e3b0d50 100644
--- a/tests/catalog/test_rest.py
+++ b/tests/catalog/test_rest.py
@@ -2091,12 +2091,12 @@ class TestRestCatalogClose:
         assert catalog is not None and hasattr(catalog, "_session")
         assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4
 
-    def test_rest_scan_planning_disabled_by_default(self, rest_mock: Mocker) -> None:
+    def test_server_side_planning_disabled_by_default(self, rest_mock: Mocker) -> None:
         catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
 
-        assert catalog.is_rest_scan_planning_enabled() is False
+        assert catalog.supports_server_side_planning() is False
 
-    def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None:
+    def test_server_side_planning_enabled_by_property(self, rest_mock: Mocker) -> None:
         catalog = RestCatalog(
             "rest",
             uri=TEST_URI,
@@ -2104,9 +2104,9 @@ class TestRestCatalogClose:
             **{"rest-scan-planning-enabled": "true"},
         )
 
-        assert catalog.is_rest_scan_planning_enabled() is True
+        assert catalog.supports_server_side_planning() is True
 
-    def test_rest_scan_planning_disabled_when_endpoint_unsupported(self, requests_mock: Mocker) -> None:
+    def test_server_side_planning_disabled_when_endpoint_unsupported(self, requests_mock: Mocker) -> None:
         # the config endpoint does not populate endpoints, so we fall back to the default
         requests_mock.get(
             f"{TEST_URI}v1/config",
@@ -2120,9 +2120,9 @@ class TestRestCatalogClose:
             **{"rest-scan-planning-enabled": "true"},
         )
 
-        assert catalog.is_rest_scan_planning_enabled() is False
+        assert catalog.supports_server_side_planning() is False
 
-    def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None:
+    def test_server_side_planning_explicitly_disabled(self, rest_mock: Mocker) -> None:
         catalog = RestCatalog(
             "rest",
             uri=TEST_URI,
@@ -2130,9 +2130,9 @@ class TestRestCatalogClose:
             **{"rest-scan-planning-enabled": "false"},
         )
 
-        assert catalog.is_rest_scan_planning_enabled() is False
+        assert catalog.supports_server_side_planning() is False
 
-    def test_rest_scan_planning_enabled_from_server_config(self, rest_mock: Mocker) -> None:
+    def test_server_side_planning_enabled_from_server_config(self, rest_mock: Mocker) -> None:
         rest_mock.get(
             f"{TEST_URI}v1/config",
             json={
@@ -2144,7 +2144,7 @@ class TestRestCatalogClose:
         )
         catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
 
-        assert catalog.is_rest_scan_planning_enabled() is True
+        assert catalog.supports_server_side_planning() is True
 
     def test_supported_endpoint(self, requests_mock: Mocker) -> None:
         requests_mock.get(
diff --git a/tests/catalog/test_scan_planning_models.py b/tests/catalog/test_scan_planning_models.py
index 9f03c8f7..567f1444 100644
--- a/tests/catalog/test_scan_planning_models.py
+++ b/tests/catalog/test_scan_planning_models.py
@@ -18,7 +18,9 @@ from typing import Any
 
 import pytest
 from pydantic import TypeAdapter, ValidationError
+from requests_mock import Mocker
 
+from pyiceberg.catalog.rest import RestCatalog
 from pyiceberg.catalog.rest.scan_planning import (
     CountMap,
     FetchScanTasksRequest,
@@ -33,12 +35,92 @@ from pyiceberg.catalog.rest.scan_planning import (
     RESTFileScanTask,
     RESTPositionDeleteFile,
     ScanTasks,
-    StorageCredential,
     ValueMap,
 )
 from pyiceberg.expressions import AlwaysTrue, EqualTo, Reference
 from pyiceberg.manifest import FileFormat
 
+TEST_URI = "https://iceberg-test-catalog/";
+
+
[email protected]
+def rest_scan_catalog(requests_mock: Mocker) -> RestCatalog:
+    requests_mock.get(
+        f"{TEST_URI}v1/config",
+        json={
+            "defaults": {"rest-scan-planning-enabled": "true"},
+            "overrides": {},
+            "endpoints": [
+                "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan",
+                "POST 
/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks",
+            ],
+        },
+        status_code=200,
+    )
+
+    return RestCatalog(
+        "test",
+        uri=TEST_URI,
+        **{"rest-scan-planning-enabled": "true"},
+    )
+
+
+def _rest_data_file(
+    *,
+    file_path: str = "s3://bucket/table/data/file.parquet",
+    file_format: str = "parquet",
+    file_size_in_bytes: int = 1024,
+    record_count: int = 100,
+) -> dict[str, Any]:
+    return {
+        "spec-id": 0,
+        "content": "data",
+        "file-path": file_path,
+        "file-format": file_format,
+        "file-size-in-bytes": file_size_in_bytes,
+        "record-count": record_count,
+    }
+
+
+def _rest_position_delete_file(
+    *,
+    file_path: str = "s3://bucket/table/delete.parquet",
+    file_format: str = "parquet",
+    file_size_in_bytes: int = 256,
+    record_count: int = 5,
+    content_offset: int = 100,
+    content_size_in_bytes: int = 200,
+) -> dict[str, Any]:
+    return {
+        "spec-id": 0,
+        "content": "position-deletes",
+        "file-path": file_path,
+        "file-format": file_format,
+        "file-size-in-bytes": file_size_in_bytes,
+        "record-count": record_count,
+        "content-offset": content_offset,
+        "content-size-in-bytes": content_size_in_bytes,
+    }
+
+
+def _rest_equality_delete_file(
+    *,
+    file_path: str = "s3://bucket/table/eq-delete.parquet",
+    equality_ids: list[int],
+    file_format: str = "parquet",
+    file_size_in_bytes: int = 256,
+    record_count: int = 5,
+) -> dict[str, Any]:
+    return {
+        "spec-id": 0,
+        "content": "equality-deletes",
+        "file-path": file_path,
+        "file-format": file_format,
+        "file-size-in-bytes": file_size_in_bytes,
+        "record-count": record_count,
+        "equality-ids": equality_ids,
+    }
+
 
 def test_count_map_valid() -> None:
     cm = CountMap(keys=[1, 2, 3], values=[100, 200, 300])
@@ -62,80 +144,45 @@ def test_value_map_mixed_types() -> None:
 
 
 def test_data_file_parsing() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "data",
-        "file-path": "s3://bucket/table/file.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 1024,
-        "record-count": 100,
-    }
-    df = RESTDataFile.model_validate(data)
+    data_file = _rest_data_file(file_path="s3://bucket/table/file.parquet")
+    df = RESTDataFile.model_validate(data_file)
     assert df.content == "data"
     assert df.file_path == "s3://bucket/table/file.parquet"
     assert df.file_format == FileFormat.PARQUET
-    assert df.file_size_in_bytes == 1024
 
 
 def test_data_file_with_stats() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "data",
-        "file-path": "s3://bucket/table/file.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 1024,
-        "record-count": 100,
+    data_file = _rest_data_file()
+
+    data_file_with_stats = {
+        **data_file,
         "column-sizes": {"keys": [1, 2], "values": [500, 524]},
         "value-counts": {"keys": [1, 2], "values": [100, 100]},
     }
-    df = RESTDataFile.model_validate(data)
+    df = RESTDataFile.model_validate(data_file_with_stats)
     assert df.column_sizes is not None
     assert df.column_sizes.to_dict() == {1: 500, 2: 524}
 
 
 def test_position_delete_file() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "position-deletes",
-        "file-path": "s3://bucket/table/delete.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 512,
-        "record-count": 10,
-        "content-offset": 100,
-        "content-size-in-bytes": 200,
-    }
-    pdf = RESTPositionDeleteFile.model_validate(data)
+    delete_file = _rest_position_delete_file(file_path="s3://bucket/table/delete.puffin", file_format="puffin")
+    pdf = RESTPositionDeleteFile.model_validate(delete_file)
     assert pdf.content == "position-deletes"
     assert pdf.content_offset == 100
     assert pdf.content_size_in_bytes == 200
 
 
 def test_equality_delete_file() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "equality-deletes",
-        "file-path": "s3://bucket/table/eq-delete.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 256,
-        "record-count": 5,
-        "equality-ids": [1, 2],
-    }
-    edf = RESTEqualityDeleteFile.model_validate(data)
-    assert edf.content == "equality-deletes"
-    assert edf.equality_ids == [1, 2]
+    delete_file = _rest_equality_delete_file(equality_ids=[1, 2])
+    equality_delete = RESTEqualityDeleteFile.model_validate(delete_file)
+    assert equality_delete.content == "equality-deletes"
+    assert equality_delete.equality_ids == [1, 2]
 
 
 def test_file_format_case_insensitive() -> None:
     for fmt in ["parquet", "PARQUET", "Parquet"]:
-        data = {
-            "spec-id": 0,
-            "content": "data",
-            "file-path": "/path",
-            "file-format": fmt,
-            "file-size-in-bytes": 100,
-            "record-count": 10,
-        }
-        df = RESTDataFile.model_validate(data)
+        data_file = _rest_data_file(file_format=fmt)
+        df = RESTDataFile.model_validate(data_file)
         assert df.file_format == FileFormat.PARQUET
 
 
@@ -148,56 +195,27 @@ def test_file_format_case_insensitive() -> None:
     ],
 )
 def test_file_formats(format_str: str, expected: FileFormat) -> None:
-    data = {
-        "spec-id": 0,
-        "content": "data",
-        "file-path": f"s3://bucket/table/path/file.{format_str}",
-        "file-format": format_str,
-        "file-size-in-bytes": 1024,
-        "record-count": 100,
-    }
-    df = RESTDataFile.model_validate(data)
+    data_file = _rest_data_file(file_format=format_str)
+    df = RESTDataFile.model_validate(data_file)
     assert df.file_format == expected
 
 
 def test_delete_file_discriminator_position() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "position-deletes",
-        "file-path": "s3://bucket/table/delete.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 256,
-        "record-count": 5,
-    }
-    result = TypeAdapter(RESTDeleteFile).validate_python(data)
+    delete_file = _rest_position_delete_file()
+    result = TypeAdapter(RESTDeleteFile).validate_python(delete_file)
     assert isinstance(result, RESTPositionDeleteFile)
 
 
 def test_delete_file_discriminator_equality() -> None:
-    data = {
-        "spec-id": 0,
-        "content": "equality-deletes",
-        "file-path": "s3://bucket/table/delete.parquet",
-        "file-format": "parquet",
-        "file-size-in-bytes": 256,
-        "record-count": 5,
-        "equality-ids": [1],
-    }
-    result = TypeAdapter(RESTDeleteFile).validate_python(data)
+    delete_file = _rest_equality_delete_file(equality_ids=[1, 2])
+    result = TypeAdapter(RESTDeleteFile).validate_python(delete_file)
     assert isinstance(result, RESTEqualityDeleteFile)
 
 
 def test_basic_scan_task() -> None:
-    data = {
-        "data-file": {
-            "spec-id": 0,
-            "content": "data",
-            "file-path": "s3://bucket/table/file.parquet",
-            "file-format": "parquet",
-            "file-size-in-bytes": 1024,
-            "record-count": 100,
-        }
-    }
+    data_file = _rest_data_file(file_path="s3://bucket/table/file.parquet")
+
+    data = {"data-file": data_file}
     task = RESTFileScanTask.model_validate(data)
     assert task.data_file.file_path == "s3://bucket/table/file.parquet"
     assert task.delete_file_references is None
@@ -205,15 +223,9 @@ def test_basic_scan_task() -> None:
 
 
 def test_scan_task_with_delete_references() -> None:
+    data_file = _rest_data_file()
     data = {
-        "data-file": {
-            "spec-id": 0,
-            "content": "data",
-            "file-path": "s3://bucket/table/file.parquet",
-            "file-format": "parquet",
-            "file-size-in-bytes": 1024,
-            "record-count": 100,
-        },
+        "data-file": data_file,
         "delete-file-references": [0, 1, 2],
     }
     task = RESTFileScanTask.model_validate(data)
@@ -221,15 +233,9 @@ def test_scan_task_with_delete_references() -> None:
 
 
 def test_scan_task_with_residual_filter_true() -> None:
+    data_file = _rest_data_file()
     data = {
-        "data-file": {
-            "spec-id": 0,
-            "content": "data",
-            "file-path": "s3://bucket/table/file.parquet",
-            "file-format": "parquet",
-            "file-size-in-bytes": 1024,
-            "record-count": 100,
-        },
+        "data-file": data_file,
         "residual-filter": True,
     }
     task = RESTFileScanTask.model_validate(data)
@@ -249,27 +255,13 @@ def test_empty_scan_tasks() -> None:
 
 
 def test_scan_tasks_with_files() -> None:
+    data_file = _rest_data_file(file_path="s3://bucket/table/data.parquet")
+    delete_file = _rest_position_delete_file()
     data = {
-        "delete-files": [
-            {
-                "spec-id": 0,
-                "content": "position-deletes",
-                "file-path": "s3://bucket/table/delete.parquet",
-                "file-format": "parquet",
-                "file-size-in-bytes": 256,
-                "record-count": 5,
-            }
-        ],
+        "delete-files": [delete_file],
         "file-scan-tasks": [
             {
-                "data-file": {
-                    "spec-id": 0,
-                    "content": "data",
-                    "file-path": "s3://bucket/table/data.parquet",
-                    "file-format": "parquet",
-                    "file-size-in-bytes": 1024,
-                    "record-count": 100,
-                },
+                "data-file": data_file,
                 "delete-file-references": [0],
             }
         ],
@@ -282,18 +274,12 @@ def test_scan_tasks_with_files() -> None:
 
 
 def test_invalid_delete_file_reference() -> None:
+    data_file = _rest_data_file(file_path="s3://bucket/table/data.parquet")
     data = {
         "delete-files": [],
         "file-scan-tasks": [
             {
-                "data-file": {
-                    "spec-id": 0,
-                    "content": "data",
-                    "file-path": "s3://bucket/table/data.parquet",
-                    "file-format": "parquet",
-                    "file-size-in-bytes": 1024,
-                    "record-count": 100,
-                },
+                "data-file": data_file,
                 "delete-file-references": [0],
             }
         ],
@@ -305,17 +291,9 @@ def test_invalid_delete_file_reference() -> None:
 
 
 def test_delete_files_require_file_scan_tasks() -> None:
+    delete_file = _rest_position_delete_file()
     data = {
-        "delete-files": [
-            {
-                "spec-id": 0,
-                "content": "position-deletes",
-                "file-path": "s3://bucket/table/delete.parquet",
-                "file-format": "parquet",
-                "file-size-in-bytes": 256,
-                "record-count": 5,
-            }
-        ],
+        "delete-files": [delete_file],
         "file-scan-tasks": [],
         "plan-tasks": [],
     }
@@ -437,14 +415,156 @@ def test_cancelled_response() -> None:
     assert isinstance(result, PlanCancelled)
 
 
-def test_storage_credential_parsing() -> None:
-    data = {
-        "prefix": "s3://bucket/path/",
-        "config": {
-            "s3.access-key-id": "key",
-            "s3.secret-access-key": "secret",
+def test_plan_scan_completed_single_batch(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet")
+
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "completed",
+            "plan-id": "plan-123",
+            "delete-files": [],
+            "file-scan-tasks": [{"data-file": file_one}],
+            "plan-tasks": [],
         },
-    }
-    cred = StorageCredential.model_validate(data)
-    assert cred.prefix == "s3://bucket/path/"
-    assert cred.config["s3.access-key-id"] == "key"
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+
+    assert len(tasks) == 1
+    assert tasks[0].file.file_path == "s3://bucket/tbl/data/file1.parquet"
+
+
+def test_plan_scan_with_pagination(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet")
+    file_two = _rest_data_file(file_path="s3://bucket/tbl/data/file2.parquet")
+
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "completed",
+            "plan-id": "plan-123",
+            "delete-files": [],
+            "file-scan-tasks": [{"data-file": file_one}],
+            "plan-tasks": ["token-batch-2"],
+        },
+        status_code=200,
+    )
+
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/tasks",
+        json={
+            "delete-files": [],
+            "file-scan-tasks": [{"data-file": file_two}],
+            "plan-tasks": [],
+        },
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+
+    tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+
+    assert len(tasks) == 2
+    assert tasks[0].file.file_path == "s3://bucket/tbl/data/file1.parquet"
+    assert tasks[1].file.file_path == "s3://bucket/tbl/data/file2.parquet"
+
+
+def test_plan_scan_with_delete_files(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet")
+    delete_file = _rest_position_delete_file()
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "completed",
+            "plan-id": "plan-123",
+            "delete-files": [delete_file],
+            "file-scan-tasks": [
+                {
+                    "data-file": file_one,
+                    "delete-file-references": [0],
+                }
+            ],
+            "plan-tasks": [],
+        },
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+
+    assert len(tasks) == 1
+    assert tasks[0].file.file_path == "s3://bucket/tbl/data/file1.parquet"
+    assert len(tasks[0].delete_files) == 1
+
+
+def test_plan_scan_async_not_supported(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "submitted",
+            "plan-id": "plan-456",
+        },
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    with pytest.raises(NotImplementedError, match="Async scan planning not yet 
supported"):
+        list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+
+
+def test_plan_scan_empty_result(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "completed",
+            "plan-id": "plan-123",
+            "delete-files": [],
+            "file-scan-tasks": [],
+            "plan-tasks": [],
+        },
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    tasks = list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+    assert len(tasks) == 0
+
+
+def test_plan_scan_cancelled(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={"status": "cancelled"},
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    with pytest.raises(RuntimeError, match="Received status: cancelled"):
+        list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
+
+
+def test_plan_scan_equality_deletes_not_supported(rest_scan_catalog: RestCatalog, requests_mock: Mocker) -> None:
+    file_one = _rest_data_file(file_path="s3://bucket/tbl/data/file1.parquet")
+    equality_delete = _rest_equality_delete_file(equality_ids=[1, 2])
+    requests_mock.post(
+        f"{TEST_URI}v1/namespaces/db/tables/tbl/plan",
+        json={
+            "status": "completed",
+            "plan-id": "plan-123",
+            "delete-files": [equality_delete],
+            "file-scan-tasks": [
+                {
+                    "data-file": file_one,
+                    "delete-file-references": [0],
+                }
+            ],
+            "plan-tasks": [],
+        },
+        status_code=200,
+    )
+
+    request = PlanTableScanRequest()
+    with pytest.raises(NotImplementedError, match="PyIceberg does not yet 
support equality deletes"):
+        list(rest_scan_catalog.plan_scan(("db", "tbl"), request))
diff --git a/tests/integration/test_rest_scan_planning_integration.py b/tests/integration/test_rest_scan_planning_integration.py
new file mode 100644
index 00000000..456dbe41
--- /dev/null
+++ b/tests/integration/test_rest_scan_planning_integration.py
@@ -0,0 +1,346 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import date, datetime, time, timedelta, timezone
+from decimal import Decimal
+from typing import Any
+from uuid import uuid4
+
+import pyarrow as pa
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.expressions import (
+    And,
+    BooleanExpression,
+    EqualTo,
+    GreaterThan,
+    GreaterThanOrEqual,
+    In,
+    IsNull,
+    LessThan,
+    LessThanOrEqual,
+    Not,
+    NotEqualTo,
+    NotIn,
+    NotNull,
+    Or,
+    StartsWith,
+)
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import ALWAYS_TRUE, Table
+from pyiceberg.transforms import (
+    IdentityTransform,
+)
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    LongType,
+    NestedField,
+    StringType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
+)
+
+
[email protected](scope="session")
+def scan_catalog() -> Catalog:
+    catalog = load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181";,
+            "s3.endpoint": "http://localhost:9000";,
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+            "rest-scan-planning-enabled": "true",
+        },
+    )
+    catalog.create_namespace_if_not_exists("default")
+    return catalog
+
+
+def recreate_table(catalog: Catalog, identifier: str, **kwargs: Any) -> Table:
+    """Drop table if exists and create a new one."""
+    try:
+        catalog.drop_table(identifier)
+    except NoSuchTableError:
+        pass
+    return catalog.create_table(identifier, **kwargs)
+
+
+def _assert_remote_scan_matches_local_scan(
+    rest_table: Table,
+    session_catalog: Catalog,
+    identifier: str,
+    row_filter: BooleanExpression = ALWAYS_TRUE,
+) -> None:
+    rest_tasks = list(rest_table.scan(row_filter=row_filter).plan_files())
+    rest_paths = {task.file.file_path for task in rest_tasks}
+
+    local_table = session_catalog.load_table(identifier)
+    local_tasks = list(local_table.scan(row_filter=row_filter).plan_files())
+    local_paths = {task.file.file_path for task in local_tasks}
+
+    assert rest_paths == local_paths
+
+
[email protected]
+def test_rest_scan_matches_local(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_rest_scan"
+
+    table = recreate_table(
+        scan_catalog,
+        identifier,
+        schema=Schema(
+            NestedField(1, "id", LongType()),
+            NestedField(2, "data", StringType()),
+            NestedField(3, "num", LongType()),
+        ),
+    )
+    table.append(pa.Table.from_pydict({"id": [1, 2, 3], "data": ["a", "b", 
"c"], "num": [10, 20, 30]}))
+    table.append(pa.Table.from_pydict({"id": [4, 5, 6], "data": ["d", "e", 
"f"], "num": [40, 50, 60]}))
+
+    try:
+        _assert_remote_scan_matches_local_scan(table, session_catalog, 
identifier)
+    finally:
+        scan_catalog.drop_table(identifier)
+
+
[email protected]
+def test_rest_scan_with_filter(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_rest_scan_filter"
+
+    table = recreate_table(
+        scan_catalog,
+        identifier,
+        schema=Schema(
+            NestedField(1, "id", LongType()),
+            NestedField(2, "data", LongType()),
+        ),
+    )
+    table.append(pa.Table.from_pydict({"id": [1, 2, 3], "data": [10, 20, 30]}))
+
+    try:
+        _assert_remote_scan_matches_local_scan(
+            table,
+            session_catalog,
+            identifier,
+            row_filter=And(GreaterThan("data", 5), LessThan("data", 25)),
+        )
+
+        _assert_remote_scan_matches_local_scan(
+            table,
+            session_catalog,
+            identifier,
+            row_filter=EqualTo("id", 1),
+        )
+    finally:
+        scan_catalog.drop_table(identifier)
+
+
[email protected]
+def test_rest_scan_with_deletes(spark: SparkSession, scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_rest_scan_deletes"
+
+    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+    spark.sql(f"""
+        CREATE TABLE {identifier} (id bigint, data bigint)
+        USING iceberg
+        TBLPROPERTIES(
+            'format-version' = 2,
+            'write.delete.mode'='merge-on-read'
+        )
+    """)
+    spark.sql(f"INSERT INTO {identifier} VALUES (1, 10), (2, 20), (3, 30)")
+    spark.sql(f"DELETE FROM {identifier} WHERE id = 2")
+
+    try:
+        rest_table = scan_catalog.load_table(identifier)
+        rest_tasks = list(rest_table.scan().plan_files())
+        rest_paths = {task.file.file_path for task in rest_tasks}
+        rest_delete_paths = {delete.file_path for task in rest_tasks for delete in task.delete_files}
+
+        local_table = session_catalog.load_table(identifier)
+        local_tasks = list(local_table.scan().plan_files())
+        local_paths = {task.file.file_path for task in local_tasks}
+        local_delete_paths = {delete.file_path for task in local_tasks for delete in task.delete_files}
+
+        assert rest_paths == local_paths
+        assert rest_delete_paths == local_delete_paths
+    finally:
+        spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+
+
[email protected]
+def test_rest_scan_with_partitioning(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_rest_scan_partitioned"
+
+    schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "category", StringType()),
+        NestedField(3, "data", LongType()),
+    )
+    partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "category"))
+
+    table = recreate_table(scan_catalog, identifier, schema=schema, partition_spec=partition_spec)
+
+    table.append(pa.Table.from_pydict({"id": [1, 2], "category": ["a", "a"], "data": [10, 20]}))
+    table.append(pa.Table.from_pydict({"id": [3, 4], "category": ["b", "b"], "data": [30, 40]}))
+
+    try:
+        _assert_remote_scan_matches_local_scan(table, session_catalog, identifier)
+
+        # test filter against partition
+        _assert_remote_scan_matches_local_scan(
+            table,
+            session_catalog,
+            identifier,
+            row_filter=EqualTo("category", "a"),
+        )
+    finally:
+        scan_catalog.drop_table(identifier)
+
+
[email protected]
+def test_rest_scan_primitive_types(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_primitives"
+
+    schema = Schema(
+        NestedField(1, "bool_col", BooleanType()),
+        NestedField(2, "long_col", LongType()),
+        NestedField(3, "double_col", DoubleType()),
+        NestedField(4, "decimal_col", DecimalType(10, 2)),
+        NestedField(5, "string_col", StringType()),
+        NestedField(6, "date_col", DateType()),
+        NestedField(7, "time_col", TimeType()),
+        NestedField(8, "timestamp_col", TimestampType()),
+        NestedField(9, "timestamptz_col", TimestamptzType()),
+        NestedField(10, "uuid_col", UUIDType()),
+        NestedField(11, "fixed_col", FixedType(16)),
+        NestedField(12, "binary_col", BinaryType()),
+    )
+
+    table = recreate_table(scan_catalog, identifier, schema=schema)
+
+    now = datetime.now()
+    now_tz = datetime.now(tz=timezone.utc)
+    today = date.today()
+    uuid1, uuid2, uuid3 = uuid4(), uuid4(), uuid4()
+
+    arrow_table = pa.Table.from_pydict(
+        {
+            "bool_col": [True, False, True],
+            "long_col": [100, 200, 300],
+            "double_col": [1.11, 2.22, 3.33],
+            "decimal_col": [Decimal("1.23"), Decimal("4.56"), Decimal("7.89")],
+            "string_col": ["a", "b", "c"],
+            "date_col": [today, today - timedelta(days=1), today - 
timedelta(days=2)],
+            "time_col": [time(8, 30, 0), time(12, 0, 0), time(18, 45, 30)],
+            "timestamp_col": [now, now - timedelta(hours=1), now - 
timedelta(hours=2)],
+            "timestamptz_col": [now_tz, now_tz - timedelta(hours=1), now_tz - 
timedelta(hours=2)],
+            "uuid_col": [uuid1.bytes, uuid2.bytes, uuid3.bytes],
+            "fixed_col": [b"0123456789abcdef", b"abcdef0123456789", 
b"fedcba9876543210"],
+            "binary_col": [b"hello", b"world", b"test"],
+        },
+        schema=schema.as_arrow(),
+    )
+    table.append(arrow_table)
+
+    try:
+        _assert_remote_scan_matches_local_scan(table, session_catalog, identifier)
+    finally:
+        scan_catalog.drop_table(identifier)
+
+
[email protected]
+def test_rest_scan_with_filters(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_complex_filters"
+
+    schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "name", StringType()),
+        NestedField(3, "value", LongType()),
+        NestedField(4, "optional", StringType(), required=False),
+    )
+
+    table = recreate_table(scan_catalog, identifier, schema=schema)
+
+    table.append(
+        pa.Table.from_pydict(
+            {
+                "id": list(range(1, 21)),
+                "name": [f"item_{i}" for i in range(1, 21)],
+                "value": [i * 100 for i in range(1, 21)],
+                "optional": [None if i % 3 == 0 else f"opt_{i}" for i in 
range(1, 21)],
+            }
+        )
+    )
+
+    try:
+        filters = [
+            EqualTo("id", 10),
+            NotEqualTo("id", 10),
+            GreaterThan("value", 1000),
+            GreaterThanOrEqual("value", 1000),
+            LessThan("value", 500),
+            LessThanOrEqual("value", 500),
+            In("id", [1, 5, 10, 15]),
+            NotIn("id", [1, 5, 10, 15]),
+            IsNull("optional"),
+            NotNull("optional"),
+            StartsWith("name", "item_1"),
+            And(GreaterThan("id", 5), LessThan("id", 15)),
+            Or(EqualTo("id", 1), EqualTo("id", 20)),
+            Not(EqualTo("id", 10)),
+        ]
+
+        for filter_expr in filters:
+            _assert_remote_scan_matches_local_scan(table, session_catalog, identifier, row_filter=filter_expr)
+    finally:
+        scan_catalog.drop_table(identifier)
+
+
[email protected]
+def test_rest_scan_empty_table(scan_catalog: RestCatalog, session_catalog: Catalog) -> None:
+    identifier = "default.test_empty_table"
+
+    schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+    )
+
+    table = recreate_table(scan_catalog, identifier, schema=schema)
+
+    try:
+        rest_tasks = list(table.scan().plan_files())
+        local_table = session_catalog.load_table(identifier)
+        local_tasks = list(local_table.scan().plan_files())
+
+        assert len(rest_tasks) == 0
+        assert len(local_tasks) == 0
+    finally:
+        scan_catalog.drop_table(identifier)
