Fokko commented on code in PR #6233:
URL: https://github.com/apache/iceberg/pull/6233#discussion_r1028076839
##########
python/pyiceberg/table/__init__.py:
##########
@@ -199,16 +223,143 @@ def use_ref(self, name: str):
raise ValueError(f"Cannot scan unknown ref={name}")
- def select(self, *field_names: str) -> TableScan:
+ def select(self, *field_names: str) -> S:
if "*" in self.selected_fields:
return self.update(selected_fields=field_names)
return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
- def filter_rows(self, new_row_filter: BooleanExpression) -> TableScan:
+ def filter_rows(self, new_row_filter: BooleanExpression) -> S:
return self.update(row_filter=And(self.row_filter, new_row_filter))
- def filter_partitions(self, new_partition_filter: BooleanExpression) -> TableScan:
+ def filter_partitions(self, new_partition_filter: BooleanExpression) -> S:
return self.update(partition_filter=And(self.partition_filter, new_partition_filter))
- def with_case_sensitive(self, case_sensitive: bool = True) -> TableScan:
+ def with_case_sensitive(self, case_sensitive: bool = True) -> S:
return self.update(case_sensitive=case_sensitive)
+
+
+class ScanTask(ABC):
+ pass
+
+
+@dataclass(init=False)
+class FileScanTask(ScanTask):
+ file: DataFile
+ start: int
+ length: int
+
+ def __init__(self, data_file: DataFile, start: Optional[int] = None, length: Optional[int] = None):
+ self.file = data_file
+ self.start = start or 0
+ self.length = length or data_file.file_size_in_bytes
+
+
+class _DictAsStruct(StructProtocol):
+ pos_to_name: dict[int, str]
+ wrapped: dict[str, Any]
+
+ def __init__(self, partition_type: StructType):
+ self.pos_to_name = {}
+ for pos, field in enumerate(partition_type.fields):
+ self.pos_to_name[pos] = field.name
+
+ def wrap(self, to_wrap: dict[str, Any]) -> _DictAsStruct:
+ self.wrapped = to_wrap
+ return self
+
+ def get(self, pos: int) -> Any:
+ return self.wrapped[self.pos_to_name[pos]]
+
+ def set(self, pos: int, value: Any) -> None:
+ raise NotImplementedError("Cannot set values in DictAsStruct")
+
+
+class DataScan(TableScan["DataScan"]):
+ def __init__(
+ self,
+ table: Table,
+ row_filter: Optional[BooleanExpression] = None,
+ partition_filter: Optional[BooleanExpression] = None,
+ selected_fields: Tuple[str] = ("*",),
+ case_sensitive: bool = True,
+ snapshot_id: Optional[int] = None,
+ options: Properties = EMPTY_DICT,
+ ):
+ super().__init__(table, row_filter, partition_filter, selected_fields, case_sensitive, snapshot_id, options)
+
+ def plan_files(self) -> Iterator[ScanTask]:
+ snapshot = self.snapshot()
+ if not snapshot:
+ return ()
+
+ io = self.table.io
+
+ # step 1: filter manifests using partition summaries
+ # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
+
+ @cache
+ def manifest_filter(spec_id: int) -> Callable[[ManifestFile], bool]:
+ spec = self.table.specs()[spec_id]
+ return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filter, self.case_sensitive)
+
+ def partition_summary_filter(manifest_file: ManifestFile) -> bool:
Review Comment:
Maybe we should also align on this as a community and stick with either
`filter` or list comprehensions; I would prefer the latter :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]