Thijsvandepoll opened a new issue, #7656:
URL: https://github.com/apache/iceberg/issues/7656
### Apache Iceberg version
1.2.1 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
Hi, I have encountered a major bug with `MERGE INTO` in Spark when using a
Python UDF. When the column used in the `ON` clause of `MERGE INTO` has been
produced by a UDF, the merge throws an error:
```
java.util.concurrent.ExecutionException:
org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for
expression: ...
```
I have created an example to showcase the issue:
```
import os
import pyspark.sql.functions as fn
import pyspark.sql.types as tp
from pyspark.sql import SparkSession
deps = [
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
    "org.apache.iceberg:iceberg-aws:1.2.1",
    "software.amazon.awssdk:bundle:2.17.257",
    "software.amazon.awssdk:url-connection-client:2.17.257",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} pyspark-shell"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "eu-east-1"
catalog = "hive"
spark = (
    SparkSession.builder.appName("Iceberg Reader")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog}.type", "hive")
    .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
    .config(
        f"spark.sql.catalog.{catalog}.io-impl",
        "org.apache.iceberg.aws.s3.S3FileIO",
    )
    .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
    .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
    .config("spark.sql.defaultCatalog", catalog)
    .config("hive.metastore.uris", "thrift://localhost:9083")
    .enableHiveSupport()
    .getOrCreate()
)
# Identity UDF: returns its input unchanged
@fn.udf(returnType=tp.IntegerType())
def return_self(inpt):
    return inpt
namespace = "ns"
table = "tab1"
# Create namespaces and table
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {namespace};")
spark.sql(f"CREATE TABLE IF NOT EXISTS {namespace}.{table} (col1 INT, col2 INT);")
# Create some data
df = spark.createDataFrame(
    [(1, 2), (2, 3), (3, 4)],
    schema=tp.StructType(
        [
            tp.StructField("col1", tp.IntegerType()),
            tp.StructField("col2", tp.IntegerType()),
        ]
    ),
)
# This does work, because col1 is NOT affected!
df = df.withColumn("col2", return_self("col1"))
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)
# This does NOT work!
df = df.withColumn("col1", return_self("col1"))
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)
```
1. I created a very simple UDF that performs no transformation and just
returns its input.
2. When we transform `col2` and perform `MERGE INTO` on `col1`, the merge
succeeds and the data is inserted normally.
3. When we "transform" `col1` by passing it through the UDF, the `MERGE
INTO` fails with the error:
```
Caused by: java.util.concurrent.ExecutionException:
org.apache.spark.SparkUnsupportedOperationException: Cannot generate code for
expression: return_self(input[0, int, true])
```
Somehow the merge cannot deal with a UDF applied to the column used in the
`ON` clause.
Persisting the data, which breaks the computation chain, is a workaround:
```
# This is a workaround
df = df.withColumn("col1", return_self("col1"))
df.cache()
df.createOrReplaceTempView("tmp")
spark.sql(
    f"""
    MERGE INTO {namespace}.{table} A USING (SELECT * FROM tmp) B
    ON A.col1 = B.col1
    WHEN MATCHED THEN UPDATE SET A.col1 = B.col1, A.col2 = B.col2
    WHEN NOT MATCHED THEN INSERT *
    """
)
```
The workaround is annoying though, especially for very large data, where you
would probably need to write the data out somewhere instead of caching it.
Does someone have a better idea of what is going on here? Thanks!
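For reference, the "write the data somewhere" variant of the workaround might look roughly like the sketch below. It has not been tested against this bug. The helper name `materialize` and the staging path in the usage comment are hypothetical. The idea is the same as with `cache()`: once the DataFrame is read back from files, its plan should no longer contain the UDF expression.

```python
def materialize(spark, df, path):
    """Write `df` to `path` as Parquet and return a fresh DataFrame read
    back from that location, so the returned plan starts at a file scan.

    `spark` is the active SparkSession, `df` the DataFrame with the UDF
    column, and `path` a staging location (an assumption; any location
    the session can write to should do).
    """
    # Overwrite any staged data left behind by a previous run.
    df.write.mode("overwrite").parquet(path)
    # The DataFrame read back has no lineage to the UDF.
    return spark.read.parquet(path)


# Hypothetical usage with the example above (the path is made up):
# df = df.withColumn("col1", return_self("col1"))
# df = materialize(spark, df, "s3a://lakehouse/tmp/merge_staging")
# df.createOrReplaceTempView("tmp")
```

Unlike `cache()`, this does not depend on the data fitting in executor memory or local disk, at the cost of one extra write and read of the source data.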