Fokko commented on code in PR #433:
URL: https://github.com/apache/iceberg-python/pull/433#discussion_r1524545642


##########
pyiceberg/catalog/__init__.py:
##########
@@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary(
 
         return properties_update_summary, updated_properties
 
+    def _replace_table(
+        self,
+        identifier: Union[str, Identifier],
+        new_schema: Union[Schema, "pa.Schema"],
+        new_partition_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_properties: Properties,
+        new_location: Optional[str] = None,
+    ) -> Table:
+        table = self.load_table(identifier)
+        with table.transaction() as tx:
+            base_schema = table.schema()
+            new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, 
base_schema=base_schema)
+            new_sort_order = assign_fresh_sort_order_ids(
+                sort_order=new_sort_order,
+                old_schema=base_schema,
+                fresh_schema=new_schema,
+                sort_order_id=table.sort_order().order_id + 1,
+            )
+            new_partition_spec = assign_fresh_partition_spec_ids(
+                spec=new_partition_spec, old_schema=base_schema, 
fresh_schema=new_schema, spec_id=table.spec().spec_id + 1
+            )
+
+            requirements: Tuple[TableRequirement, ...] = 
(AssertTableUUID(uuid=table.metadata.table_uuid),)
+            updates: Tuple[TableUpdate, ...] = (
+                AddSchemaUpdate(schema=new_schema, 
last_column_id=new_schema.highest_field_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+                AddSortOrderUpdate(sort_order=new_sort_order),
+                SetDefaultSortOrderUpdate(sort_order_id=-1),
+                AddPartitionSpecUpdate(spec=new_partition_spec),
+                SetDefaultSpecUpdate(spec_id=-1),

Review Comment:
   Same goes here, the spec is being re-used:
   
   ```sql
   CREATE TABLE default.t2 (name string, age int) PARTITIONED BY (name);
   ```
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
     "location" : "s3://warehouse/default/t2",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710409060360,
     "last-column-id" : 2,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       }, {
         "id" : 2,
         "name" : "age",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ {
         "name" : "name",
         "transform" : "identity",
         "source-id" : 1,
         "field-id" : 1000
       } ]
     } ],
     "last-partition-id" : 1000,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ ]
   }
   ```
   
   ```sql
   CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY 
(name, age);
   ```
   
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
     "location" : "s3://warehouse/default/t2",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710409079414,
     "last-column-id" : 2,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       }, {
         "id" : 2,
         "name" : "age",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "default-spec-id" : 1,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ {
         "name" : "name",
         "transform" : "identity",
         "source-id" : 1,
         "field-id" : 1000
       } ]
     }, {
       "spec-id" : 1,
       "fields" : [ {
         "name" : "name",
         "transform" : "identity",
         "source-id" : 1,
         "field-id" : 1000
       }, {
         "name" : "age",
         "transform" : "identity",
         "source-id" : 2,
         "field-id" : 1001
       } ]
     } ],
     "last-partition-id" : 1001,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710409060360,
       "metadata-file" : 
"s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
     } ]
   }
   ```
   
   ```sql
   CREATE OR REPLACE TABLE default.t2 (name string, age int) PARTITIONED BY 
(name);
   ```
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "27f1ab29-7a9f-4324-bb64-c10c5bd2be53",
     "location" : "s3://warehouse/default/t2",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710409086268,
     "last-column-id" : 2,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       }, {
         "id" : 2,
         "name" : "age",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ {
         "name" : "name",
         "transform" : "identity",
         "source-id" : 1,
         "field-id" : 1000
       } ]
     }, {
       "spec-id" : 1,
       "fields" : [ {
         "name" : "name",
         "transform" : "identity",
         "source-id" : 1,
         "field-id" : 1000
       }, {
         "name" : "age",
         "transform" : "identity",
         "source-id" : 2,
         "field-id" : 1001
       } ]
     } ],
     "last-partition-id" : 1001,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710409060360,
       "metadata-file" : 
