[Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He

I am attempting to write a streaming pipeline that would consume data from a 
Kafka source, manipulate the data, and then write results to a downstream sink 
(Kafka, Redis, etc). I want to write fully formed SQL instead of using the 
function API that Spark offers. I read a few guides on how to do this and my 
understanding is that I need to create a temp view in order to execute my raw 
SQL queries via spark.sql(). 

However, I’m having trouble defining watermarks on my source. Spark SQL does not 
seem to offer a way to declare a watermark in the raw SQL itself, so I’m using 
the .withWatermark() function. However, this watermark does not take effect on 
the temp view.

Example code:
.withWatermark("createTime", "10 seconds")


SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM json_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \

This throws:
AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
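
The error reflects how append mode works: a windowed aggregate can only be emitted once the engine knows the window will receive no more rows, and the watermark is what tells it so. The following is a minimal pure-Python sketch of that idea, not Spark's implementation. The helper names (windows_for, finalized_windows) and the sample timestamps are made up for illustration, and the rule "a window is final once its end is at or before the maximum event time minus the delay" is a simplification of how Spark advances the watermark between micro-batches. The window size, slide and delay are taken from the query above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, size, slide):
    """Return every sliding window [start, end) that contains event_time."""
    # Latest window start at or before the event, aligned to the slide interval.
    last_start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    start = last_start
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def finalized_windows(event_times, size, slide, delay):
    """Return (watermark, windows whose end is at or before the watermark)."""
    watermark = max(event_times) - delay
    seen = set()
    for t in event_times:
        seen.update(windows_for(t, size, slide))
    return watermark, sorted(w for w in seen if w[1] <= watermark)


# '1 hour' windows sliding every '30 minutes', with a '10 seconds' watermark delay.
size, slide, delay = timedelta(hours=1), timedelta(minutes=30), timedelta(seconds=10)
events = [
    datetime(2024, 4, 2, 12, 5),
    datetime(2024, 4, 2, 12, 40),
    datetime(2024, 4, 2, 13, 12),
]
watermark, done = finalized_windows(events, size, slide, delay)
print("watermark:", watermark)
for start, end in done:
    print("final window:", start, "to", end)
```

Without a watermark there is no point at which any window can be declared final, which is why the analyzer rejects append mode for the aggregation.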

If I switch out the SQL query and write it in the function API instead, 
everything seems to work fine.

How can I use .sql() in conjunction with watermarks?


Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
OK, let us put it to the test.

My original code:

def fetch_data(self):
schema = StructType() \
 .add("rowkey", StringType()) \
 .add("timestamp", TimestampType()) \
 .add("temperature", IntegerType())
checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"

# construct a streaming dataframe 'streamingDataFrame' that
# subscribes to topic temperature
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
config['MDVariables']['bootstrapServers'],) \
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['appName']) \
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
config['MDVariables']['rebalanceBackoffMS']) \
config['MDVariables']['zookeeperSessionTimeOutMs']) \
config['MDVariables']['autoCommitIntervalMS']) \
.option("subscribe", "temperature") \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "earliest") \
.load() \

resultC = streamingDataFrame.select( \
 col("parsed_value.rowkey").alias("rowkey") \
   , col("parsed_value.timestamp").alias("timestamp") \
   , col("parsed_value.temperature").alias("temperature"))

We work out the window and the AVG(temperature) in the window's
timeframe below.
This should return the following DataFrame, with the window as a struct:

 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)

resultM = resultC. \
 withWatermark("timestamp", "5 minutes"). \
 groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \

# We take the above DataFrame and flatten it to get the columns aliased as
# "startOfWindow", "endOfWindow" and "AVGTemperature"
resultMF = resultM. \
   select( \
F.col("window.start").alias("startOfWindow") \
  , F.col("window.end").alias("endOfWindow") \

# Kafka producer requires a key, value pair. We generate a UUID
# key as the unique identifier of the Kafka record
uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())

We take DataFrame resultMF containing temperature info and
write it to Kafka. The uuid is serialized as a string and used as the key.
We take all the columns of the DataFrame and serialize them as
a JSON string, putting the results in the "value" of the record.
result = resultMF.withColumn("uuid",uuidUdf()) \
 .selectExpr("CAST(uuid AS STRING) AS key",
"to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
 .writeStream \
 .outputMode('complete') \
 .format("kafka") \
config['MDVariables']['bootstrapServers'],) \
 .option("topic", "avgtemperature") \
 .option('checkpointLocation', checkpoint_path) \
 .queryName("avgtemperature") \

except Exception as e:
print(f"""{e}, quitting""")
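
The key/value shaping described in the comments above (a UUID string as the Kafka key, all result columns serialized to a JSON string as the value) can be illustrated without Spark. This is a minimal plain-Python sketch: to_kafka_record and the sample row are hypothetical, and only the column names come from the code above.

```python
import json
import uuid


def to_kafka_record(row):
    """Build a (key, value) pair: random UUID string key, JSON string value."""
    key = str(uuid.uuid4())
    value = json.dumps(row)
    return key, value


# Hypothetical aggregated row using the column names from the code above.
row = {
    "startOfWindow": "2024-04-02T12:00:00",
    "endOfWindow": "2024-04-02T12:05:00",
    "AVGTemperature": 21.5,
}
key, value = to_kafka_record(row)
print(key, value)
```

Because the key is random, records for the same window do not share a key; that matches the approach above, where the key is only meant to be a unique record identifier.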



Now try to use SQL for the entire transformation and aggregation:

# import this and anything else needed
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import (StructType, StringType, IntegerType,
                               FloatType, TimestampType)

# Define the schema for the JSON data
schema = ... # Replace with your schema definition

# construct a streaming dataframe 'streamingDataFrame' that
# subscribes to topic temperature
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
config['MDVariables']['bootstrapServers'],) \

RE: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hi Mich,

Thank you so much for your response. I really appreciate your help!

You mentioned "defining the watermark using the withWatermark function on the 
streaming_df before creating the temporary view". I believe this is what I’m 
doing and it’s not working for me. Here is the exact code snippet that I’m using:

>>> streaming_df = session.readStream\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "payment_msg")\
.withWatermark("createTime", "10 seconds")

>>> streaming_df.createOrReplaceTempView("streaming_df")

>>> spark.sql("""
SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
""")\
  .withWatermark("start", "10 seconds")\
  .format("kafka") \
  .option("checkpointLocation", "checkpoint") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "sink") \

AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
EventTimeWatermark start#37: timestamp, 10 seconds

I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks again!
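
One detail that may be worth checking, offered as an assumption rather than a confirmed diagnosis: in Spark SQL a single-quoted token such as 'createTime' is a string literal, whereas the DataFrame API treats a string passed to window() as a column name. The Structured Streaming guide also states that sorting is supported only after an aggregation and in complete output mode, so ORDER BY is left out here. The sketch below builds the query with the event-time column as an unquoted identifier. build_window_query and start_query are hypothetical helper names, parsed_df is assumed to be a streaming DataFrame whose createTime column is already a timestamp, the windowStart and windowEnd aliases are illustrative, and the Kafka options are copied from the snippet above.

```python
def build_window_query(view_name, time_col):
    """Return the windowed aggregation with time_col as a column identifier."""
    return f"""
        SELECT window.start AS windowStart, window.end AS windowEnd,
               provinceId, sum(payAmount) AS totalPayAmount
        FROM {view_name}
        GROUP BY provinceId, window({time_col}, '1 hour', '30 minutes')
    """


def start_query(spark, parsed_df):
    """Hypothetical wiring; parsed_df is assumed to be a streaming DataFrame
    with a TimestampType createTime column."""
    # Declare the watermark before registering the temp view.
    watermarked = parsed_df.withWatermark("createTime", "10 seconds")
    watermarked.createOrReplaceTempView("streaming_df")
    result = spark.sql(build_window_query("streaming_df", "createTime"))
    # The Kafka sink expects a 'value' column, so serialize each row to JSON.
    return (
        result.selectExpr("to_json(struct(*)) AS value")
        .writeStream.outputMode("append")
        .format("kafka")
        .option("checkpointLocation", "checkpoint")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "sink")
        .start()
    )


print(build_window_query("streaming_df", "createTime"))
```

Only the query builder runs standalone; start_query needs a live SparkSession and Kafka broker and has not been verified against the setup described in this thread.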


Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-06 Thread 刘唯
This indeed looks like a bug. I will take some time to look into it.

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread Mich Talebzadeh
Interesting. Below should be the corrected code, following the suggestion in
[SPARK-47718] ".sql() does not recognize watermark defined upstream"
(ASF JIRA, apache.org):

# Define schema for parsing Kafka messages
schema = StructType([
StructField('createTime', TimestampType(), True),
StructField('orderId', LongType(), True),
StructField('payAmount', DoubleType(), True),
StructField('payPlatform', IntegerType(), True),
StructField('provinceId', IntegerType(), True),
])

# Read streaming data from Kafka source
streaming_df = session.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "payment_msg") \
.option("startingOffsets", "earliest") \
.load() \
schema).alias("parsed_value")) \
.select("parsed_value.*") \
.withWatermark("createTime", "10 seconds")

# Create temporary view for SQL queries
streaming_df.createOrReplaceTempView("streaming_df")

# Define SQL query with correct window function usage
query = """
SELECT
window(start, '1 hour', '30 minutes') as window,
sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(start, '1 hour', '30 minutes')
ORDER BY window.start
"""

# Write the aggregated results to Kafka sink
stream = session.sql(query) \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
United Kingdom

*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun).

