arunb2w opened a new issue, #6453:
URL: https://github.com/apache/iceberg/issues/6453
### Apache Iceberg version
1.1.0 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
I am working with a table that is around 200 GB (2B rows) in size and contains
more than 100 columns.
My incoming increment contains 6M events, which I am trying to merge using
the MERGE statement. However, the merge took around 30 minutes on a 10-node
cluster of r5.16xlarge instances.
So, I tried a delete-then-append approach with **'write.delete.mode'='merge-on-read'**
to see how it performs, but I am seeing a discrepancy.
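For reference, 'write.delete.mode' is an Iceberg table property; a minimal sketch of how it can be set from Spark SQL (ALTER TABLE ... SET TBLPROPERTIES is the standard way to set Iceberg table properties, and the table name is the one used throughout this issue):
```
spark.sql("""
    ALTER TABLE glue_dev.datalakectxsort.clinicalprescription
    SET TBLPROPERTIES ('write.delete.mode' = 'merge-on-read')
""")
```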
Below are the steps I am doing:
1. Creating a temp view for the incoming events.
```
input_df.createOrReplaceTempView("source")  # expose the increment as the "source" view
input_df.cache()  # cache() is lazy; nothing is materialized until an action runs
```
2. Getting the full-row representation to append after the delete (see the sketch after this query).
```
inflate_sql = "select if(array_contains(source.changed_cols, 'ID'),
source.ID, target.ID) as ID, if(array_contains(source.changed_cols,
'_ETL_RUN_ID_'), source._ETL_RUN_ID_, target._ETL_RUN_ID_) as
_ETL_RUN_ID_,if(array_contains(source.changed_cols, '_ETL_MODIFIED_'),
source._ETL_MODIFIED_, target._ETL_MODIFIED_) as
_ETL_MODIFIED_,if(array_contains(deflate.changed_cols, '_EXTRACTED_'),
deflate._EXTRACTED_, target._EXTRACTED_) as
_EXTRACTED_,if(array_contains(source.changed_cols, '_SOURCE_EXTRACTED_'),
source._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as
_SOURCE_EXTRACTED_,if(array_contains(source.changed_cols, '_IS_DELETED_'),
source._IS_DELETED_, target._IS_DELETED_) as
_IS_DELETED_,if(array_contains(source.changed_cols, '_LAST_MODIFIED_SEQ_'),
source._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as
_LAST_MODIFIED_SEQ_,if(array_contains(source.changed_cols, '_SCHEMA_CLASS_'),
source._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as
_SCHEMA_CLASS_,if(array_contains(source.changed_c
ols, '_CONTEXT_ID_'), source._CONTEXT_ID_, target._CONTEXT_ID_) as
_CONTEXT_ID_ from glue_dev.datalakectxsort.clinicalprescription target join
source on target._context_id_ = cast(source._context_id_ as decimal(12, 0)) and
target.id = cast(source.id as decimal(12, 0))"
inflated_df = spark.sql(inflate_sql)
```
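As an aside, a projection like the one above could also be generated from the target schema instead of being hand-written for 100+ columns; a rough sketch, assuming only what is already shown (the `source` view with its `changed_cols` array and the two join keys):
```
# Sketch: build the per-column if(array_contains(...)) projection from the target schema.
target_name = "glue_dev.datalakectxsort.clinicalprescription"
target_cols = spark.table(target_name).columns

select_exprs = ",\n  ".join(
    f"if(array_contains(source.changed_cols, '{c}'), source.{c}, target.{c}) as {c}"
    for c in target_cols
)

inflate_sql = f"""
select
  {select_exprs}
from {target_name} target
join source
  on target._context_id_ = cast(source._context_id_ as decimal(12, 0))
  and target.id = cast(source.id as decimal(12, 0))
"""
inflated_df = spark.sql(inflate_sql)
```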
3. Deleting from the target
```
delete_sql = "delete from glue_dev.datalakectxsort.clinicalprescription as
target
where exists(select 1 from deflate_table source where
target._context_id_ = source._context_id_ and target.id = source.id)"
spark.sql(delete_sql)
```
4. Appending the deleted records based on the full-row DataFrame (inflated_df) prepared earlier.
`inflated_df.writeTo("glue_dev.datalakectxsort.clinicalprescription").append()`
The problem is that the deleted rows are not getting re-inserted, even though I
prepared the full-row DataFrame (inflated_df) before executing the delete command.
I believe the reason is Spark's lazy execution: the full-row query is only
evaluated during the append call, i.e. after the delete has already removed the
matching rows from the target.
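If lazy evaluation is indeed the cause, one thing that could be tried (a sketch only, assuming it is acceptable to cache a join result of this size) is to force the full-row DataFrame to be materialized before the delete runs:
```
# Sketch: materialize the full-row join result before the delete,
# so the append does not re-read the target after rows were removed.
from pyspark import StorageLevel

inflated_df = spark.sql(inflate_sql).persist(StorageLevel.MEMORY_AND_DISK)
inflated_df.count()  # action that actually computes and caches the join

spark.sql(delete_sql)
inflated_df.writeTo("glue_dev.datalakectxsort.clinicalprescription").append()
```
Note that caching is best-effort: if cached partitions are evicted they would be recomputed against the post-delete table, so writing the intermediate result to a staging table first is the safer variant of this idea.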
To overcome this, I also tried a time-travel query to prepare the full rows by
pointing at a version from before the delete using the TIMESTAMP AS OF keyword;
the target in the SELECT query above basically becomes `(select * from
glue_dev.datalakectxsort.clinicalprescription timestamp as of '2022-12-19
06:13:02') target`, but this causes the error below.
`Cannot write to table at a specific snapshot: 7305533148640597772`
I am using the snapshot only in the SELECT query while preparing the full rows
for the append; the append itself writes to the target table, not to a snapshot,
so I am not sure why it throws this error.
Please advise on how to overcome this. Thanks in advance!
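For completeness, one possible workaround (a sketch only; `snapshot-id` and `as-of-timestamp` are Iceberg's documented Spark read options for time travel, and the values below are placeholders) would be to do the time travel through the DataFrame reader instead of TIMESTAMP AS OF in the SQL text, and then append to the plain table name:
```
# Sketch: read the pre-delete state via Iceberg's DataFrame read options,
# then write the result back with a normal append to the base table.
old_snapshot_id = 7305533148640597772  # placeholder: snapshot taken before the delete

target_before = (
    spark.read
         .option("snapshot-id", old_snapshot_id)  # or .option("as-of-timestamp", "<epoch millis>")
         .table("glue_dev.datalakectxsort.clinicalprescription")
)
target_before.createOrReplaceTempView("target_snapshot")

# Build the same full-row query against target_snapshot instead of the live
# table (inflate_sql_snapshot below is a hypothetical variant of inflate_sql),
# then append to the table itself, with no snapshot reference in the write path.
inflated_df = spark.sql(inflate_sql_snapshot)
inflated_df.writeTo("glue_dev.datalakectxsort.clinicalprescription").append()
```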