sungwy commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1761797345
##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,85 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                for data_file in data_files:
                    append_files.append_data_file(data_file)
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
Review Comment:
nit: should we rename this to:
```suggestion
    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
```
##########
tests/integration/test_writes/test_partitioned_writes.py:
##########
@@ -221,6 +276,98 @@ def test_query_filter_v1_v2_append_null(
        assert df.where(f"{col} is not null").count() == 4, f"Expected 4 non-null rows for {col}"
        assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}"
[email protected]
[email protected](
+ "spec",
+ [
+        (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_bucket"))),
+        (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=BucketTransform(2), name="long_bucket"))),
+        (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=BucketTransform(2), name="date_bucket"))),
+        (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=BucketTransform(2), name="timestamp_bucket"))),
+        (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=BucketTransform(2), name="timestamptz_bucket"))),
+        (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=BucketTransform(2), name="string_bucket"))),
+        (PartitionSpec(PartitionField(source_id=12, field_id=1001, transform=BucketTransform(2), name="fixed_bucket"))),
+        (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=BucketTransform(2), name="binary_bucket"))),
+        (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(2), name="int_trunc"))),
+        (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="long_trunc"))),
+        (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(2), name="string_trunc"))),
+        (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(2), name="binary_trunc"))),
+        (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=YearTransform(), name="timestamp_year"))),
+        (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=YearTransform(), name="timestamptz_year"))),
+        (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=YearTransform(), name="date_year"))),
+        (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=MonthTransform(), name="timestamp_month"))),
+        (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=MonthTransform(), name="timestamptz_month"))),
+        (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="date_month"))),
+        (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=DayTransform(), name="timestamp_day"))),
+        (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=DayTransform(), name="timestamptz_day"))),
+        (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=DayTransform(), name="date_day"))),
+        (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=HourTransform(), name="timestamp_hour"))),
+        (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=HourTransform(), name="timestamptz_hour"))),
+        (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=HourTransform(), name="date_hour"))),
+ ],
+)
+def test_dynamic_overwrite_non_identity_transform(
+    session_catalog: Catalog, arrow_table_with_null: pa.Table, spec: PartitionSpec
+) -> None:
+ identifier = "default.dynamic_overwrite_non_identity_transform"
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(
+ identifier=identifier,
+ schema=TABLE_SCHEMA,
+ properties={"format-version": "2"},
+ partition_spec=spec,
+ )
+ with pytest.raises(
+ ValueError,
+        match="For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: *",
+ ):
+ tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
+
+
[email protected]
[email protected](
+ "part_col",
+ [
+ "int",
+ "bool",
+ "string",
+ "string_long",
+ "long",
+ "float",
+ "double",
+ "date",
+ "timestamp",
+ "binary",
+ "timestamptz",
+ ],
+)
[email protected](
+ "format_version",
+ [1, 2],
+)
+def test_dynamic_overwrite_unpartitioned_evolve_to_identity_transform(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, part_col: str, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_table_v{format_version}_evolve_into_identity_transformed_partition_field_{part_col}"
+ tbl = session_catalog.create_table(
+ identifier=identifier,
+ schema=TABLE_SCHEMA,
+        properties={"format-version": str(format_version)},
+ )
+ tbl.append(arrow_table_with_null)
+    tbl.update_spec().add_field(part_col, IdentityTransform(), f"{part_col}_identity").commit()
+ tbl.append(arrow_table_with_null)
+ # each column should be [a, null, b, a, null, b]
+    # dynamic overwrite with the non-null row a, resulting in [null, b, null, b, a]
+ tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
+ df = spark.table(identifier)
+    assert df.where(f"{part_col} is not null").count() == 3, f"Expected 3 non-null rows for {part_col},"
+    assert df.where(f"{part_col} is null").count() == 2, f"Expected 2 null rows for {part_col},"
+
Review Comment:
nit: I think it would be interesting to inspect the snapshot summary here, since we would expect an `overwrite`-type snapshot summary given that the unpartitioned data is being overwritten.
https://github.com/jqin61/iceberg-python/blob/1854c50a88c49cff2a209aa1a1365198a4cf1456/pyiceberg/table/inspect.py#L58-L68
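Something along these lines could work; this is only a sketch that relies on the `tbl.inspect.snapshots()` metadata table linked above, and the asserted operation sequence is an assumption about how `dynamic_overwrite` commits its delete and append:
```python
# two plain appends from the test setup come first, then whatever the dynamic
# overwrite commits for the previously unpartitioned data
operations = tbl.inspect.snapshots().to_pydict()["operation"]
assert operations[:2] == ["append", "append"]
assert any(op in ("delete", "overwrite") for op in operations[2:])
```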
##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,85 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                for data_file in data_files:
                    append_files.append_data_file(data_file)
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if self.table_metadata.spec().is_unpartitioned():
+            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")
+
+        for field in self.table_metadata.spec().fields:
+            if not isinstance(field.transform, IdentityTransform):
+                raise ValueError(
+                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
+                )
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+
+        overlapping_partitions = [data_file.partition for data_file in data_files]
Review Comment:
nit: Should we turn this into a set of partitions instead, and run `_build_partition_predicate` on the unique set of partition records?
Also, a thought on a rename: `overwritten_partitions`?
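A sketch of that suggestion (assuming `Record` is hashable so the partitions can go into a set; the `partition_records` annotation would become `Set[Record]`, and `partition_predicate` is just an illustrative name for wherever the predicate flows next):
```python
# de-duplicate the partitions touched by the new data files before building the
# delete predicate, so each partition contributes a single OR branch
overwritten_partitions = {data_file.partition for data_file in data_files}
partition_predicate = self._build_partition_predicate(partition_records=overwritten_partitions)
```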
##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,85 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                for data_file in data_files:
                    append_files.append_data_file(data_file)
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
Review Comment:
nit:
```suggestion
            for pos, partition_field in enumerate(partition_fields):
```
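Carried through the loop body, the nit would read roughly as follows (only the reviewer's rename applied; behavior unchanged):
```python
for pos, partition_field in enumerate(partition_fields):
    predicate = (
        EqualTo(Reference(partition_field), partition_record[pos])
        if partition_record[pos] is not None
        else IsNull(Reference(partition_field))
    )
    match_partition_expression = And(match_partition_expression, predicate)
```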
##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,85 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                for data_file in data_files:
                    append_files.append_data_file(data_file)
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction.
Review Comment:
How do you feel about adding a more descriptive docstring that explains the behavior? Here's my suggestion:
```suggestion
        Shorthand for overwriting existing partitions with a PyArrow table. The function
        detects partition values in the provided arrow table using the current table
        partition spec, and deletes existing partitions matching these values. Finally,
        the provided data is appended to the table.
```
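For illustration, a hypothetical call against a table partitioned by `IdentityTransform` on a made-up `region` column would behave as described (the table `tbl` and a matching Arrow schema are assumptions here):
```python
import pyarrow as pa

# new data that only touches the region="EU" partition
new_rows = pa.table({"region": ["EU", "EU"], "value": [1, 2]})

# rows currently in the region="EU" partition are deleted, then new_rows is
# appended; all other partitions are left untouched
tbl.dynamic_overwrite(new_rows)
```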
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]