"s3://warehouse/default/t2/metadata/00000-bc21fe27-d037-4b98-a8ca-cc1502eb19ec.metadata.json"
     }, {
       "timestamp-ms" : 1710409079414,
       "metadata-file" : 
"s3://warehouse/default/t2/metadata/00001-8062294f-a8d6-493d-905f-b82dfe01cb29.metadata.json"
     } ]
   }
   ```
   
   Ideally, we also want to re-use the `update_spec` class here.



##########
pyiceberg/catalog/__init__.py:
##########
@@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary(
 
         return properties_update_summary, updated_properties
 
+    def _replace_table(
+        self,
+        identifier: Union[str, Identifier],
+        new_schema: Union[Schema, "pa.Schema"],
+        new_partition_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_properties: Properties,
+        new_location: Optional[str] = None,
+    ) -> Table:
+        table = self.load_table(identifier)
+        with table.transaction() as tx:
+            base_schema = table.schema()
+            new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, 
base_schema=base_schema)

Review Comment:
   Let's consider the following:
   
   ```sql
   CREATE TABLE default.t1 (name string);
   ```
   
   Results in:
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
     "location" : "s3://warehouse/default/t1",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710407936565,
     "last-column-id" : 1,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ]
     } ],
     "last-partition-id" : 999,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ ]
   }
   ```
   
   ```sql
   CREATE OR REPLACE TABLE default.t1 (name string, age int);
   ```
   
   The second schema is added:
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
     "location" : "s3://warehouse/default/t1",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710407992389,
     "last-column-id" : 2,
     "current-schema-id" : 1,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       } ]
     }, {
       "type" : "struct",
       "schema-id" : 1,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       }, {
         "id" : 2,
         "name" : "age",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ]
     } ],
     "last-partition-id" : 999,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710407936565,
       "metadata-file" : 
"s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
     } ]
   }
   ```
   
   And then go back to the original schema:
   
   ```sql
   CREATE OR REPLACE TABLE default.t1 (name string);
   ```
   
   You'll see that no new schema is being added:
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "c10da17c-c40c-4e02-9e81-15b6f0f35cb9",
     "location" : "s3://warehouse/default/t1",
     "last-sequence-number" : 0,
     "last-updated-ms" : 1710408026710,
     "last-column-id" : 2,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       } ]
     }, {
       "type" : "struct",
       "schema-id" : 1,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       }, {
         "id" : 2,
         "name" : "age",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ]
     } ],
     "last-partition-id" : 999,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710407936565,
       "metadata-file" : 
"s3://warehouse/default/t1/metadata/00000-83e6fd53-d3c9-41f9-aeea-b978fb882c7f.metadata.json"
     }, {
       "timestamp-ms" : 1710407992389,
       "metadata-file" : 
"s3://warehouse/default/t1/metadata/00001-00192fa8-9019-481a-b4da-ebd99d11eeb9.metadata.json"
     } ]
   }
   ```
   
   What do you think of re-using the `update_schema()` class:
   
   ```python
   with table.transaction() as transaction:
       with transaction.update_schema(allow_incompatible_changes=True) as 
update_schema:
           # Remove old fields
           removed_column_names = base_schema._name_to_id().keys() - 
schema._name_to_id().keys()
           for removed_column_name in removed_column_names:
               update_schema.delete_column(removed_column_name)
           # Add new and evolve existing fields
           update_schema.union_by_name(schema)
   ```
   ^ Pseudocode, could be cleaner. Ideally, the removal should be done with a 
`visit_with_partner` (that's the opposite of the `union_by_name`).



##########
pyiceberg/catalog/__init__.py:
##########
@@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary(
 
         return properties_update_summary, updated_properties
 
+    def _replace_table(
+        self,
+        identifier: Union[str, Identifier],
+        new_schema: Union[Schema, "pa.Schema"],
+        new_partition_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_properties: Properties,
+        new_location: Optional[str] = None,
+    ) -> Table:
+        table = self.load_table(identifier)
+        with table.transaction() as tx:
+            base_schema = table.schema()
+            new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, 
base_schema=base_schema)
+            new_sort_order = assign_fresh_sort_order_ids(
+                sort_order=new_sort_order,
+                old_schema=base_schema,
+                fresh_schema=new_schema,
+                sort_order_id=table.sort_order().order_id + 1,
+            )
+            new_partition_spec = assign_fresh_partition_spec_ids(
+                spec=new_partition_spec, old_schema=base_schema, 
fresh_schema=new_schema, spec_id=table.spec().spec_id + 1
+            )
+
+            requirements: Tuple[TableRequirement, ...] = 
(AssertTableUUID(uuid=table.metadata.table_uuid),)
+            updates: Tuple[TableUpdate, ...] = (

Review Comment:
   We need to clear the snapshots here as well:
   ```sql
   CREATE TABLE default.t3 AS SELECT 'Fokko' as name
   ```
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
     "location" : "s3://warehouse/default/t3",
     "last-sequence-number" : 1,
     "last-updated-ms" : 1710409653861,
     "last-column-id" : 1,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ]
     } ],
     "last-partition-id" : 999,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "created-at" : "2024-03-14T09:47:10.455199504Z",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ {
       "sequence-number" : 1,
       "snapshot-id" : 3622792816294171432,
       "timestamp-ms" : 1710409631964,
       "summary" : {
         "operation" : "append",
         "spark.app.id" : "local-1710405058122",
         "added-data-files" : "1",
         "added-records" : "1",
         "added-files-size" : "416",
         "changed-partition-count" : "1",
         "total-records" : "1",
         "total-files-size" : "416",
         "total-data-files" : "1",
         "total-delete-files" : "0",
         "total-position-deletes" : "0",
         "total-equality-deletes" : "0"
       },
       "manifest-list" : 
"s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
       "schema-id" : 0
     } ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710409631964,
       "metadata-file" : 
