tmnd1991 commented on PR #9233:
URL: https://github.com/apache/iceberg/pull/9233#issuecomment-1847179494
Sure, let me add a bit of context:
I have two tables with the exact same schema/layout, partitioned on 3 columns:
- identity(MEAS_YM)
- identity(MEAS_DD)
- bucket(POD, 4)
The source table (the small one) contains strictly a subset of the partitions of the
target table (the big one).
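To make the layout concrete, here is a rough DDL sketch (all types and the non-partition columns are placeholders I made up; only the partition spec matches the real tables):
```
-- Hypothetical DDL, just to picture the partition layout described above;
-- the real tables have ~60 columns, and all names/types here are
-- placeholders except the partition columns.
CREATE TABLE target (
  MEAS_YM   STRING,
  MEAS_DD   STRING,
  POD       STRING,
  MAGNITUDE DOUBLE
  -- ... remaining measure columns
) USING iceberg
PARTITIONED BY (MEAS_YM, MEAS_DD, bucket(4, POD));

-- `source` is created with the exact same DDL (same schema, same
-- partition spec), but holds only a strict subset of the partitions.
```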
In this example I will talk about a local reproducer, but keep in mind that in production
we are talking about a 65 TB table with 400k partitions, so every 1% improvement
actually means a lot.
I started running a merge statement as follows, taking advantage of SPJ:
```
MERGE INTO target USING (SELECT * FROM source)
ON target.MEAS_YM = source.MEAS_YM AND target.MEAS_DD = source.MEAS_DD AND
target.POD = source.POD
WHEN MATCHED THEN UPDATE SET ...
```
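For completeness, SPJ only kicks in with the v2 bucketing settings enabled; the sketch below lists the session configs that are generally needed for it (the exact set depends on the Spark/Iceberg versions, so take it as an assumption about my setup rather than a verbatim copy of it):
```
-- Session settings generally required for storage-partitioned joins; the
-- names come from the Spark SQL / Iceberg docs, and the exact list depends
-- on the versions in use.
SET spark.sql.sources.v2.bucketing.enabled=true;
SET spark.sql.sources.v2.bucketing.pushPartValues.enabled=true;
SET spark.sql.requireAllClusterKeysForCoPartition=false;
SET spark.sql.iceberg.planning.preserve-data-grouping=true;
```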
This results in the following physical plan:
```
== Physical Plan ==
ReplaceData (13)
+- * Sort (12)
   +- * Project (11)
      +- MergeRows (10)
         +- SortMergeJoin FullOuter (9)
            :- * Sort (4)
            :  +- * Project (3)
            :     +- * ColumnarToRow (2)
            :        +- BatchScan target (1)
            +- * Sort (8)
               +- * Project (7)
                  +- * ColumnarToRow (6)
                     +- BatchScan source (5)

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = _file#2274 IN subquery#2672
* HashAggregate (26)
+- Exchange (25)
   +- * HashAggregate (24)
      +- * Project (23)
         +- * SortMergeJoin LeftSemi (22)
            :- * Sort (17)
            :  +- * Filter (16)
            :     +- * ColumnarToRow (15)
            :        +- BatchScan target (14)
            +- * Sort (21)
               +- * Filter (20)
                  +- * ColumnarToRow (19)
                     +- BatchScan source (18)
```
with the scans being:
```
(1) BatchScan target
Output [60]: [..., _file#2274]
target (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD
IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD
IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
```
This was creating 33 tasks (+10 to exchange the file names) for the subquery
and 33 tasks for the second (full outer) join.
In practice I know for sure that I hit only 25 partitions, not 33: some files
were still read even though we know upfront that they are not needed, and the
`_file IN (subquery)` filter can't prune any file at planning time because it's
dynamic. On top of that, I observed that even when files should have been
excluded by Spark's post-scan filter, the execution of those tasks was still not
as fast as I expected (i.e. close to 0 ms).
Therefore, knowing exactly which partitions I hit beforehand, I tried to
help Iceberg/Spark a little by enumerating the partition values that are actually
hit:
```
MERGE INTO target USING (SELECT * FROM source)
ON target.`POD` = source.`POD` AND target.`MEAS_YM` = source.`MEAS_YM` AND
target.`MEAS_DD` = source.`MEAS_DD` AND (
(target.`meas_ym` = '202306' AND target.`meas_dd` = '02' AND
system.bucket(4, target.`pod`) IN (0,2,3)) OR
(target.`meas_ym` = '202306' AND target.`meas_dd` = '01') OR
(target.`meas_ym` = '202307' AND target.`meas_dd` = '02' AND
system.bucket(4, target.`pod`) IN (1,3)) OR
(target.`meas_ym` = '202306' AND target.`meas_dd` = '03') OR
(target.`meas_ym` = '202308' AND target.`meas_dd` = '01' AND
system.bucket(4, target.`pod`) IN (0,1,2)) OR
(target.`meas_ym` = '202307' AND target.`meas_dd` = '03' AND
system.bucket(4, target.`pod`) IN (0,1,2)) OR
(target.`meas_ym` = '202308' AND target.`meas_dd` = '03' AND
system.bucket(4, target.`pod`) IN (0,3)) OR
(target.`meas_ym` = '202307' AND target.`meas_dd` = '01' AND
system.bucket(4, target.`pod`) IN (0,1,2)) OR
(target.`meas_ym` = '202308' AND target.`meas_dd` = '02' AND
system.bucket(4, target.`pod`) IN (3)))
WHEN MATCHED THEN UPDATE SET ...
```
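(As a side note, for anyone trying to reproduce this: the partition tuples present in the small table can be listed through Iceberg's `partitions` metadata table, which is one way to obtain values like the ones enumerated above. The `POD_bucket` field name below is an assumption based on the default transform naming, consistent with the `groupedBy` shown in the scans.)
```
-- List the partition tuples actually present in the small table; POD_bucket
-- is the default field name for the bucket(4, POD) transform. Depending on
-- the catalog setup, the table may need to be fully qualified,
-- e.g. catalog.db.source.partitions.
SELECT partition.MEAS_YM,
       partition.MEAS_DD,
       partition.POD_bucket
FROM source.partitions;
```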
To my surprise the plan was exactly the same...
Then I fixed this issue and also #9191 locally (by adding an optimiser rule to my
Spark session), and the scans actually changed:
```
(1) BatchScan target
Output [60]: [..., _file#2279]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02'
AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307'
AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND
MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD)
IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD)
IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR
(((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR
((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))),
groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(5) BatchScan source
Output [60]: [...]
source (branch=null) [filters=, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(14) BatchScan target
Output [8]: [..., _file#2590]
target (branch=null) [filters=((((MEAS_YM = '202306' AND ((MEAS_DD = '02'
AND bucket[4](POD) IN (0, 2, 3)) OR MEAS_DD = '01')) OR ((MEAS_YM = '202307'
AND MEAS_DD = '02') AND bucket[4](POD) IN (1, 3))) OR ((MEAS_YM = '202306' AND
MEAS_DD = '03') OR ((MEAS_YM = '202308' AND MEAS_DD = '01') AND bucket[4](POD)
IN (0, 1, 2)))) OR ((MEAS_DD = '03' AND ((MEAS_YM = '202307' AND bucket[4](POD)
IN (0, 1, 2)) OR (MEAS_YM = '202308' AND bucket[4](POD) IN (0, 3)))) OR
(((MEAS_YM = '202307' AND MEAS_DD = '01') AND bucket[4](POD) IN (0, 1, 2)) OR
((MEAS_YM = '202308' AND MEAS_DD = '02') AND bucket[4](POD) = 3)))), POD IS NOT
NULL, MEAS_YM IS NOT NULL, MEAS_DD IS NOT NULL, MAGNITUDE IS NOT NULL,
METER_KEY IS NOT NULL, REC_ID IS NOT NULL, COLLECT_ID IS NOT NULL,
groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
(18) BatchScan source
Output [7]: [...]
source (branch=null) [filters=POD IS NOT NULL, MEAS_YM IS NOT NULL, MEAS_DD
IS NOT NULL, MAGNITUDE IS NOT NULL, METER_KEY IS NOT NULL, REC_ID IS NOT NULL,
COLLECT_ID IS NOT NULL, groupedBy=MEAS_YM, MEAS_DD, POD_bucket]
```
With this plan I obtain 25 (+10 for the shuffle) + 25 tasks, actually hitting
only the minimum number of partitions.
----
Given the context, I think I have highlighted 2 "bugs":
1. the full-outer join condition can also be used to prune partitions
(fixed in this PR)
2. for some reason Spark is not able to correctly detect the minimum subset
of hit partitions (maybe I can work on another PR for this, but I guess it's
much harder and probably belongs to the Spark codebase)