Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread Mich Talebzadeh
Interesting. So below should be the corrected code, with the suggestion from
SPARK-47718 (".sql() does not recognize watermark defined upstream"):
https://issues.apache.org/jira/browse/SPARK-47718

# Imports assumed by this snippet
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               LongType, DoubleType, IntegerType)

# Define schema for parsing Kafka messages
schema = StructType([
    StructField('createTime', TimestampType(), True),
    StructField('orderId', LongType(), True),
    StructField('payAmount', DoubleType(), True),
    StructField('payPlatform', IntegerType(), True),
    StructField('provinceId', IntegerType(), True),
])

# Read streaming data from the Kafka source, defining the watermark upstream
streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")

# Create a temporary view for SQL queries; the watermark defined upstream
# carries over into the view
streaming_df.createOrReplaceTempView("streaming_df")

# Define the SQL query with correct window function usage. Per SPARK-47718,
# window() must reference the watermarked column itself (createTime), not a
# quoted string literal.
query = """
SELECT
    window(createTime, '1 hour', '30 minutes') AS window,
    provinceId,
    sum(payAmount) AS totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
ORDER BY window.start
"""

# Write the aggregated results to the Kafka sink
stream = session.sql(query) \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()
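
One caveat the thread does not touch on: the Kafka sink expects each output
row to carry a string or binary "value" column, so the aggregated columns
should be serialized before writeStream. A minimal sketch, assuming JSON
output (kafka_ready_df is a hypothetical name):

from pyspark.sql.functions import to_json, struct

# Serialize each aggregated row into a JSON 'value' column for the Kafka sink
kafka_ready_df = session.sql(query).select(
    to_json(struct("window", "provinceId", "totalPayAmount")).alias("value")
)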


Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, "one test result is worth one-thousand expert opinions" (Wernher von
Braun).


On Tue, 9 Apr 2024 at 21:45, 刘唯  wrote:

> Sorry this is not a bug but essentially a user error. Spark throws a
> really confusing error and I'm also confused. Please see the reply in the
> ticket for how to make things correct.
> https://issues.apache.org/jira/browse/SPARK-47718
>
> 刘唯 wrote on Sat, 6 Apr 2024 at 11:41:
>
>> This indeed looks like a bug. I will take some time to look into it.
>>

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread 刘唯
Sorry, this is not a bug but essentially a user error. Spark throws a really
confusing error and I was also confused. Please see the reply in the ticket
for how to make things correct.
https://issues.apache.org/jira/browse/SPARK-47718
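
In short, as I read the ticket's resolution: pass the watermarked column
itself to window() rather than a quoted string literal, which Spark parses as
a constant. A sketch of the corrected grouping (spark and streaming_df as in
the earlier messages):

# Wrong: 'createTime' in quotes is a string literal, so the planner cannot
# tie the window to the watermarked column:
#   GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
# Right: createTime is a column reference, so the upstream watermark applies:
result = spark.sql("""
    SELECT window.start, window.end, provinceId,
           sum(payAmount) AS totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
""")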

刘唯 wrote on Sat, 6 Apr 2024 at 11:41:

> This indeed looks like a bug. I will take some time to look into it.
>

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-06 Thread 刘唯
This indeed looks like a bug. I will take some time to look into it.


Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
Hm, you are getting the error below:

AnalysisException: Append output mode not supported when there are
streaming aggregations on streaming DataFrames/DataSets without watermark;

The problem seems to be that you are using the append output mode when
writing the streaming query results to Kafka. This mode is designed for
scenarios where you want to append new data to an existing dataset at the
sink (in this case, the "sink" topic in Kafka). However, your query involves
a streaming aggregation: group by provinceId, window('createTime', '1 hour',
'30 minutes'), and Spark Structured Streaming requires a watermark on the
aggregated event-time column so that it knows when a window is final and its
result can be appended.

Your code already defines a watermark on the "createTime" column with a
delay of 10 seconds (withWatermark("createTime", "10 seconds")). However,
the error message indicates the watermark is missing on the start column.
Try adding a watermark on the "start" column generated by the window
function, by modifying your code as below:

from pyspark.sql.functions import col, from_json, window, sum

streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")  # existing watermark on createTime

# Modified section with a watermark on the window-generated 'start' column.
# Note: withWatermark expects a column-name string, not a Column expression.
streaming_df = streaming_df.groupBy(
    col("provinceId"),
    window(col("createTime"), "1 hour", "30 minutes")
).agg(
    sum(col("payAmount")).alias("totalPayAmount")
).withWatermark("window.start", "10 seconds")

# Rest of the code remains the same
streaming_df.createOrReplaceTempView("streaming_df")

spark.sql("""
SELECT
    window.start, window.end, provinceId, totalPayAmount
FROM streaming_df
ORDER BY window.start
""") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()

Try and see how it goes

HTH
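
A related point from the Structured Streaming guide: withWatermark must be
applied to the event-time column before the aggregation for the watermark to
gate that aggregation; adding it after the groupBy, as in the snippet above,
will not have that effect. A minimal append-mode sketch of the documented
ordering (windowed is a hypothetical name):

# Watermark first, then aggregate on the same event-time column
windowed = (streaming_df
    .withWatermark("createTime", "10 seconds")
    .groupBy(col("provinceId"),
             window(col("createTime"), "1 hour", "30 minutes"))
    .agg(sum(col("payAmount")).alias("totalPayAmount")))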

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, "one test result is worth one-thousand expert opinions" (Wernher von
Braun).




RE: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hi Mich,

Thank you so much for your response. I really appreciate your help!

You mentioned "defining the watermark using the withWatermark function on the 
streaming_df before creating the temporary view” - I believe this is what I’m 
doing and it’s not working for me. Here is the exact code snippet that I’m 
running:

```
>>> streaming_df = session.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("subscribe", "earliest" if False else "payment_msg")\
        .option("startingOffsets", "earliest")\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
        .select("parsed_value.*")\
        .withWatermark("createTime", "10 seconds")

>>> streaming_df.createOrReplaceTempView("streaming_df")

>>> spark.sql("""
    SELECT
        window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
    ORDER BY window.start
    """)\
        .withWatermark("start", "10 seconds")\
        .writeStream\
        .format("kafka")\
        .option("checkpointLocation", "checkpoint")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("topic", "sink")\
        .start()

AnalysisException: Append output mode not supported when there are streaming
aggregations on streaming DataFrames/DataSets without watermark;
EventTimeWatermark start#37: timestamp, 10 seconds
```

I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks again!

Best,
Chloe


On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> OK, let us take it for a test.
> 
> The original code of mine
> 
> def fetch_data(self):
> self.sc.setLogLevel("ERROR")
> schema = StructType() \
>  .add("rowkey", StringType()) \
>  .add("timestamp", TimestampType()) \
>  .add("temperature", IntegerType())
> checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
> try:
> 
> # construct a streaming dataframe 'streamingDataFrame' that
> subscribes to topic temperature
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", "temperature") \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "earliest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
> 
> 
> resultC = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.timestamp").alias("timestamp") \
>, col("parsed_value.temperature").alias("temperature"))
> 
> """
> We work out the window and the AVG(temperature) in the window's
> timeframe below
> This should return back the following Dataframe as struct
> 
>  root
>  |-- window: struct (nullable = false)
>  ||-- start: timestamp (nullable = true)
>  ||-- end: timestamp (nullable = true)
>  |-- avg(temperature): double (nullable = true)
> 
> """
> resultM = resultC. \
>  withWatermark("timestamp", "5 minutes"). \
>  groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>  avg('temperature')
> 
> # We take the above DataFrame and flatten it to get the columns
> aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
> resultMF = resultM. \
>select( \
> F.col("window.start").alias("startOfWindow") \
>   , F.col("window.end").alias("endOfWindow") \
>   ,
> F.col("avg(temperature)").alias("AVGTemperature"))
> 
> # Kafka producer requires a key, value pair. We generate UUID
> key as the unique identifier of Kafka record
> uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
> 
> """
> We take DataFrame resultMF containing temperature info and
> write it to Kafka. The uuid is serialized as a