"s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
     } ]
   }
   ```
   
   ```sql
   CREATE OR REPLACE TABLE default.t3 (name string);
   ```
   
   ```json
   {
     "format-version" : 2,
     "table-uuid" : "c61404c8-2211-46c7-866f-2eb87022b728",
     "location" : "s3://warehouse/default/t3",
     "last-sequence-number" : 1,
     "last-updated-ms" : 1710411760623,
     "last-column-id" : 1,
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "name",
         "required" : false,
         "type" : "string"
       } ]
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ ]
     } ],
     "last-partition-id" : 999,
     "default-sort-order-id" : 0,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     } ],
     "properties" : {
       "owner" : "root",
       "created-at" : "2024-03-14T09:47:10.455199504Z",
       "write.parquet.compression-codec" : "zstd"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ {
       "sequence-number" : 1,
       "snapshot-id" : 3622792816294171432,
       "timestamp-ms" : 1710409631964,
       "summary" : {
         "operation" : "append",
         "spark.app.id" : "local-1710405058122",
         "added-data-files" : "1",
         "added-records" : "1",
         "added-files-size" : "416",
         "changed-partition-count" : "1",
         "total-records" : "1",
         "total-files-size" : "416",
         "total-data-files" : "1",
         "total-delete-files" : "0",
         "total-position-deletes" : "0",
         "total-equality-deletes" : "0"
       },
       "manifest-list" : 
"s3://warehouse/default/t3/metadata/snap-3622792816294171432-1-e457c732-62e5-41eb-998b-abbb8f021ed5.avro",
       "schema-id" : 0
     } ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1710409631964,
       "metadata-file" : 
"s3://warehouse/default/t3/metadata/00000-c82191f3-e6e2-4001-8e85-8623e3915ff7.metadata.json"
     }, {
       "timestamp-ms" : 1710409653861,
       "metadata-file" : 
"s3://warehouse/default/t3/metadata/00001-0297d4e7-2468-4c0d-b4ed-ea717df8c3e6.metadata.json"
     } ]
   }
   ```
   



##########
pyiceberg/catalog/__init__.py:
##########
@@ -710,6 +760,45 @@ def _get_updated_props_and_update_summary(
 
         return properties_update_summary, updated_properties
 
+    def _replace_table(
+        self,
+        identifier: Union[str, Identifier],
+        new_schema: Union[Schema, "pa.Schema"],
+        new_partition_spec: PartitionSpec,
+        new_sort_order: SortOrder,
+        new_properties: Properties,
+        new_location: Optional[str] = None,
+    ) -> Table:
+        table = self.load_table(identifier)
+        with table.transaction() as tx:
+            base_schema = table.schema()
+            new_schema = assign_fresh_schema_ids(schema_or_type=new_schema, 
base_schema=base_schema)
+            new_sort_order = assign_fresh_sort_order_ids(
+                sort_order=new_sort_order,
+                old_schema=base_schema,
+                fresh_schema=new_schema,
+                sort_order_id=table.sort_order().order_id + 1,
+            )
+            new_partition_spec = assign_fresh_partition_spec_ids(
+                spec=new_partition_spec, old_schema=base_schema, 
fresh_schema=new_schema, spec_id=table.spec().spec_id + 1
+            )
+
+            requirements: Tuple[TableRequirement, ...] = 
(AssertTableUUID(uuid=table.metadata.table_uuid),)
+            updates: Tuple[TableUpdate, ...] = (
+                AddSchemaUpdate(schema=new_schema, 
last_column_id=new_schema.highest_field_id),
+                SetCurrentSchemaUpdate(schema_id=-1),
+                AddSortOrderUpdate(sort_order=new_sort_order),
+                SetDefaultSortOrderUpdate(sort_order_id=-1),
+                AddPartitionSpecUpdate(spec=new_partition_spec),
+                SetDefaultSpecUpdate(spec_id=-1),
+            )
+            tx._apply(updates, requirements)

Review Comment:
   With https://github.com/apache/iceberg-python/pull/471 merged, this is not 
necessary anymore! 🥳 
   ```suggestion
               tx._apply(updates, requirements)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to