mgmarino opened a new issue, #9678:
URL: https://github.com/apache/iceberg/issues/9678
### Query engine
AWS Athena, verified on Spark/EMR as well (3.4.1, Iceberg 1.4.3)
### Question
We are currently looking to migrate several of our old Hive tables to
Iceberg. Our tables were partitioned like:
`year=../month=../day=..`
and we typically did things like the following:
```sql
WHERE
  year || month || day BETWEEN '20230104' AND '20230110'
```
to select particular partitions in both Spark and Trino/Athena. It seems that this
kind of expression in the `WHERE` predicate does not work as expected in Iceberg
(i.e., it does not prune partitions), and I wanted to know whether this is a known
limitation. (Apologies, I couldn't find this anywhere in the docs!)
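For context, the migrated table keeps the old Hive layout as identity partitions on string columns, roughly like this (illustrative DDL for the Trino Iceberg connector; the real table has many more columns):
```sql
-- Illustrative only: rough shape of the migrated table's partition spec.
-- Data columns are omitted; year/month/day are varchar identity partitions
-- carried over from the Hive year=../month=../day=.. layout.
CREATE TABLE prod.table_iceberg (
  year  varchar,
  month varchar,
  day   varchar
  -- plus the actual data columns
)
WITH (
  format = 'PARQUET',
  partitioning = ARRAY['year', 'month', 'day']
)
```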
An example from Athena/Trino:
```sql
EXPLAIN ANALYZE
SELECT
COUNT(*) as num_rows
FROM prod.table_iceberg
WHERE year = '2024'
AND month = '1'
```
outputs:
```
Query Plan
Queued: 544.36us, Analysis: 502.40ms, Planning: 190.60ms, Execution: 8.55s
Fragment 1 [SINGLE]
    CPU: 411.84ms, Scheduled: 1.43s, Blocked 33.56s (Input: 26.45s, Output: 0.00ns), Input: 13972 rows (122.80kB), Data Scanned: 0B; per task: avg.: 13972.00 std.dev.: 0.00, Output: 1 row (9B)
    Output layout: [count]
    Output partitioning: SINGLE []
    Aggregate[type = FINAL]
    │   Layout: [count:bigint]
    │   Estimates: {rows: 1 (9B), cpu: 9, memory: 9B, network: 0B}
    │   CPU: 67.00ms (0.01%), Scheduled: 134.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
    │   Input avg.: 13972.00 rows, Input std.dev.: 0.00%
    │   count := count("count_0")
    └─ LocalExchange[partitioning = SINGLE]
       │   Layout: [count_0:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 0, memory: 0B, network: 0B}
       │   CPU: 114.00ms (0.01%), Scheduled: 265.00ms (0.01%), Blocked: 7.15s (21.28%), Output: 13972 rows (122.80kB)
       │   Input avg.: 3493.00 rows, Input std.dev.: 74.24%
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [count_0:bigint]
              CPU: 187.00ms (0.02%), Scheduled: 956.00ms (0.03%), Blocked: 26.45s (78.72%), Output: 13972 rows (122.80kB)
              Input avg.: 3493.00 rows, Input std.dev.: 74.24%

Fragment 2 [SOURCE]
    CPU: 12.95m, Scheduled: 47.38m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 62896440400 rows (0B), Data Scanned: 0B; per task: avg.: 2096548013.33 std.dev.: 877588981.69, Output: 13972 rows (122.80kB)
    Output layout: [count_0]
    Output partitioning: SINGLE []
    Aggregate[type = PARTIAL]
    │   Layout: [count_0:bigint]
    │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
    │   CPU: 5.30m (40.94%), Scheduled: 15.36m (32.41%), Blocked: 0.00ns (0.00%), Output: 13972 rows (122.80kB)
    │   Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
    │   count_0 := count(*)
    └─ TableScan[table = awsdatacatalog$iceberg-aws:prod.table_iceberg$data@4799610969623733758 constraint on [year, month]]
           Layout: []
           Estimates: {rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 7.64m (59.01%), Scheduled: 32.00m (67.54%), Blocked: 0.00ns (0.00%), Output: 62896440400 rows (0B)
           Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
           181:year:varchar
               :: [[2024]]
           182:month:varchar
               :: [[1]]
           Input: 62896440400 rows (0B), Physical input time: 527230.00ms
```
where the output seems to indicate that this is a metadata-only read (0B of data
scanned). It is fairly fast (~9 s).
The same query using concatenation looks like:
```sql
EXPLAIN ANALYZE
SELECT COUNT(*) as num_rows
FROM prod.table_iceberg
WHERE year || lpad(month, 2, '0') = '202401'
```
and results in:
```
Query Plan
Queued: 270.97us, Analysis: 920.95ms, Planning: 465.63ms, Execution: 1.21m
Fragment 1 [SINGLE]
    CPU: 466.36ms, Scheduled: 4.98s, Blocked 3.22m (Input: 2.30m, Output: 0.00ns), Input: 13972 rows (122.80kB), Data Scanned: 0B; per task: avg.: 13972.00 std.dev.: 0.00, Output: 1 row (9B)
    Output layout: [count]
    Output partitioning: SINGLE []
    Aggregate[type = FINAL]
    │   Layout: [count:bigint]
    │   Estimates: {rows: 1 (9B), cpu: 9, memory: 9B, network: 0B}
    │   CPU: 64.00ms (0.00%), Scheduled: 145.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
    │   Input avg.: 13972.00 rows, Input std.dev.: 0.00%
    │   count := count("count_0")
    └─ LocalExchange[partitioning = SINGLE]
       │   Layout: [count_0:bigint]
       │   Estimates: {rows: 1 (9B), cpu: 0, memory: 0B, network: 0B}
       │   CPU: 136.00ms (0.00%), Scheduled: 3.47s (0.00%), Blocked: 57.18s (29.30%), Output: 13972 rows (122.80kB)
       │   Input avg.: 3493.00 rows, Input std.dev.: 116.81%
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [count_0:bigint]
              CPU: 186.00ms (0.00%), Scheduled: 1.03s (0.00%), Blocked: 2.30m (70.70%), Output: 13972 rows (122.80kB)
              Input avg.: 3493.00 rows, Input std.dev.: 116.81%

Fragment 2 [SOURCE]
    CPU: 7.92h, Scheduled: 23.33h, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 62896440400 rows (111.12MB), Data Scanned: 0B; per task: avg.: 2096548013.33 std.dev.: 436430202.74, Output: 13972 rows (122.80kB)
    Output layout: [count_0]
    Output partitioning: SINGLE []
    Aggregate[type = PARTIAL]
    │   Layout: [count_0:bigint]
    │   Estimates: {rows: 1 (9B), cpu: 0, memory: 9B, network: 0B}
    │   CPU: 1.34m (0.28%), Scheduled: 5.13m (0.37%), Blocked: 0.00ns (0.00%), Output: 13972 rows (122.80kB)
    │   Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
    │   count_0 := count(*)
    └─ ScanFilterProject[table = awsdatacatalog$iceberg-aws:prod.table_iceberg$data@4799610969623733758, filterPredicate = (concat("year", lpad("month", BIGINT '2', '0')) = VARCHAR '202401'), projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: []
           Estimates: {rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: ?, memory: 0B, network: 0B}/{rows: ? (0B), cpu: 0, memory: 0B, network: 0B}
           CPU: 7.90h (99.72%), Scheduled: 23.25h (99.63%), Blocked: 0.00ns (0.00%), Output: 62896440400 rows (0B)
           Input avg.: 4501606.10 rows, Input std.dev.: 103.44%
           month := 182:month:varchar
           year := 181:year:varchar
           Input: 62896440400 rows (111.12MB), Filtered: 0.00%, Physical input time: 973630.00ms
```
which, by contrast, appears to actually scan the data and takes much longer (~70 s).
We see similar behavior in Spark, which is actually where we originally came
across this while testing the migration.
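Roughly what we ran to reproduce this in Spark SQL (the catalog name here is just a placeholder for our configured Iceberg catalog):
```sql
-- Rough reproduction in Spark SQL 3.4.1 / Iceberg 1.4.3; `my_catalog` is a
-- placeholder for our Iceberg catalog. As with the Trino plan above, the
-- concatenated predicate is not turned into a partition filter, so the scan
-- reads the whole table instead of pruning to the matching partitions.
EXPLAIN EXTENDED
SELECT COUNT(*) AS num_rows
FROM my_catalog.prod.table_iceberg
WHERE year || lpad(month, 2, '0') = '202401'
```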
If this is a known issue, is there any expectation of when it might be addressed,
and/or are there any workarounds aside from rewriting queries (sketched below)? We
were hoping to migrate to Iceberg without having to adapt all of our queries (aside
from e.g. introducing an Iceberg catalog), but this would seem to limit that. Thanks!
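For concreteness, the kind of rewrite we are hoping to avoid would look roughly like the following (the value formats are assumptions based on our table: month stored without a leading zero, day zero-padded as in the old Hive paths):
```sql
-- Sketch of the manual rewrite: express the range directly on the identity
-- partition columns so the engine can prune, instead of concatenating them.
-- The zero-padding of day and unpadded month are assumptions; adjust to
-- match the values actually stored in the table.
SELECT COUNT(*) AS num_rows
FROM prod.table_iceberg
WHERE year = '2023'
  AND month = '1'
  AND day BETWEEN '04' AND '10'
```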