korbel-jacek opened a new issue, #10891:
URL: https://github.com/apache/iceberg/issues/10891
### Apache Iceberg version
1.4.2
### Query engine
Spark
### Please describe the bug 🐞
Hi, I am trying to MERGE a small Iceberg table into a large Iceberg table, but the performance is poor.
Both tables are bucketed on the join columns and written locally sorted by them, so that Storage Partitioned Join can avoid shuffling. I also use merge-on-read to make MERGE INTO operations faster, yet a large share of the time is still spent sorting both tables during the merge: there is a Sort step directly before the sort merge join. Is it possible to skip this sorting step somehow, given that the data should already be sorted?
```
== Physical Plan ==
WriteDelta (14)
+- * Sort (13)
   +- Exchange (12)
      +- MergeRows (11)
         +- * Project (10)
            +- * SortMergeJoin RightOuter (9)
               :- * Sort (5)
               :  +- * Filter (4)
               :     +- * Project (3)
               :        +- * ColumnarToRow (2)
               :           +- BatchScan spark_catalog.default.customer3 (1)
               +- * Sort (8)
                  +- * ColumnarToRow (7)
                     +- BatchScan spark_catalog.default.customer4 (6)

(8) Sort [codegen id : 2]
Input [5]: [customer_id#127, name#128, country#129, order_id#130, amount#131]
Arguments: [customer_id#127 ASC NULLS FIRST, order_id#130 ASC NULLS FIRST], false, 0
```
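
For reference, the sort order actually recorded in table metadata can be checked through the Iceberg API; this is a minimal sketch, assuming the `spark_catalog` setup from the repro below:

```scala
import org.apache.iceberg.spark.Spark3Util

// Load the underlying Iceberg table and print its declared sort order
// (expected to show customer_id ASC, order_id ASC after the
// ALTER TABLE ... WRITE LOCALLY ORDERED BY statements in the repro).
val icebergTable = Spark3Util.loadIcebergTable(spark, "default.customer3")
println(icebergTable.sortOrder())
```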

Example code to reproduce:
```
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=spark-warehouse/iceberg \
  --conf spark.sql.sources.v2.bucketing.enabled=true \
  --conf spark.sql.sources.v2.bucketing.pushPartValues.enabled=true \
  --conf spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled=true \
  --conf spark.sql.requireAllClusterKeysForCoPartition=false \
  --conf spark.sql.iceberg.planning.preserve-data-grouping=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.useOldFetchProtocol=true \
  --conf spark.sql.shuffle.partitions=10 \
  --conf spark.sql.adaptive.enabled=false \
  --conf spark.sql.join.preferSortMergeJoin=false \
  --conf spark.sql.bucketing.coalesceBucketsInJoin.enabled=true
```
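
Most of these are SQL confs, so they can also be toggled inside an already running session; a sketch of the runtime equivalents (non-SQL confs such as `spark.dynamicAllocation.enabled` still have to be passed at launch):

```scala
// Runtime equivalents of the SQL confs above (sketch); core spark.* confs
// like dynamicAllocation cannot be changed after the session starts.
spark.conf.set("spark.sql.sources.v2.bucketing.enabled", "true")
spark.conf.set("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
spark.conf.set("spark.sql.iceberg.planning.preserve-data-grouping", "true")
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "10")
```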
```
import org.apache.spark.sql.functions._

// Create sample data for customers
val customerData = Seq(
  (1, "John Doe", "USA", 1, 1),
  (2, "Jane Smith", "Canada", 1, 1),
  (3, "Alice Johnson", "UK", 1, 1),
  (4, "Bob Brown", "USA", 1, 1),
  (5, "Charlie Davis", "Canada", 1, 1)
)

// Create the DataFrame and repartition it by the join key
val customerDF = spark.createDataFrame(customerData)
  .toDF("customer_id", "name", "country", "order_id", "amount")
val partitionedCustomerDF = customerDF.repartition(col("customer_id"))

// Create both tables, bucketed on customer_id and order_id
partitionedCustomerDF.writeTo("default.customer3")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()
partitionedCustomerDF.writeTo("default.customer4")
  .tableProperty("write.distribution-mode", "range")
  .partitionedBy(bucket(10, col("customer_id")), bucket(10, col("order_id")))
  .using("iceberg")
  .createOrReplace()

// Empty the tables, then set the local sort order and merge-on-read modes
// before re-inserting the data
spark.sql("TRUNCATE TABLE default.customer4")
spark.sql("TRUNCATE TABLE default.customer3")
spark.sql("ALTER TABLE default.customer4 WRITE LOCALLY ORDERED BY customer_id, order_id")
spark.sql("ALTER TABLE default.customer3 WRITE LOCALLY ORDERED BY customer_id, order_id")
spark.sql("ALTER TABLE default.customer3 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")
spark.sql("ALTER TABLE default.customer4 SET TBLPROPERTIES ('write.delete.mode'='merge-on-read','write.update.mode'='merge-on-read','write.merge.mode'='merge-on-read')")
partitionedCustomerDF.writeTo("default.customer4").append()
partitionedCustomerDF.writeTo("default.customer3").append()

// Merge the small table into the large one
spark.sql("""
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show()
```
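
The physical plan shown above can be regenerated from the same session; a sketch using EXPLAIN FORMATTED on the identical MERGE statement:

```scala
// Print the formatted physical plan for the MERGE, for inspection
spark.sql("""
  EXPLAIN FORMATTED
  MERGE INTO default.customer3 AS target
  USING default.customer4 AS source
  ON target.customer_id = source.customer_id AND target.order_id = source.order_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
""").show(false)
```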
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time