amogh-jahagirdar commented on code in PR #245:
URL: https://github.com/apache/iceberg-python/pull/245#discussion_r1475571056


##########
tests/test_integration_partition_evolution.py:
##########
@@ -0,0 +1,397 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+import pytest
+
+from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.partitioning import PartitionField, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.transforms import (
+    BucketTransform,
+    DayTransform,
+    HourTransform,
+    IdentityTransform,
+    MonthTransform,
+    TruncateTransform,
+    VoidTransform,
+    YearTransform,
+)
+from pyiceberg.types import (
+    LongType,
+    NestedField,
+    StringType,
+    TimestampType,
+)
+
+
[email protected]()
+def catalog() -> Catalog:
+    return load_catalog(
+        "local",
+        **{
+            "type": "rest",
+            "uri": "http://localhost:8181";,
+            "s3.endpoint": "http://localhost:9000";,
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    )
+
+
[email protected]()
+def simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table:
+    return _create_table_with_schema(catalog, table_schema_simple, "1")
+
+
[email protected]()
+def table(catalog: Catalog) -> Table:
+    schema_with_timestamp = Schema(
+        NestedField(1, "id", LongType(), required=False),
+        NestedField(2, "event_ts", TimestampType(), required=False),
+        NestedField(3, "str", StringType(), required=False),
+    )
+    return _create_table_with_schema(catalog, schema_with_timestamp, "1")
+
+
[email protected]()
+def table_v2(catalog: Catalog) -> Table:
+    schema_with_timestamp = Schema(
+        NestedField(1, "id", LongType(), required=False),
+        NestedField(2, "event_ts", TimestampType(), required=False),
+        NestedField(3, "str", StringType(), required=False),
+    )
+    return _create_table_with_schema(catalog, schema_with_timestamp, "2")
+
+
+def _create_table_with_schema(catalog: Catalog, schema: Schema, 
format_version: str) -> Table:
+    tbl_name = "default.test_schema_evolution"
+    try:
+        catalog.drop_table(tbl_name)
+    except NoSuchTableError:
+        pass
+    return catalog.create_table(identifier=tbl_name, schema=schema, 
properties={"format-version": format_version})
+
+
[email protected]
+def test_add_identity_partition(simple_table: Table) -> None:
+    simple_table.update_spec().add_identity("foo").commit()
+    specs = simple_table.specs()
+    assert len(specs) == 2
+    spec = simple_table.spec()
+    assert spec.spec_id == 1
+    assert spec.last_assigned_field_id == 1000
+
+
[email protected]
+def test_add_year(table: Table) -> None:
+    table.update_spec().add_field("event_ts", YearTransform(), 
"year_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, PartitionField(2, 1000, 
YearTransform(), "year_transform"))
+
+
[email protected]
+def test_add_month(table: Table) -> None:
+    table.update_spec().add_field("event_ts", MonthTransform(), 
"month_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, PartitionField(2, 1000, 
MonthTransform(), "month_transform"))
+
+
[email protected]
+def test_add_day(table: Table) -> None:
+    table.update_spec().add_field("event_ts", DayTransform(), 
"day_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, PartitionField(2, 1000, 
DayTransform(), "day_transform"))
+
+
[email protected]
+def test_add_hour(table: Table) -> None:
+    table.update_spec().add_field("event_ts", HourTransform(), 
"hour_transform").commit()
+    _validate_new_partition_fields(table, 1000, 1, PartitionField(2, 1000, 
HourTransform(), "hour_transform"))
+
+
[email protected]
+def test_add_bucket(simple_table: Table) -> None:
+    simple_table.update_spec().add_field("foo", BucketTransform(12), 
"bucket_transform").commit()
+    _validate_new_partition_fields(simple_table, 1000, 1, PartitionField(1, 
1000, BucketTransform(12), "bucket_transform"))
+
+
[email protected]
+def test_add_truncate(simple_table: Table) -> None:
+    simple_table.update_spec().add_field("foo", TruncateTransform(1), 
"truncate_transform").commit()
+    _validate_new_partition_fields(simple_table, 1000, 1, PartitionField(1, 
1000, TruncateTransform(1), "truncate_transform"))
+
+
[email protected]
+def test_multiple_adds(table: Table) -> None:
+    table.update_spec().add_identity("id").add_field("event_ts", 
HourTransform(), "hourly_partitioned").add_field(
+        "str", TruncateTransform(2), "truncate_str"
+    ).commit()
+    _validate_new_partition_fields(
+        table,
+        1002,
+        1,
+        PartitionField(1, 1000, IdentityTransform(), "id"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncate_str"),
+    )
+
+
[email protected]
+def test_add_hour_to_day(table: Table) -> None:
+    table.update_spec().add_field("event_ts", DayTransform(), 
"daily_partitioned").commit()
+    table.update_spec().add_field("event_ts", HourTransform(), 
"hourly_partitioned").commit()
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        PartitionField(2, 1000, DayTransform(), "daily_partitioned"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+    )
+
+
[email protected]
+def test_add_multiple_buckets(table: Table) -> None:
+    table.update_spec().add_field("id", BucketTransform(16)).add_field("id", 
BucketTransform(4)).commit()
+    _validate_new_partition_fields(
+        table,
+        1001,
+        1,
+        PartitionField(1, 1000, BucketTransform(16), "id_bucket_16"),
+        PartitionField(1, 1001, BucketTransform(4), "id_bucket_4"),
+    )
+
+
[email protected]
+def test_remove_identity(table: Table) -> None:
+    table.update_spec().add_identity("id").commit()
+    table.update_spec().remove_field("id").commit()
+    assert len(table.specs()) == 3
+    assert table.spec().spec_id == 2
+    assert table.spec() == PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), 
name='id'), spec_id=2
+    )
+
+
[email protected]
+def test_remove_identity_v2(table_v2: Table) -> None:
+    table_v2.update_spec().add_identity("id").commit()
+    table_v2.update_spec().remove_field("id").commit()
+    assert len(table_v2.specs()) == 2
+    assert table_v2.spec().spec_id == 0
+    assert table_v2.spec() == PartitionSpec(spec_id=0)
+
+
[email protected]
+def test_remove_bucket(table: Table) -> None:
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as remove:
+        remove.remove_field("bucketed_id")
+
+    assert len(table.specs()) == 3
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), 
name='bucketed_id'),
+        PartitionField(source_id=2, field_id=1001, transform=DayTransform(), 
name='day_ts'),
+    )
+
+
[email protected]
+def test_remove_bucket_v2(table_v2: Table) -> None:
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as remove:
+        remove.remove_field("bucketed_id")
+    assert len(table_v2.specs()) == 3
+    _validate_new_partition_fields(
+        table_v2, 1001, 2, PartitionField(source_id=2, field_id=1001, 
transform=DayTransform(), name='day_ts')
+    )
+
+
[email protected]
+def test_remove_day(table: Table) -> None:
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as remove:
+        remove.remove_field("day_ts")
+
+    assert len(table.specs()) == 3
+    _validate_new_partition_fields(
+        table,
+        1001,
+        2,
+        PartitionField(source_id=1, field_id=1000, 
transform=BucketTransform(16), name='bucketed_id'),
+        PartitionField(source_id=2, field_id=1001, transform=VoidTransform(), 
name='day_ts'),
+    )
+
+
[email protected]
+def test_remove_day_v2(table_v2: Table) -> None:
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as remove:
+        remove.remove_field("day_ts")
+    assert len(table_v2.specs()) == 3
+    _validate_new_partition_fields(
+        table_v2, 1000, 2, PartitionField(source_id=1, field_id=1000, 
transform=BucketTransform(16), name='bucketed_id')
+    )
+
+
[email protected]
+def test_rename(table: Table) -> None:
+    table.update_spec().add_identity("id").commit()
+    table.update_spec().rename_field("id", "sharded_id").commit()
+    assert len(table.specs()) == 3
+    assert table.spec().spec_id == 2
+    _validate_new_partition_fields(table, 1000, 2, PartitionField(1, 1000, 
IdentityTransform(), "sharded_id"))
+
+
[email protected]
+def test_cannot_add_and_remove(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").remove_field("id").commit()
+    assert "Cannot delete newly added field id" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_add_redundant_time_partition(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("event_ts", YearTransform(), 
"year_transform").add_field(
+            "event_ts", HourTransform(), "hour_transform"
+        ).commit()
+    assert "Cannot add time partition field: hour_transform conflicts with 
year_transform" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_delete_and_rename(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").commit()
+        table.update_spec().remove_field("id").rename_field("id", 
"sharded_id").commit()
+    assert "Cannot delete and rename partition field id" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_rename_and_delete(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_identity("id").commit()
+        table.update_spec().rename_field("id", 
"sharded_id").remove_field("id").commit()
+    assert "Cannot rename and delete field id" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_add_same_tranform_for_same_field(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("str", TruncateTransform(4), 
"truncated_str").add_field(
+            "str", TruncateTransform(4)
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_add_same_field_multiple_times(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("id", IdentityTransform(), 
"duplicate").add_field(
+            "id", IdentityTransform(), "duplicate"
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
+def test_cannot_add_multiple_specs_same_name(table: Table) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        table.update_spec().add_field("id", IdentityTransform(), 
"duplicate").add_field(
+            "event_ts", IdentityTransform(), "duplicate"
+        ).commit()
+    assert "Already added partition" in str(exc_info.value)
+
+
[email protected]
+def test_change_specs_and_schema_transaction(table: Table) -> None:
+    with table.transaction() as transaction:
+        with transaction.update_spec() as update_spec:
+            update_spec.add_identity("id").add_field("event_ts", 
HourTransform(), "hourly_partitioned").add_field(
+                "str", TruncateTransform(2), "truncate_str"
+            )
+
+        with transaction.update_schema() as update_schema:
+            update_schema.add_column("col_string", StringType())
+
+    _validate_new_partition_fields(
+        table,
+        1002,
+        1,
+        PartitionField(1, 1000, IdentityTransform(), "id"),
+        PartitionField(2, 1001, HourTransform(), "hourly_partitioned"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncate_str"),
+    )
+
+    assert table.schema() == Schema(
+        NestedField(field_id=1, name='id', field_type=LongType(), 
required=False),
+        NestedField(field_id=2, name='event_ts', field_type=TimestampType(), 
required=False),
+        NestedField(field_id=3, name='str', field_type=StringType(), 
required=False),
+        NestedField(field_id=4, name='col_string', field_type=StringType(), 
required=False),
+        schema_id=1,
+        identifier_field_ids=[],
+    )
+
+
[email protected]
+def test_multiple_adds_and_remove_v1(table: Table) -> None:
+    with table.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table.update_spec() as update:
+        update.remove_field("day_ts").remove_field("bucketed_id")
+    with table.update_spec() as update:
+        update.add_field("str", TruncateTransform(2), "truncated_str")
+    _validate_new_partition_fields(
+        table,
+        1002,
+        3,
+        PartitionField(1, 1000, VoidTransform(), "bucketed_id"),
+        PartitionField(2, 1001, VoidTransform(), "day_ts"),
+        PartitionField(3, 1002, TruncateTransform(2), "truncated_str"),
+    )
+
+
[email protected]
+def test_multiple_adds_and_remove_v2(table_v2: Table) -> None:
+    with table_v2.update_spec() as update:
+        update.add_field("id", BucketTransform(16), "bucketed_id")
+        update.add_field("event_ts", DayTransform(), "day_ts")
+    with table_v2.update_spec() as update:
+        update.remove_field("day_ts").remove_field("bucketed_id")
+    with table_v2.update_spec() as update:
+        update.add_field("str", TruncateTransform(2), "truncated_str")
+    _validate_new_partition_fields(table_v2, 1002, 2, PartitionField(3, 1002, 
TruncateTransform(2), "truncated_str"))

Review Comment:
   This test shows why new partition field IDs must be assigned from the
highest field ID across all specs, not just the current spec, to avoid
collisions. In this v2 case, `remove_field` drops the original partition
fields entirely (rather than replacing them with void transforms as in v1), so
the latest spec is just the unpartitioned spec. If we only looked at that spec,
adding the new `truncated_str` partition field would assign field ID 1000,
which collides with the ID previously given to `bucketed_id`. That is not what
we want; the expected ID is 1002.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to