This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new fc3f778860 Python: Raise exception on deletes (#6574)
fc3f778860 is described below
commit fc3f778860fae5c6cfa09046b97e198856ec70c5
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sat Jan 14 21:08:22 2023 +0100
Python: Raise exception on deletes (#6574)
---
python/pyiceberg/table/__init__.py | 21 +++++++++++++++++++--
python/tests/table/test_init.py | 24 +++++++++++++++++++++++-
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index c4c23100c9..910ea7f0e0 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -43,7 +43,12 @@ from pyiceberg.expressions import (
from pyiceberg.expressions.visitors import inclusive_projection
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import project_table
-from pyiceberg.manifest import DataFile, ManifestFile, files
+from pyiceberg.manifest import (
+ DataFile,
+ ManifestContent,
+ ManifestFile,
+ files,
+)
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
@@ -257,6 +262,16 @@ class FileScanTask(ScanTask):
self.length = length or data_file.file_size_in_bytes
+def _check_content(file: DataFile) -> DataFile:
+ try:
+ if file.content == ManifestContent.DELETES:
+ raise ValueError("PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568")
+ return file
+ except AttributeError:
+ # If the attribute is not there, it is a V1 record
+ return file
+
+
class DataScan(TableScan["DataScan"]):
def __init__(
self,
@@ -318,7 +333,9 @@ class DataScan(TableScan["DataScan"]):
all_files = files(io.new_input(manifest.manifest_path))
matching_partition_files = filter(partition_filter, all_files)
- yield from (FileScanTask(file) for file in matching_partition_files)
+ matching_partition_data_files = map(_check_content, matching_partition_files)
+
+ yield from (FileScanTask(file) for file in matching_partition_data_files)
def to_arrow(self) -> pa.Table:
return project_table(
diff --git a/python/tests/table/test_init.py b/python/tests/table/test_init.py
index 3c663ceeb4..506c6609f7 100644
--- a/python/tests/table/test_init.py
+++ b/python/tests/table/test_init.py
@@ -26,9 +26,10 @@ from pyiceberg.expressions import (
In,
)
from pyiceberg.io import load_file_io
+from pyiceberg.manifest import DataFile, ManifestContent
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.table import Table
+from pyiceberg.table import Table, _check_content
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.snapshots import (
Operation,
@@ -43,6 +44,7 @@ from pyiceberg.table.sorting import (
SortOrder,
)
from pyiceberg.transforms import BucketTransform, IdentityTransform
+from pyiceberg.typedef import Record
from pyiceberg.types import LongType, NestedField
@@ -243,3 +245,23 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
_ = scan.select("a").projection()
assert "Could not find column: 'a'" in str(exc_info.value)
+
+
+def test_check_content_deletes() -> None:
+ with pytest.raises(ValueError) as exc_info:
+ _check_content(
+ DataFile(
+ content=ManifestContent.DELETES,
+ )
+ )
+ assert "PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568" in str(exc_info.value)
+
+
+def test_check_content_data() -> None:
+ manifest_file = DataFile(content=ManifestContent.DATA)
+ assert _check_content(manifest_file) == manifest_file
+
+
+def test_check_content_missing_attr() -> None:
+ r = Record(*([None] * 15))
+ assert _check_content(r) == r # type: ignore