rdblue commented on code in PR #6233:
URL: https://github.com/apache/iceberg/pull/6233#discussion_r1027370136


##########
python/pyiceberg/table/__init__.py:
##########
@@ -199,16 +223,114 @@ def use_ref(self, name: str):
 
         raise ValueError(f"Cannot scan unknown ref={name}")
 
-    def select(self, *field_names: str) -> TableScan:
+    def select(self, *field_names: str) -> S:
         if "*" in self.selected_fields:
             return self.update(selected_fields=field_names)
         return 
self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
 
-    def filter_rows(self, new_row_filter: BooleanExpression) -> TableScan:
+    def filter_rows(self, new_row_filter: BooleanExpression) -> S:
         return self.update(row_filter=And(self.row_filter, new_row_filter))
 
-    def filter_partitions(self, new_partition_filter: BooleanExpression) -> 
TableScan:
+    def filter_partitions(self, new_partition_filter: BooleanExpression) -> S:
         return self.update(partition_filter=And(self.partition_filter, 
new_partition_filter))
 
-    def with_case_sensitive(self, case_sensitive: bool = True) -> TableScan:
+    def with_case_sensitive(self, case_sensitive: bool = True) -> S:
         return self.update(case_sensitive=case_sensitive)
+
+
+class ScanTask(ABC):
+    pass
+
+
+@dataclass(init=False)
+class FileScanTask(ScanTask):
+    data_file: DataFile
+    start: int
+    length: int
+
+    def __init__(self, data_file: DataFile, start: Optional[int] = None, 
length: Optional[int] = None):
+        self.data_file = data_file
+        self.start = start or 0
+        self.length = length or data_file.file_size_in_bytes
+
+
+class _DictAsStruct(StructProtocol):
+    pos_to_name: dict[int, str]
+    wrapped: dict[str, Any]
+
+    def __init__(self, partition_type: StructType):
+        self.pos_to_name = {}
+        for pos, field in enumerate(partition_type.fields):
+            self.pos_to_name[pos] = field.name
+
+    def wrap(self, to_wrap: dict[str, Any]) -> _DictAsStruct:
+        self.wrapped = to_wrap
+        return self
+
+    def get(self, pos: int) -> Any:
+        return self.wrapped[self.pos_to_name[pos]]
+
+    def set(self, pos: int, value: Any) -> None:
+        raise NotImplementedError("Cannot set values in DictAsStruct")
+
+
+class DataScan(TableScan["DataScan"]):
+    def __init__(
+        self,
+        table: Table,
+        row_filter: Optional[BooleanExpression] = None,
+        partition_filter: Optional[BooleanExpression] = None,
+        selected_fields: Tuple[str] = ("*",),
+        case_sensitive: bool = True,
+        snapshot_id: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+    ):
+        super().__init__(table, row_filter, partition_filter, selected_fields, 
case_sensitive, snapshot_id, options)
+
+    def plan_files(self) -> Iterator[ScanTask]:
+        snapshot = self.snapshot()
+        if not snapshot:
+            return ()
+
+        io = self.table.io
+
+        # step 1: filter manifests using partition summaries
+        # the filter depends on the partition spec used to write the manifest 
file, so create a cache of filters for each spec id
+
+        @cache
+        def manifest_filter(spec_id: int) -> Callable[[ManifestFile], bool]:
+            spec = self.table.specs()[spec_id]
+            return visitors.manifest_evaluator(spec, self.table.schema(), 
self.partition_filter, self.case_sensitive)
+
+        def partition_summary_filter(manifest_file: ManifestFile) -> bool:
+            return 
manifest_filter(manifest_file.partition_spec_id)(manifest_file)
+
+        manifests = list(filter(partition_summary_filter, 
snapshot.manifests(io)))
+
+        # step 2: filter the data files in each manifest
+        # this filter depends on the partition spec used to write the manifest 
file
+
+        @cache
+        def partition_evaluator(spec_id: int) -> Callable[[DataFile], bool]:
+            spec = self.table.specs()[spec_id]
+            partition_type = spec.partition_type(self.table.schema())
+            partition_schema = Schema(*partition_type.fields)
+
+            # TODO: project the row filter  # pylint: disable=W0511
+            partition_expr = And(self.partition_filter, AlwaysTrue())
+
+            # TODO: remove the dict to struct wrapper by using a 
StructProtocol record  # pylint: disable=W0511
+            wrapper = _DictAsStruct(partition_type)
+            evaluator = visitors.expression_evaluator(partition_schema, 
partition_expr, self.case_sensitive)
+
+            return lambda data_file: 
evaluator(wrapper.wrap(data_file.partition))
+
+        for manifest in manifests:
+            partition_filter = partition_evaluator(manifest.partition_spec_id)
+            all_files = files(io.new_input(manifest.manifest_path))

Review Comment:
   This uses `files`, which doesn't do any column pruning. Pruning columns 
would help performance in the short term, but stats are going to be used in the 
long term so it's not a great way to speed up scan planning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to