AndrejIring commented on issue #3105:
URL:
https://github.com/apache/iceberg-python/issues/3105#issuecomment-4351068631
The same issue occurs with `add_column`. It stems from `get_rows_to_update`
in `upsert_util`, where the source data is cast to the underlying table
schema. The cast fails because the schemas differ: a `pyarrow` cast does not
add, delete, or reorder columns.
This makes `upsert` unusable for tables whose schema has been updated.
```python
from pyiceberg.catalog import load_catalog
import polars as pl
from pyiceberg.types import StringType
catalog = load_catalog("default", **{"type": "in-memory"})
df = pl.DataFrame(
    [
        {"id": 1, "name": "Alice", "age": 30, "city": "São Paulo"},
        {"id": 2, "name": "Bob", "age": 25, "city": "Rio de Janeiro"},
        {"id": 3, "name": "Carol", "age": 35, "city": "Belo Horizonte"},
        {"id": 4, "name": "David", "age": 28, "city": "Curitiba"},
    ]
)
arrow = df.to_arrow()
catalog.create_namespace_if_not_exists("default")
catalog.create_table_if_not_exists("default.my_table", arrow.schema)
table = catalog.load_table("default.my_table")
table.append(arrow)
# Add a new column
arrow = df.with_columns(ping=pl.lit("pong")).to_arrow()
# Update schema to include the new column
with table.update_schema() as update_schema:
    update_schema.add_column("ping", StringType())
table = table.refresh()
print(table.schema())
try:
    # This fails with ValueError
    table.upsert(arrow, ["id"])
finally:
    catalog.drop_table("default.my_table")
```
I hope to open a PR today to resolve this. Basically, these lines
https://github.com/apache/iceberg-python/blob/b67b7241690e05413f3cfea69287f897b0b2410e/pyiceberg/table/upsert_util.py#L91-L95
should be changed to something like this:
``` python
join_cols_schema = pa.schema(
    [target_table.schema.field(name) for name in join_cols_set]
)
source_index = (
    source_table.select(join_cols_set)
    .cast(join_cols_schema)
    .append_column(SOURCE_INDEX_COLUMN_NAME, pa.array(range(len(source_table))))
)
```
A workaround that currently works is to call `table.overwrite` after the
schema evolution, to update the underlying data.
Example of the workaround:
``` python
from pyiceberg.catalog import load_catalog
import polars as pl
from pyiceberg.types import StringType
catalog = load_catalog("default", **{"type": "in-memory"})
df = pl.DataFrame(
    [
        {"id": 1, "name": "Alice", "age": 30, "city": "São Paulo"},
        {"id": 2, "name": "Bob", "age": 25, "city": "Rio de Janeiro"},
        {"id": 3, "name": "Carol", "age": 35, "city": "Belo Horizonte"},
        {"id": 4, "name": "David", "age": 28, "city": "Curitiba"},
    ]
)
arrow = df.to_arrow()
catalog.create_namespace_if_not_exists("default")
catalog.create_table_if_not_exists("default.my_table", arrow.schema)
table = catalog.load_table("default.my_table")
table.append(arrow)
# Update schema to include the new column
with table.update_schema() as update_schema:
    update_schema.add_column("ping", StringType())
table = table.refresh()
# Overwrite the existing data with a null-filled "ping" column
arrow = df.with_columns(ping=pl.lit(None, dtype=pl.String)).to_arrow()
table.overwrite(arrow)
print(table.scan().to_polars())
try:
    arrow = df.with_columns(ping=pl.lit("pong")).to_arrow()
    # No longer fails
    table.upsert(arrow, ["id"])
    # Read the result before the table is dropped
    print(table.scan().to_polars())
finally:
    catalog.drop_table("default.my_table")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]