[ https://issues.apache.org/jira/browse/SPARK-49933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-49933:
-----------------------------------
Labels: pull-request-available (was: )
> MERGE INTO does not push down the predicate of the WHEN NOT MATCHED BY SOURCE
> THEN DELETE clause (Iceberg)
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-49933
> URL: https://issues.apache.org/jira/browse/SPARK-49933
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.5.2
> Reporter: Viacheslav Inozemtsev
> Priority: Major
> Labels: pull-request-available
>
> h2. Description
> When writing to an Iceberg table with a MERGE INTO command that includes a
> WHEN NOT MATCHED BY SOURCE AND ... THEN DELETE clause, Spark performs a full
> scan of the Iceberg table, even though the table is well partitioned. Without
> this clause, there is no full table scan.
> I’m using Spark 3.5.2 and Iceberg 1.6.0.
> h2. An example
> The table schema is as follows:
> CREATE TABLE iceberg_table (
> part_col_1 STRING,
> part_col_2 STRING,
> part_col_3 TIMESTAMP,
> some_col_x INT,
> some_col_y STRING,
> main_value_col BIGINT
> )
> USING iceberg
> PARTITIONED BY (part_col_1, part_col_2, days(part_col_3))
>
> The MERGE INTO command is as follows:
> MERGE INTO iceberg_table t
> USING new_data_to_upsert s
> ON
> t.part_col_1 = 'some_part_value_a' AND
> t.part_col_2 = 'some_part_value_b' AND
> t.part_col_3 = s.part_col_3 AND
> t.some_col_x = s.some_col_x AND
> t.some_col_y = s.some_col_y
> WHEN MATCHED THEN UPDATE SET t.main_value_col = s.main_value_col
> WHEN NOT MATCHED BY TARGET THEN INSERT *
> WHEN NOT MATCHED BY SOURCE AND
> t.part_col_1 = 'some_part_value_a' AND
> t.part_col_2 = 'some_part_value_b' AND
> t.part_col_3 >= to_timestamp('2024-09-25 00:00:00') AND
> t.part_col_3 < to_timestamp('2024-09-25 04:00:00')
> THEN DELETE
>
> View {{new_data_to_upsert}} has the same columns as the table, and all of
> its records have:
> * {{part_col_1}} equal to {{some_part_value_a}},
> * {{part_col_2}} equal to {{some_part_value_b}},
> * {{part_col_3}} in the range {{[2024-09-25 00:00:00, 2024-09-25 04:00:00)}}.
> I also call {{.persist()}} on it before calling
> {{.createOrReplaceTempView("new_data_to_upsert")}}.
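As a rough illustration of why this layout should be pruning-friendly: Iceberg's {{days}} transform maps a timestamp to the number of days since 1970-01-01, so every source row described above lands in a single partition of {{iceberg_table}}. This is a minimal sketch assuming the timestamps are interpreted as UTC; the helpers {{days_transform}} and {{partition_key}} and the sample rows are hypothetical, not part of Spark or Iceberg.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def days_transform(ts: datetime) -> int:
    """Days since 1970-01-01 UTC (floored), modelled on Iceberg's days() transform."""
    return (ts - EPOCH).days

def partition_key(row: dict) -> tuple:
    """Hypothetical helper: partition tuple for
    PARTITIONED BY (part_col_1, part_col_2, days(part_col_3))."""
    return (row["part_col_1"], row["part_col_2"], days_transform(row["part_col_3"]))

# Illustrative source rows satisfying the constraints listed in the report (values assumed).
rows = [
    {"part_col_1": "some_part_value_a", "part_col_2": "some_part_value_b",
     "part_col_3": datetime(2024, 9, 25, 0, 0, 0, tzinfo=timezone.utc)},
    {"part_col_1": "some_part_value_a", "part_col_2": "some_part_value_b",
     "part_col_3": datetime(2024, 9, 25, 3, 59, 59, tzinfo=timezone.utc)},
]

# All rows collapse to one partition tuple.
partitions = {partition_key(r) for r in rows}
print(partitions)  # {('some_part_value_a', 'some_part_value_b', 19991)}
```

Day 19991 is 2024-09-25 in UTC, so a correctly pushed-down predicate would, in principle, confine the read to one day partition per ({{part_col_1}}, {{part_col_2}}) pair.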
> For the {{iceberg_table}} scan, the query plan shows the following:
> (1) BatchScan iceberg_table
> Output [8]: [some_col_x#511, part_col_3#512, part_col_1#513, part_col_2#514,
> some_col_y#515, main_value_col#516L, _file#520]
> iceberg_table (branch=null) [filters=true, groupedBy=]
> As far as I understand, {{filters=true}} is the problem: no predicate is
> pushed down.
> When I run a regular SELECT or DELETE, the plan shows something like:
> filters=[part_col_1 = 'some_part_value_a', part_col_2 = 'some_part_value_b',
> part_col_3 >= 1727222400000, part_col_3 < 1727236800000]
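The numeric bounds in that filter line up with the {{to_timestamp}} literals in the DELETE condition if they are read as milliseconds since the Unix epoch in UTC. The session time zone being UTC is an assumption, since the report does not state it; {{to_epoch_millis}} is a hypothetical helper for this check.

```python
from datetime import datetime, timezone

def to_epoch_millis(text: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as UTC and return milliseconds since the Unix epoch."""
    dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000

lo = to_epoch_millis("2024-09-25 00:00:00")
hi = to_epoch_millis("2024-09-25 04:00:00")
print(lo, hi)                   # 1727222400000 1727236800000
print((hi - lo) // 3_600_000)   # 4 (hours)
```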
>
> If I remove the {{WHEN NOT MATCHED BY SOURCE AND … THEN DELETE}} clause
> from the query, the plan changes and the BatchScan steps on the Iceberg
> table do have {{filters}} specified. No matter how I have varied the
> predicate in this clause, it has never been pushed down.
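Why a pushdown should be safe here can be checked on a toy model. The sketch below is a hypothetical in-memory model of the MERGE above, not Spark's or Iceberg's implementation. A target row that fails the target-only conjuncts of the ON clause can never match a source row; if it also fails the NOT MATCHED BY SOURCE condition, it is kept unchanged. Scanning only rows that satisfy (target-only ON conjuncts) OR (DELETE condition) should therefore give the same result as a full scan. The row values and helper names are illustrative assumptions.

```python
from collections import namedtuple
from datetime import datetime

# Row layout follows the iceberg_table schema from the report.
Row = namedtuple("Row", "part_col_1 part_col_2 part_col_3 some_col_x some_col_y main_value_col")

A, B = "some_part_value_a", "some_part_value_b"
LO, HI = datetime(2024, 9, 25, 0, 0), datetime(2024, 9, 25, 4, 0)

def on_target_only(t):
    # Conjuncts of the ON clause that reference only the target table.
    return t.part_col_1 == A and t.part_col_2 == B

def on(t, s):
    return (on_target_only(t) and t.part_col_3 == s.part_col_3
            and t.some_col_x == s.some_col_x and t.some_col_y == s.some_col_y)

def nmbs_cond(t):
    # Condition of WHEN NOT MATCHED BY SOURCE AND ... THEN DELETE.
    return t.part_col_1 == A and t.part_col_2 == B and LO <= t.part_col_3 < HI

def merge(target, source):
    """Toy MERGE; assumes each target row matches at most one source row."""
    out, matched = [], set()
    for t in target:
        hits = [i for i, s in enumerate(source) if on(t, s)]
        if hits:                      # WHEN MATCHED THEN UPDATE
            matched.update(hits)
            out.append(t._replace(main_value_col=source[hits[0]].main_value_col))
        elif not nmbs_cond(t):        # unmatched and not deleted: kept as-is
            out.append(t)
    # WHEN NOT MATCHED BY TARGET THEN INSERT *
    out.extend(s for i, s in enumerate(source) if i not in matched)
    return out

def pushed_filter(t):
    # Candidate scan filter: rows the ON clause or the DELETE condition could affect.
    return on_target_only(t) or nmbs_cond(t)

def merge_with_pushdown(target, source):
    scanned = [t for t in target if pushed_filter(t)]
    untouched = [t for t in target if not pushed_filter(t)]
    return merge(scanned, source) + untouched

target = [
    Row(A, B, datetime(2024, 9, 25, 1, 0), 1, "k", 10),        # matched -> updated
    Row(A, B, datetime(2024, 9, 25, 2, 0), 2, "k", 20),        # unmatched, in window -> deleted
    Row(A, B, datetime(2024, 9, 24, 2, 0), 3, "k", 30),        # unmatched, outside window -> kept
    Row("other", B, datetime(2024, 9, 25, 1, 0), 1, "k", 40),  # other partition -> kept
]
source = [
    Row(A, B, datetime(2024, 9, 25, 1, 0), 1, "k", 11),  # matches the first target row
    Row(A, B, datetime(2024, 9, 25, 3, 0), 4, "k", 44),  # no match -> inserted
]

full = merge(target, source)
pruned = merge_with_pushdown(target, source)
assert sorted(full) == sorted(pruned)
print(len(full), "rows after MERGE")  # 4 rows after MERGE
```

In this query the DELETE condition implies the target-only ON conjuncts, so the derived filter reduces to {{part_col_1}} and {{part_col_2}} equality; the ON clause itself does not bound {{part_col_3}} by literals.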
>
> I've created an issue in the Iceberg GitHub repo as well, but nobody has
> responded so far: https://github.com/apache/iceberg/issues/11248
--
This message was sent by Atlassian Jira
(v8.20.10#820010)