[ 
https://issues.apache.org/jira/browse/SPARK-49933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-49933:
-----------------------------------
    Labels: pull-request-available  (was: )

> MERGE INTO does not push down the predicate of the WHEN NOT MATCHED BY SOURCE 
> THEN DELETE clause (Iceberg)
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-49933
>                 URL: https://issues.apache.org/jira/browse/SPARK-49933
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.5.2
>            Reporter: Viacheslav Inozemtsev
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Description
> When writing to an Iceberg table with a MERGE INTO command that includes a 
> WHEN NOT MATCHED BY SOURCE AND ... THEN DELETE clause, Spark performs a full 
> scan of the Iceberg table, even though the table is well partitioned. Without 
> this clause, there is no full table scan.
> I'm using Spark 3.5.2 and Iceberg 1.6.0.
> h2. An example
> The table schema is as follows:
> CREATE TABLE iceberg_table (
>   part_col_1 STRING,
>   part_col_2 STRING,
>   part_col_3 TIMESTAMP,
>   some_col_x INT,
>   some_col_y STRING,
>   main_value_col BIGINT
> )
> USING iceberg
> PARTITIONED BY (part_col_1, part_col_2, days(part_col_3))
>  
> The MERGE INTO command is as follows:
> MERGE INTO iceberg_table t
> USING new_data_to_upsert s
> ON
> t.part_col_1 = 'some_part_value_a' AND
> t.part_col_2 = 'some_part_value_b' AND
> t.part_col_3 = s.part_col_3 AND
> t.some_col_x = s.some_col_x AND
> t.some_col_y = s.some_col_y
> WHEN MATCHED THEN UPDATE SET t.main_value_col = s.main_value_col
> WHEN NOT MATCHED BY TARGET THEN INSERT *
> WHEN NOT MATCHED BY SOURCE AND
> t.part_col_1 = 'some_part_value_a' AND
> t.part_col_2 = 'some_part_value_b' AND
> t.part_col_3 >= to_timestamp('2024-09-25 00:00:00') AND
> t.part_col_3 < to_timestamp('2024-09-25 04:00:00')
> THEN DELETE
>  
> The view {{new_data_to_upsert}} has the same columns as the table, and all 
> of its records have
>  * {{part_col_1}} equal to "{{some_part_value_a}}",
>  * {{part_col_2}} equal to "{{some_part_value_b}}",
>  * {{part_col_3}} in the range {{[2024-09-25 00:00:00; 2024-09-25 04:00:00)}}.
> I also call {{.persist()}} on it before calling 
> {{.createOrReplaceTempView("new_data_to_upsert")}}.
> The query plan shows the following for the {{iceberg_table}} scan:
> (1) BatchScan iceberg_table
> Output [8]: [some_col_x#511, part_col_3#512, part_col_1#513, part_col_2#514, 
> some_col_y#515, main_value_col#516L, _file#520]
> iceberg_table (branch=null) [filters=true, groupedBy=]
> As far as I understand, {{filters=true}} is the problem: it means that no 
> predicate is pushed down.
> When I try a regular SELECT or DELETE, it shows something like:
> filters=[part_col_1 = 'some_part_value_a', part_col_2 = 'some_part_value_b', 
> part_col_3 >= 1727222400000, part_col_3 < 1727236800000]
>  
> If I remove the {{WHEN NOT MATCHED BY SOURCE AND … THEN DELETE}} clause 
> from the query, the plan is different and the BatchScan steps on the Iceberg 
> table do have {{filters}} specified. No matter what I've tried with the 
> predicate in this clause, it has never resulted in predicate push-down.
>  
> I've created an issue in the Iceberg GitHub repo as well, but nobody has 
> responded so far: https://github.com/apache/iceberg/issues/11248

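As a side note on the pushed-down filters quoted above for the regular SELECT or DELETE: the two numeric literals appear to be the bounds of the WHEN NOT MATCHED BY SOURCE predicate expressed as epoch values. The following minimal sketch checks this with the Python standard library only. It assumes the session time zone is UTC and that the values are shown in milliseconds, as they appear in the report; the helper name to_epoch_millis is hypothetical and not part of Spark or Iceberg.

```python
from datetime import datetime, timezone


def to_epoch_millis(ts: str) -> int:
    """Convert 'YYYY-MM-DD HH:MM:SS', interpreted as UTC (an assumption),
    to milliseconds since the Unix epoch."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000


# Bounds taken from the WHEN NOT MATCHED BY SOURCE predicate in the report.
lower = to_epoch_millis("2024-09-25 00:00:00")
upper = to_epoch_millis("2024-09-25 04:00:00")

# Print both bounds so they can be compared with the literals in the plan.
print(lower, upper)
```

Under these assumptions the two bounds match the literals in the filters line, which suggests that the same predicate can be pushed down when it is used outside MERGE INTO.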

