Fokko commented on code in PR #433: URL: https://github.com/apache/iceberg-python/pull/433#discussion_r1524545642
########## pyiceberg/catalog/__init__.py: ########## @@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary( return properties_update_summary, updated_properties + def _replace_table( + self, + identifier: Union[str, Identifier], + new_schema: Union[Schema, "pa.Schema"], + new_partition_spec: PartitionSpec, + new_sort_order: SortOrder, + new_properties: Properties, + new_location: Optional[str] = None, + ) -> Table: + table = self.load_table(identifier) + with table.transaction() as tx: + base_schema = table.schema() + new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema) + new_sort_order = assign_fresh_sort_order_ids( + sort_order=new_sort_order, + old_schema=base_schema, + fresh_schema=new_schema, + sort_order_id=table.sort_order().order_id + 1, + ) + new_partition_spec = assign_fresh_partition_spec_ids( + spec=new_partition_spec, old_schema=base_schema, fresh_schema=new_schema, spec_id=table.spec().spec_id + 1 + ) + + requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),) + updates: Tuple[TableUpdate, ...] 
= ( + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + AddSortOrderUpdate(sort_order=new_sort_order), + SetDefaultSortOrderUpdate(sort_order_id=-1), + AddPartitionSpecUpdate(spec=new_partition_spec), + SetDefaultSpecUpdate(spec_id=-1), Review Comment: Same goes here, the spec is being re-used: ```sql CREATE TABLE default.t2 (name string, age int) PARTITIONED BY (name); ``` ```json { "format-version" : 2, "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53", "location" : "s3://warehouse/default/t2", "last-sequence-number" : 0, "last-updated-ms" : 1710409060360, "last-column-id" : 2, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" }, { "id" : 2, "name" : "age", "required" : false, "type" : "int" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "name", "transform" : "identity", "source-id" : 1, "field-id" : 1000 } ] } ], "last-partition-id" : 1000, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] } ``` ```sql CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name, age); ``` ```json { "format-version" : 2, "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53", "location" : "s3://warehouse/default/t2", "last-sequence-number" : 0, "last-updated-ms" : 1710409079414, "last-column-id" : 2, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" }, { "id" : 2, "name" : "age", "required" : false, "type" : "int" } ] } ], "default-spec-id" : 1, "partition-specs" : [ { "spec-id" : 0, "fields" : 
[ { "name" : "name", "transform" : "identity", "source-id" : 1, "field-id" : 1000 } ] }, { "spec-id" : 1, "fields" : [ { "name" : "name", "transform" : "identity", "source-id" : 1, "field-id" : 1000 }, { "name" : "age", "transform" : "identity", "source-id" : 2, "field-id" : 1001 } ] } ], "last-partition-id" : 1001, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 1710409060360, "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json" } ] } ``` ```sql CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY (name); ``` ```json { "format-version" : 2, "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53", "location" : "s3://warehouse/default/t2", "last-sequence-number" : 0, "last-updated-ms" : 1710409086268, "last-column-id" : 2, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" }, { "id" : 2, "name" : "age", "required" : false, "type" : "int" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "name", "transform" : "identity", "source-id" : 1, "field-id" : 1000 } ] }, { "spec-id" : 1, "fields" : [ { "name" : "name", "transform" : "identity", "source-id" : 1, "field-id" : 1000 }, { "name" : "age", "transform" : "identity", "source-id" : 2, "field-id" : 1001 } ] } ], "last-partition-id" : 1001, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 
1710409060360, "metadata-file" : "s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json" }, { "timestamp-ms" : 1710409079414, "metadata-file" : "s3://warehouse/default/t2/metadata/00001-8062294f-a8d6-493d-905f-b82dfe01cb29.metadata.json" } ] } ``` Ideally, we also want to re-use the `update_spec` class here. ########## pyiceberg/catalog/__init__.py: ########## @@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary( return properties_update_summary, updated_properties + def _replace_table( + self, + identifier: Union[str, Identifier], + new_schema: Union[Schema, "pa.Schema"], + new_partition_spec: PartitionSpec, + new_sort_order: SortOrder, + new_properties: Properties, + new_location: Optional[str] = None, + ) -> Table: + table = self.load_table(identifier) + with table.transaction() as tx: + base_schema = table.schema() + new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema) Review Comment: Let's consider the following: ```sql CREATE TABLE default.t1 (name string); ``` Results in: ```json { "format-version" : 2, "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9", "location" : "s3://warehouse/default/t1", "last-sequence-number" : 0, "last-updated-ms" : 1710407936565, "last-column-id" : 1, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ] } ``` ```sql CREATE OR REPLACE TABLE default.t1 (name string, age int); ``` The second schema is added: ```json { "format-version" : 2,
"table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9", "location" : "s3://warehouse/default/t1", "last-sequence-number" : 0, "last-updated-ms" : 1710407992389, "last-column-id" : 2, "current-schema-id" : 1, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" } ] }, { "type" : "struct", "schema-id" : 1, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" }, { "id" : 2, "name" : "age", "required" : false, "type" : "int" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 1710407936565, "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json" } ] } ``` And then go back to the original schema: ```sql CREATE OR REPLACE TABLE default.t1 (name string); ``` You'll see that no new schema is being added: ```json { "format-version" : 2, "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9", "location" : "s3://warehouse/default/t1", "last-sequence-number" : 0, "last-updated-ms" : 1710408026710, "last-column-id" : 2, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" } ] }, { "type" : "struct", "schema-id" : 1, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" }, { "id" : 2, "name" : "age", "required" : false, "type" : "int" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] 
} ], "properties" : { "owner" : "root", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 1710407936565, "metadata-file" : "s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json" }, { "timestamp-ms" : 1710407992389, "metadata-file" : "s3://warehouse/default/t1/metadata/00001-00192fa8-9019-481a-b4da-ebd99d11eeb9.metadata.json" } ] } ``` What do you think of re-using the `update_schema()` class: ```python with table.transaction() as transaction: with transaction.update_schema(allow_incompatible_changes=True) as update_schema: # Remove old fields removed_column_names = base_schema._name_to_id().keys() - schema._name_to_id().keys() for removed_column_name in removed_column_names: update_schema.delete_column(removed_column_name) # Add new and evolve existing fields update_schema.union_by_name(schema) ``` ^ Pseudocode, could be cleaner. Ideally, the removal should be done with a `visit_with_partner` (that's the opposite of the `union_by_name`).
########## pyiceberg/catalog/__init__.py: ########## @@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary( return properties_update_summary, updated_properties + def _replace_table( + self, + identifier: Union[str, Identifier], + new_schema: Union[Schema, "pa.Schema"], + new_partition_spec: PartitionSpec, + new_sort_order: SortOrder, + new_properties: Properties, + new_location: Optional[str] = None, + ) -> Table: + table = self.load_table(identifier) + with table.transaction() as tx: + base_schema = table.schema() + new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema) + new_sort_order = assign_fresh_sort_order_ids( + sort_order=new_sort_order, + old_schema=base_schema, + fresh_schema=new_schema, + sort_order_id=table.sort_order().order_id + 1, + ) + new_partition_spec = assign_fresh_partition_spec_ids( + spec=new_partition_spec, old_schema=base_schema, fresh_schema=new_schema, spec_id=table.spec().spec_id + 1 + ) + + requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),) + updates: Tuple[TableUpdate, ...] 
= ( Review Comment: We need to clear the snapshots here as well: ```sql CREATE TABLE default.t3 AS SELECT 'Fokko' as name ``` ```json { "format-version" : 2, "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728", "location" : "s3://warehouse/default/t3", "last-sequence-number" : 1, "last-updated-ms" : 1710409653861, "last-column-id" : 1, "current-schema-id" : 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "created-at" : "2024-03-14T09:47:10.455199504Z", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ { "sequence-number" : 1, "snapshot-id" : 3622792816294171432, "timestamp-ms" : 1710409631964, "summary" : { "operation" : "append", "spark.app.id" : "local-1710405058122", "added-data-files" : "1", "added-records" : "1", "added-files-size" : "416", "changed-partition-count" : "1", "total-records" : "1", "total-files-size" : "416", "total-data-files" : "1", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0" }, "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro", "schema-id" : 0 } ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 1710409631964, "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json" } ] } ``` ```sql CREATE OR REPLACE TABLE default.t3 (name string); ``` ```json { "format-version" : 2, "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728", "location" : "s3://warehouse/default/t3", "last-sequence-number" : 1, "last-updated-ms" : 1710411760623, "last-column-id" : 1, "current-schema-id" 
: 0, "schemas" : [ { "type" : "struct", "schema-id" : 0, "fields" : [ { "id" : 1, "name" : "name", "required" : false, "type" : "string" } ] } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ ] } ], "last-partition-id" : 999, "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { "owner" : "root", "created-at" : "2024-03-14T09:47:10.455199504Z", "write.parquet.compression-codec" : "zstd" }, "current-snapshot-id" : -1, "refs" : { }, "snapshots" : [ { "sequence-number" : 1, "snapshot-id" : 3622792816294171432, "timestamp-ms" : 1710409631964, "summary" : { "operation" : "append", "spark.app.id" : "local-1710405058122", "added-data-files" : "1", "added-records" : "1", "added-files-size" : "416", "changed-partition-count" : "1", "total-records" : "1", "total-files-size" : "416", "total-data-files" : "1", "total-delete-files" : "0", "total-position-deletes" : "0", "total-equality-deletes" : "0" }, "manifest-list" : "s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro", "schema-id" : 0 } ], "statistics" : [ ], "snapshot-log" : [ ], "metadata-log" : [ { "timestamp-ms" : 1710409631964, "metadata-file" : "s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json" }, { "timestamp-ms" : 1710409653861, "metadata-file" : "s3://warehouse/default/t3/metadata/00001-0297d4e7-2468-4c0d-b4ed-ea717df8c3e6.metadata.json" } ] } ``` ########## pyiceberg/catalog/__init__.py: ########## @@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary( return properties_update_summary, updated_properties + def _replace_table( + self, + identifier: Union[str, Identifier], + new_schema: Union[Schema, "pa.Schema"], + new_partition_spec: PartitionSpec, + new_sort_order: SortOrder, + new_properties: Properties, + new_location: Optional[str] = None, + ) -> Table: + table = self.load_table(identifier) + with table.transaction() as tx: + 
base_schema = table.schema() + new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, base_schema=base_schema) + new_sort_order = assign_fresh_sort_order_ids( + sort_order=new_sort_order, + old_schema=base_schema, + fresh_schema=new_schema, + sort_order_id=table.sort_order().order_id + 1, + ) + new_partition_spec = assign_fresh_partition_spec_ids( + spec=new_partition_spec, old_schema=base_schema, fresh_schema=new_schema, spec_id=table.spec().spec_id + 1 + ) + + requirements: Tuple[TableRequirement, ...] = (AssertTableUUID(uuid=table.metadata.table_uuid),) + updates: Tuple[TableUpdate, ...] = ( + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + AddSortOrderUpdate(sort_order=new_sort_order), + SetDefaultSortOrderUpdate(sort_order_id=-1), + AddPartitionSpecUpdate(spec=new_partition_spec), + SetDefaultSpecUpdate(spec_id=-1), + ) + tx._apply(updates, requirements) Review Comment: With https://github.com/apache/iceberg-python/pull/471 merged, this is no longer necessary! 🥳 ```suggestion tx._apply(updates, requirements) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org