Hello,
I have been working on a small ETL framework for
pyspark/delta/databricks in my spare time.
It looks like I might have encountered a bug; however, I'm not totally
sure it's actually caused by spark itself rather than one of the other
technologies.
The error shows up when using spark sql to compare an incoming data frame
from jdbc/mssql with an empty delta table.
Spark sends a query to mssql ending in 'WHERE (1)', which apparently is
invalid syntax there and causes an exception to be thrown.
Unless reading in parallel, no WHERE clause at all should be needed, as
the code is reading all rows from the source.
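To illustrate what I mean (url/table/properties are placeholders here,
and the column and bounds are made up): only a partitioned read along
these lines should need generated WHERE clauses, while a plain read like
the one in the sample code further down should not:

# Hypothetical parallel read: Spark generates a WHERE clause per
# partition on the partition column.
df = spark.read.jdbc(url=url, table=table, properties=properties,
                     column="test_id", lowerBound=0, upperBound=1000000,
                     numPartitions=8)
# Plain single-query read, as used in the sample code below.
df = spark.read.jdbc(url=url, table=table, properties=properties)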
The error does not happen on databricks 10.4 LTS with spark 3.2.1, but
from databricks 11.3 LTS with spark 3.3.0 onwards it shows up.
The error also does not happen with postgresql or mysql, so either the
resulting sql is valid there or it does not contain the extra 'WHERE (1)'.
I have provided some sample pyspark code below that can be used to
reproduce the error on databricks community edition and an mssql server.
I have written two different versions of the sql statement. Both versions
result in the same error.
If there is some option or other trick that can be used to circumvent
the error on newer releases, I would be grateful to learn about it.
However, being able to use a single sql statement for this is preferable,
to keep it short, idempotent and atomic.
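For example, something along the lines of the JDBC pushDownPredicate
option might be what I am looking for, although I do not know whether it
affects this particular WHERE clause; a sketch of what I mean:

# Sketch only (untested for this case): disable predicate pushdown so
# filters are applied by Spark instead of being sent to mssql.
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{host}:{port};databaseName={database}") \
    .option("dbtable", table) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("pushDownPredicate", "false") \
    .load()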
Best regards
Lennart
Code:
# Add _checksum column to beginning of data frame.
def add_checksum_column(df):
    from pyspark.sql.functions import concat_ws, md5
    return df.select([md5(concat_ws("<|^|>", *sorted(df.columns))).alias("_checksum"), "*"])
# Add _key column to beginning of data frame.
def add_key_column(df, key):
    if isinstance(key, str):
        df = df.select([df[key].alias("_key"), "*"])
    elif isinstance(key, list):
        from pyspark.sql.functions import concat_ws
        df = df.select([concat_ws("-", *key).alias("_key"), "*"])
    else:
        raise Exception("Invalid key")
    return df
# Create Delta table.
def create_table(df, key, target):
    if spark.catalog._jcatalog.tableExists(target):
        return
    from pyspark.sql.functions import current_timestamp, lit
    df = add_checksum_column(df)                                    # (4) _checksum
    df = add_key_column(df, key)                                    # (3) _key
    df = df.select([current_timestamp().alias("_timestamp"), "*"])  # (2) _timestamp
    df = df.select([lit("I").alias("_operation"), "*"])             # (1) _operation
    df.filter("1=0").write.format("delta") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .option("delta.autoOptimize.autoCompact", "true") \
        .saveAsTable(target)
# Capture inserted and updated records from full or partial source data frame.
def insert_update(df, key, target, query):
    # Prepare source view.
    df = add_checksum_column(df)
    df = add_key_column(df, key)
    df.createOrReplaceTempView("s")
    # Prepare target view.
    spark.table(target).createOrReplaceTempView("t")
    # Insert records.
    return spark.sql(query)
query1 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation,
CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
SELECT t._key, t._checksum
FROM t
INNER JOIN (SELECT _key, MAX(_timestamp) AS m FROM t GROUP BY _key) AS m
ON t._key = m._key AND t._timestamp = m.m
WHERE t._operation <> "D"
)
AS a
ON s._key = a._key
WHERE (a._key IS NULL) -- Insert
OR (s._checksum <> a._checksum) -- Update
"""
query2 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation,
CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
SELECT _key, _checksum,
       ROW_NUMBER() OVER (PARTITION BY _key ORDER BY _timestamp DESC) AS rn
FROM t
WHERE _operation <> "D"
)
AS a
ON s._key = a._key AND a.rn = 1
WHERE (a._key IS NULL) -- Insert
OR (s._checksum <> a._checksum) -- Update
"""
host = "mssql.test.com"
port = "1433"
database = "test"
username = "test"
password = "test"
table = "test"
key = "test_id"
target = "archive.test"
df = spark.read.jdbc(
    properties = {
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "user": username,
        "password": password
    },
    url = f"jdbc:sqlserver://{host}:{port};databaseName={database}",
    table = table
)
create_table(df=df, key=key, target=target)
insert_update(df=df, key=key, target=target, query=query1)
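# The second version of the sql statement fails with the same error:
# insert_update(df=df, key=key, target=target, query=query2)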