Fwd: Automatic reply: Re: [DISCUSS] Show Python code examples first in Spark documentation

2023-02-26 Thread Mich Talebzadeh
Hi,

Can someone please disable the account below on the Spark mailing lists?

It looks like this address has been left subscribed, and we receive a spam-like
auto-reply every time we respond.

thanks



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




-- Forwarded message -
From: xyqiao 
Date: Sun, 26 Feb 2023 at 22:42
Subject: Automatic reply: Re: [DISCUSS] Show Python code examples first in Spark
documentation
To: Mich Talebzadeh 


This is an automatic vacation reply from QQ Mail.

Hello, I am currently on vacation and cannot reply to your email in person. I will get back to you as soon as possible once my vacation is over.


[JDBC] [PySpark] Possible bug when comparing incoming data frame from mssql and empty delta table

2023-02-26 Thread lennart

Hello,

I have been working on a small ETL framework for
PySpark/Delta/Databricks in my spare time.


It looks like I might have encountered a bug; however, I'm not totally
sure it's actually caused by Spark itself rather than one of the other
technologies.


The error shows up when using Spark SQL to compare an incoming data frame
from JDBC/MSSQL with an empty Delta table.
Spark sends a query to MSSQL ending in 'WHERE (1)', which apparently is
invalid syntax and causes an exception to be thrown.
Unless the read is parallelized, no WHERE clause should be needed at all,
since the code reads all rows from the source.


The error does not happen on Databricks 10.4 LTS with Spark 3.2.1, but it
shows up from Databricks 11.3 LTS with Spark 3.3.0 onwards.
The error also does not happen with PostgreSQL or MySQL, so either the
resulting SQL is valid there or it does not contain the extra 'WHERE (1)'.
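
My assumption is that the extra clause comes from filter pushdown on the
JDBC scan. One way to check that (a sketch, using the same placeholder
connection details as the reproduction code further down) is to print the
extended query plan and look at the PushedFilters on the JDBC relation:

# Sketch: check whether a literal-false filter is pushed down to the JDBC scan.
# Assumes the extra 'WHERE (1)' comes from filter pushdown; connection details
# are placeholders matching the reproduction code below.
jdbc_df = spark.read.jdbc(
    url="jdbc:sqlserver://mssql.test.com:1433;databaseName=test",
    table="test",
    properties={"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                "user": "test", "password": "test"},
)

# explain(True) prints the parsed, analyzed, optimized and physical plans;
# the JDBC scan node in the physical plan lists any PushedFilters.
jdbc_df.filter("1=0").explain(True)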


I have provided some sample PySpark code below that can be used to
reproduce the error on Databricks Community Edition and an MSSQL server.
I have written two different versions of the SQL statement. Both versions
result in the same error.


If there is some option or other trick that can be used to circumvent
the error on newer releases, I would be grateful to learn about it.
However, being able to use a single SQL statement for this is preferable,
to keep it short, idempotent and atomic.
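
One thing I have not tried yet is disabling filter pushdown on the JDBC
source entirely, so that no WHERE clause at all is appended to the query
sent to MSSQL. A sketch of what I mean, untested against this exact setup
(it reuses the connection variables from the code below):

# Untested idea: turn off JDBC filter pushdown via the documented
# "pushDownPredicate" option, so no WHERE clause is sent to MSSQL.
# Whether this avoids the 'WHERE (1)' error here is an assumption.
df = (spark.read.format("jdbc")
      .option("url", f"jdbc:sqlserver://{host}:{port};databaseName={database}")
      .option("dbtable", table)
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("user", username)
      .option("password", password)
      .option("pushDownPredicate", "false")
      .load())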


Best regards
Lennart


Code:

# Add _checksum column to beginning of data frame.
def add_checksum_column(df):
    from pyspark.sql.functions import concat_ws, md5
    return df.select([md5(concat_ws("<|^|>", *sorted(df.columns))).alias("_checksum"), "*"])


# Add _key column to beginning of data frame.
def add_key_column(df, key):
    if isinstance(key, str):
        df = df.select([df[key].alias("_key"), "*"])
    elif isinstance(key, list):
        from pyspark.sql.functions import concat_ws
        df = df.select([concat_ws("-", *key).alias("_key"), "*"])
    else:
        raise Exception("Invalid key")
    return df


# Create Delta table.
def create_table(df, key, target):
    if spark.catalog._jcatalog.tableExists(target):
        return
    from pyspark.sql.functions import current_timestamp, lit
    df = add_checksum_column(df)                                     # (4) _checksum
    df = add_key_column(df, key)                                     # (3) _key
    df = df.select([current_timestamp().alias("_timestamp"), "*"])   # (2) _timestamp
    df = df.select([lit("I").alias("_operation"), "*"])              # (1) _operation

    df.filter("1=0").write.format("delta") \
        .option("delta.autoOptimize.optimizeWrite", "true") \
        .option("delta.autoOptimize.autoCompact", "true") \
        .saveAsTable(target)


# Capture inserted and updated records from full or partial source data frame.
def insert_update(df, key, target, query):
    # Prepare source view.
    df = add_checksum_column(df)
    df = add_key_column(df, key)
    df.createOrReplaceTempView("s")
    # Prepare target view.
    spark.table(target).createOrReplaceTempView("t")
    # Insert records.
    return spark.sql(query)


query1 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation,
       CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
    SELECT t._key, t._checksum
    FROM t
    INNER JOIN (SELECT _key, MAX(_timestamp) AS m FROM t GROUP BY _key) AS m
        ON t._key = m._key AND t._timestamp = m.m
    WHERE t._operation <> "D"
) AS a
ON s._key = a._key
WHERE (a._key IS NULL)               -- Insert
   OR (s._checksum <> a._checksum)   -- Update
"""

query2 = """
INSERT INTO t
SELECT CASE WHEN a._key IS NULL THEN "I" ELSE "U" END AS _operation,
       CURRENT_TIMESTAMP AS _timestamp, s.*
FROM s
LEFT JOIN
(
    SELECT _key, _checksum,
           ROW_NUMBER() OVER (PARTITION BY _key ORDER BY _timestamp DESC) AS rn
    FROM t
    WHERE _operation <> "D"
) AS a
ON s._key = a._key AND a.rn = 1
WHERE (a._key IS NULL)               -- Insert
   OR (s._checksum <> a._checksum)   -- Update
"""

host     = "mssql.test.com"
port     = "1433"
database = "test"
username = "test"
password = "test"
table    = "test"
key      = "test_id"
target   = "archive.test"

df = spark.read.jdbc(
    properties = {"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                  "user": username, "password": password},
    url = f"jdbc:sqlserver://{host}:{port};databaseName={database}",
    table = table
)

create_table(df=df, key=key, target=target)
insert_update(df=df, key=key, target=target, query=query1)


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org