jqin61 commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1764223333
##########
tests/integration/test_writes/test_partitioned_writes.py:
##########
@@ -221,6 +276,98 @@ def test_query_filter_v1_v2_append_null(
assert df.where(f"{col} is not null").count() == 4, f"Expected 4
non-null rows for {col}"
assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows
for {col}"
[email protected]
[email protected](
+ "spec",
+ [
+ (PartitionSpec(PartitionField(source_id=4, field_id=1001,
transform=BucketTransform(2), name="int_bucket"))),
+ (PartitionSpec(PartitionField(source_id=5, field_id=1001,
transform=BucketTransform(2), name="long_bucket"))),
+ (PartitionSpec(PartitionField(source_id=10, field_id=1001,
transform=BucketTransform(2), name="date_bucket"))),
+ (PartitionSpec(PartitionField(source_id=8, field_id=1001,
transform=BucketTransform(2), name="timestamp_bucket"))),
+ (PartitionSpec(PartitionField(source_id=9, field_id=1001,
transform=BucketTransform(2), name="timestamptz_bucket"))),
+ (PartitionSpec(PartitionField(source_id=2, field_id=1001,
transform=BucketTransform(2), name="string_bucket"))),
+ (PartitionSpec(PartitionField(source_id=12, field_id=1001,
transform=BucketTransform(2), name="fixed_bucket"))),
+ (PartitionSpec(PartitionField(source_id=11, field_id=1001,
transform=BucketTransform(2), name="binary_bucket"))),
+ (PartitionSpec(PartitionField(source_id=4, field_id=1001,
transform=TruncateTransform(2), name="int_trunc"))),
+ (PartitionSpec(PartitionField(source_id=5, field_id=1001,
transform=TruncateTransform(2), name="long_trunc"))),
+ (PartitionSpec(PartitionField(source_id=2, field_id=1001,
transform=TruncateTransform(2), name="string_trunc"))),
+ (PartitionSpec(PartitionField(source_id=11, field_id=1001,
transform=TruncateTransform(2), name="binary_trunc"))),
+ (PartitionSpec(PartitionField(source_id=8, field_id=1001,
transform=YearTransform(), name="timestamp_year"))),
+ (PartitionSpec(PartitionField(source_id=9, field_id=1001,
transform=YearTransform(), name="timestamptz_year"))),
+ (PartitionSpec(PartitionField(source_id=10, field_id=1001,
transform=YearTransform(), name="date_year"))),
+ (PartitionSpec(PartitionField(source_id=8, field_id=1001,
transform=MonthTransform(), name="timestamp_month"))),
+ (PartitionSpec(PartitionField(source_id=9, field_id=1001,
transform=MonthTransform(), name="timestamptz_month"))),
+ (PartitionSpec(PartitionField(source_id=10, field_id=1001,
transform=MonthTransform(), name="date_month"))),
+ (PartitionSpec(PartitionField(source_id=8, field_id=1001,
transform=DayTransform(), name="timestamp_day"))),
+ (PartitionSpec(PartitionField(source_id=9, field_id=1001,
transform=DayTransform(), name="timestamptz_day"))),
+ (PartitionSpec(PartitionField(source_id=10, field_id=1001,
transform=DayTransform(), name="date_day"))),
+ (PartitionSpec(PartitionField(source_id=8, field_id=1001,
transform=HourTransform(), name="timestamp_hour"))),
+ (PartitionSpec(PartitionField(source_id=9, field_id=1001,
transform=HourTransform(), name="timestamptz_hour"))),
+ (PartitionSpec(PartitionField(source_id=10, field_id=1001,
transform=HourTransform(), name="date_hour"))),
+ ],
+)
+def test_dynamic_overwrite_non_identity_transform(
+ session_catalog: Catalog, arrow_table_with_null: pa.Table, spec:
PartitionSpec
+) -> None:
+ identifier = "default.dynamic_overwrite_non_identity_transform"
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(
+ identifier=identifier,
+ schema=TABLE_SCHEMA,
+ properties={"format-version": "2"},
+ partition_spec=spec,
+ )
+ with pytest.raises(
+ ValueError,
+ match="For now dynamic overwrite does not support a table with
non-identity-transform field in the latest partition spec: *",
+ ):
+ tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
+
+
[email protected]
[email protected](
+ "part_col",
+ [
+ "int",
+ "bool",
+ "string",
+ "string_long",
+ "long",
+ "float",
+ "double",
+ "date",
+ "timestamp",
+ "binary",
+ "timestamptz",
+ ],
+)
[email protected](
+ "format_version",
+ [1, 2],
+)
+def test_dynamic_overwrite_unpartitioned_evolve_to_identity_transform(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_with_null:
pa.Table, part_col: str, format_version: int
+) -> None:
+ identifier =
f"default.unpartitioned_table_v{format_version}_evolve_into_identity_transformed_partition_field_{part_col}"
+ tbl = session_catalog.create_table(
+ identifier=identifier,
+ schema=TABLE_SCHEMA,
+ properties={"format-version": "2"},
+ )
+ tbl.append(arrow_table_with_null)
+ tbl.update_spec().add_field(part_col, IdentityTransform(),
f"{part_col}_identity").commit()
+ tbl.append(arrow_table_with_null)
+ # each column should be [a, null, b, a, null, b]
+ # dynamic overwrite a non-null row a, resulting in [null, b, null, b, a]
+ tbl.dynamic_overwrite(arrow_table_with_null.slice(0, 1))
+ df = spark.table(identifier)
+ assert df.where(f"{part_col} is not null").count() == 3, f"Expected 3
non-null rows for {part_col},"
+ assert df.where(f"{part_col} is null").count() == 2, f"Expected 2 null
rows for {part_col},"
+
Review Comment:
I added the check.
One interesting thing to note is that a data file partitioned on a long
string field cannot match the strict metrics evaluator and will end up being
overwritten rather than deleted (although the newly written file is the same).
The reason is that, for a long string, the lower and upper bounds are
truncated, e.g. `"aaaaaaaaaaaaaaaaaaaaaa"` has a lower bound of
`"aaaaaaaaaaaaaaaa"` and an upper bound of `"aaaaaaaaaaaaaaab"`. This makes
the strict metrics evaluator evaluate the file as ROWS_MIGHT_NOT_MATCH,
which in turn causes the partitioned data file to be overwritten rather than
deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]