wzx140 opened a new issue, #11763:
URL: https://github.com/apache/iceberg/issues/11763
### Apache Iceberg version
1.5.0
### Query engine
Spark
### Please describe the bug 🐞
**Description**:
Spark's *PartitionPruning* optimization rule calls `SparkBatchQueryScan#filterAttributes`, which triggers the computation of `Set<PartitionSpec> specs`. During this computation, each file's partition spec JSON string is parsed into a `PartitionSpec`. To avoid repeated parsing, `org.apache.iceberg.PartitionSpecParser#fromJson` caches results in a map keyed by `(schema, jsonStr) -> PartitionSpec`.
However, for tables with a large number of files and columns, **computing the schema's hash for every cache lookup can consume significant CPU time**.
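To illustrate the mechanism, here is a hedged sketch with stand-in types (not the actual Iceberg classes): a cache keyed on `(schema, jsonStr)` has to hash the key on every lookup, and hashing the key means deep-hashing the whole schema.
```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: stand-ins for Schema, Pair, and the PartitionSpecParser cache.
class PartitionSpecParserSketch {

  // stand-in for org.apache.iceberg.Schema: hashCode walks every column name
  record SchemaStandIn(List<String> columnNames) {}

  // stand-in for org.apache.iceberg.util.Pair used as the cache key
  record Key(SchemaStandIn schema, String json) {}

  private static final Map<Key, Object> CACHE = new ConcurrentHashMap<>();

  static Object fromJson(SchemaStandIn schema, String json) {
    // computeIfAbsent hashes the key first: Key.hashCode -> SchemaStandIn.hashCode
    // -> hash of all column names. With 1500+ columns this cost is paid for every
    // file's spec lookup, even on a cache hit.
    return CACHE.computeIfAbsent(new Key(schema, json), k -> parse(k.json()));
  }

  private static Object parse(String json) {
    return json; // placeholder for the real JSON -> PartitionSpec parsing
  }
}
```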
**Proposed Solution**:
Avro's `Schema` mitigates this kind of cost by **caching the schema's hashCode** so it is computed only once. A similar optimization could be applied to Iceberg's schema/type classes to reduce the overhead of frequent schema hash calculations.
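Below is a hedged sketch of that direction, modeled on how Avro memoizes its schema hash code; the class and field names are illustrative and assume the schema is effectively immutable, so they are not an actual Iceberg API.
```java
import java.util.List;
import java.util.Objects;

// Illustrative only: a schema-like type that computes its deep hashCode once
// and reuses it afterwards, the same idea Avro's Schema applies internally.
final class CachedHashSchema {
  private final List<String> columnNames; // stand-in for the real field list
  private int cachedHash;                  // 0 means "not computed yet"

  CachedHashSchema(List<String> columnNames) {
    this.columnNames = List.copyOf(columnNames);
  }

  @Override
  public int hashCode() {
    int h = cachedHash;
    if (h == 0) {
      // compute the expensive deep hash once; safe because the schema is immutable
      h = Objects.hash(columnNames);
      if (h == 0) {
        h = 1; // avoid re-computing if the real hash happens to be 0
      }
      cachedHash = h;
    }
    return h;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof CachedHashSchema that && columnNames.equals(that.columnNames);
  }
}
```
With this, repeated cache lookups keyed on the schema would pay the deep hash only on the first call.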
**Reproduction Example**:
I added a timer to the `specs()` method of `org.apache.iceberg.spark.source.SparkPartitioningAwareScan`:
```java
protected Set<PartitionSpec> specs() {
  if (specs == null) {
    long ts = System.currentTimeMillis();
    // avoid calling equals/hashCode on specs as those methods are relatively expensive
    IntStream specIds = tasks().stream().mapToInt(task -> task.spec().specId()).distinct();
    this.specs = specIds.mapToObj(id -> table().specs().get(id)).collect(Collectors.toSet());
    LOG.warn("Scanned {} specs in {} ms", specs.size(), System.currentTimeMillis() - ts);
  }
  return specs;
}
```
I then ran the following SQL query against a table with 900,000 files and 1,500+ columns:
```sql
SELECT SUM(HASH(s.reqId + t.reqId))
FROM table s
JOIN table t
ON s.reqId = t.reqId AND s.partition = 'part1' AND t.partition = 'part1'
```
This query triggers the `org.apache.spark.sql.execution.dynamicpruning.PartitionPruning` optimization rule twice. Before task execution, **the driver spends approximately 150 seconds on pre-execution preparation, of which more than 140 seconds go to computing the `PartitionSpec` set**.
**Flame Graph**:
<img width="1279" alt="flame graph" src="https://github.com/user-attachments/assets/636e891d-48b6-4fb2-9bf8-e444769082a4" />
**Thread Dump**:
```
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$ListType.hashCode(Types.java:763)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
org.apache.iceberg.types.Types$StructType.hashCode(Types.java:630)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
org.apache.iceberg.relocated.com.google.common.base.Objects.hashCode(Objects.java:79)
org.apache.iceberg.util.Pair.hashCode(Pair.java:117)
[email protected]/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2370)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
org.apache.iceberg.BaseContentScanTask.spec(BaseContentScanTask.java:71) => holding Monitor(org.apache.iceberg.BaseFileScanTask@520850087)
org.apache.iceberg.BaseFileScanTask.spec(BaseFileScanTask.java:34)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.lambda$specs$1(SparkPartitioningAwareScan.java:165)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan$$Lambda$3617/0x00007f58ed482c28.applyAsInt(Unknown Source)
[email protected]/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:214)
[email protected]/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
[email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
[email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
[email protected]/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
[email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[email protected]/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.specs(SparkPartitioningAwareScan.java:166)
org.apache.iceberg.spark.source.SparkBatchQueryScan.filterAttributes(SparkBatchQueryScan.java:103)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$$Lambda$3616/0x00007f58ed48c3b0.apply(Unknown Source)
app//scala.Option.flatMap(Option.scala:271)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.getFilterableTableScan(PartitionPruning.scala:62)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1(PartitionPruning.scala:258)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1$adapted(PartitionPruning.scala:241)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1$$Lambda$3615/0x00007f58ed48bfd0.apply(Unknown Source)
app//scala.collection.immutable.List.foreach(List.scala:431)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:241)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:219)
```
**Environment**:
I reproduced this issue on Iceberg 1.5.0; it is expected to persist in the latest version as well.
### Willingness to contribute
- [X] I can contribute a fix for this bug independently
- [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time