Fokko commented on code in PR #6131:
URL: https://github.com/apache/iceberg/pull/6131#discussion_r1018441052
##########
python/pyiceberg/table/__init__.py:
##########
@@ -90,3 +103,90 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log
+
+
+class TableScan:
+ _always_true: ClassVar[BooleanExpression] = AlwaysTrue()
+ table: Table
+ row_filter: BooleanExpression
+ partition_filter: BooleanExpression
+ selected_fields: tuple[str]
+ case_sensitive: bool
+ snapshot_id: Optional[int]
+ options: Properties
+
+ def __init__(
+ self,
+ *,
+ table: Table,
+ row_filter: BooleanExpression = _always_true,
+ partition_filter: BooleanExpression = _always_true,
+ selected_fields: tuple[str] = ("*",),
+ case_sensitive: bool = True,
+ snapshot_id: Optional[int] = None,
+ options: Properties = EMPTY_DICT,
+ ):
+ self.table = table
+ self.row_filter = row_filter
+ self.partition_filter = partition_filter
+ self.selected_fields = selected_fields
+ self.case_sensitive = case_sensitive
+ self.snapshot_id = snapshot_id
+ self.options = options
+
+ def update(self, **overrides):
+ """Creates a copy of this table scan with updated fields."""
+ return TableScan(**{**self.__dict__, **overrides})
+
+ def snapshot(self):
+ if self.snapshot_id:
+ return self.table.snapshot_by_id(self.snapshot_id)
+
+ return self.table.current_snapshot()
+
+ def projection(self):
+ snapshot_schema = self.table.schemas().get(self.snapshot().schema_id) or self.table.schema()
+
+ if "*" in self.selected_fields:
+ return snapshot_schema
+
+ return snapshot_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+ def use_snapshot(self, snapshot_id: int):
+ if self.snapshot_id:
+ raise ValueError(f"Cannot override snapshot, already set snapshot id={self.snapshot_id}")
+ if self.table.snapshot_by_id(snapshot_id):
+ return self.update(snapshot_id=snapshot_id)
+
+ raise ValueError(f"Cannot scan unknown snapshot id={snapshot_id}")
+
+ def use_ref(self, name: str):
+ if self.snapshot_id:
+ raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
+ if snapshot := self.table.snapshot_by_name(name):
+ return self.update(snapshot_id=snapshot.snapshot_id)
+
+ raise ValueError(f"Cannot scan unknown ref={name}")
+
+ def select(self, *field_names: str) -> "TableScan":
+ if "*" in self.selected_fields:
+ return self.update(selected_fields=field_names)
+ return self.update(selected_fields=tuple(set(self.selected_fields).intersection(field_names)))
Review Comment:
Small bug:
```suggestion
return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]