Interesting. Below is the corrected code, following the suggestion in
[SPARK-47718] .sql() does not recognize watermark defined upstream - ASF JIRA
<https://issues.apache.org/jira/browse/SPARK-47718>. The key change is that
window() now references the event-time column createTime directly rather than
the string literal 'createTime', so the watermark defined upstream with
withWatermark("createTime", ...) is recognized by the SQL aggregation.
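
For reference, session below is assumed to be an existing SparkSession, as in
the earlier examples in this thread. A minimal sketch of creating one with the
Kafka source on the classpath (the app name and the spark-sql-kafka package
coordinates are assumptions; the version must match your Spark/Scala build,
e.g. 3.5.1 / Scala 2.12):

from pyspark.sql import SparkSession

# Assumed app name; adjust the spark-sql-kafka version to your Spark build
session = SparkSession.builder \
    .appName("payment_stream") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()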

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               LongType, DoubleType, IntegerType)

# Define schema for parsing Kafka messages
schema = StructType([
    StructField('createTime', TimestampType(), True),
    StructField('orderId', LongType(), True),
    StructField('payAmount', DoubleType(), True),
    StructField('payPlatform', IntegerType(), True),
    StructField('provinceId', IntegerType(), True),
])

# Read streaming data from Kafka source
streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"),
                      schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")

# Create temporary view for SQL queries
streaming_df.createOrReplaceTempView("streaming_df")
# Define SQL query; window() must reference the event-time column directly,
# not a quoted string literal such as window('createTime', ...)
query = """
SELECT
    window(createTime, '1 hour', '30 minutes') as window,
    provinceId,
    sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
ORDER BY window.start
"""

# Write the aggregated results to the Kafka sink. The Kafka sink expects a
# string/binary 'value' column, so serialize each result row to JSON first.
stream = session.sql(query) \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()
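
As a quick sanity check (a minimal sketch, assuming the same session and query
objects as above), explain() on the SQL result should now show an
EventTimeWatermark node in the analyzed plan, which is exactly what the .sql()
path was not picking up before:

# Print parsed/analyzed/optimized/physical plans; look for EventTimeWatermark
session.sql(query).explain(True)

If Spark then complains about sorting on a streaming Dataset, dropping the
ORDER BY clause or writing with outputMode("complete") are the usual options,
since sorting a streaming aggregation is only supported in complete mode.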


Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Tue, 9 Apr 2024 at 21:45, 刘唯 <z920631...@gmail.com> wrote:

> Sorry this is not a bug but essentially a user error. Spark throws a
> really confusing error and I'm also confused. Please see the reply in the
> ticket for how to make things correct.
> https://issues.apache.org/jira/browse/SPARK-47718
>
> 刘唯 <z920631...@gmail.com> 于2024年4月6日周六 11:41写道:
>
>> This indeed looks like a bug. I will take some time to look into it.
>>
>> Mich Talebzadeh <mich.talebza...@gmail.com> 于2024年4月3日周三 01:55写道:
>>
>>>
>>> hm. you are getting below
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>
>>> The problem seems to be that you are using the append output mode when
>>> writing the streaming query results to Kafka. This mode is designed for
>>> scenarios where you want to append new data to an existing dataset at the
>>> sink (in this case, the "sink" topic in Kafka). However, your query
>>> involves a streaming aggregation: group by provinceId, window('createTime',
>>> '1 hour', '30 minutes'). The problem is that Spark Structured Streaming
>>> requires a watermark to ensure exactly-once processing when using
>>> aggregations with append mode. Your code already defines a watermark on the
>>> "createTime" column with a delay of 10 seconds (withWatermark("createTime",
>>> "10 seconds")). However, the error message indicates it is missing on the
>>> start column. Try adding watermark to "start" Column: Modify your code as
>>> below  to include a watermark on the "start" column generated by the
>>> window function:
>>>
>>> from pyspark.sql.functions import col, from_json, explode, window, sum,
>>> watermark
>>>
>>> streaming_df = session.readStream \
>>>   .format("kafka") \
>>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>>   .option("subscribe", "payment_msg") \
>>>   .option("startingOffsets", "earliest") \
>>>   .load() \
>>>   .select(from_json(col("value").cast("string"),
>>> schema).alias("parsed_value")) \
>>>   .select("parsed_value.*") \
>>>   .withWatermark("createTime", "10 seconds")  # Existing watermark on
>>> createTime
>>>
>>> *# Modified section with watermark on 'start' column*
>>> streaming_df = streaming_df.groupBy(
>>>   col("provinceId"),
>>>   window(col("createTime"), "1 hour", "30 minutes")
>>> ).agg(
>>>   sum(col("payAmount")).alias("totalPayAmount")
>>> ).withWatermark(expr("start"), "10 seconds")  # Watermark on
>>> window-generated 'start'
>>>
>>> # Rest of the code remains the same
>>> streaming_df.createOrReplaceTempView("streaming_df")
>>>
>>> spark.sql("""
>>> SELECT
>>>   window.start, window.end, provinceId, totalPayAmount
>>> FROM streaming_df
>>> ORDER BY window.start
>>> """) \
>>> .writeStream \
>>> .format("kafka") \
>>> .option("checkpointLocation", "checkpoint") \
>>> .option("kafka.bootstrap.servers", "localhost:9092") \
>>> .option("topic", "sink") \
>>> .start()
>>>
>>> Try and see how it goes
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Werner
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>>
>>> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid>
>>> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Thank you so much for your response. I really appreciate your help!
>>>>
>>>> You mentioned "defining the watermark using the withWatermark function
>>>> on the streaming_df before creating the temporary view” - I believe this is
>>>> what I’m doing and it’s not working for me. Here is the exact code snippet
>>>> that I’m running:
>>>>
>>>> ```
>>>> >>> streaming_df = session.readStream\
>>>>     .format("kafka")\
>>>>     .option("kafka.bootstrap.servers", "localhost:9092")\
>>>>     .option("subscribe", "payment_msg")\
>>>>     .option("startingOffsets","earliest")\
>>>>     .load()\
>>>>     .select(from_json(col("value").cast("string"),
>>>> schema).alias("parsed_value"))\
>>>>     .select("parsed_value.*")\
>>>>     .withWatermark("createTime", "10 seconds")
>>>>
>>>> >>> streaming_df.createOrReplaceTempView("streaming_df”)
>>>>
>>>> >>> spark.sql("""
>>>> SELECT
>>>>     window.start, window.end, provinceId, sum(payAmount) as
>>>> totalPayAmount
>>>>     FROM streaming_df
>>>>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>>     ORDER BY window.start
>>>> """)\
>>>>   .withWatermark("start", "10 seconds")\
>>>>   .writeStream\
>>>>   .format("kafka") \
>>>>   .option("checkpointLocation", "checkpoint") \
>>>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>   .option("topic", "sink") \
>>>>   .start()
>>>>
>>>> AnalysisException: Append output mode not supported when there are
>>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>> EventTimeWatermark start#37: timestamp, 10 seconds
>>>> ```
>>>>
>>>> I’m using pyspark 3.5.1. Please let me know if I missed something.
>>>> Thanks again!
>>>>
>>>> Best,
>>>> Chloe
>>>>
>>>>
>>>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>>>> > ok let us take it for a test.
>>>> >
>>>> > The original code of mine
>>>> >
>>>> >     def fetch_data(self):
>>>> >         self.sc.setLogLevel("ERROR")
>>>> >         schema = StructType() \
>>>> >          .add("rowkey", StringType()) \
>>>> >          .add("timestamp", TimestampType()) \
>>>> >          .add("temperature", IntegerType())
>>>> >         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>>> >         try:
>>>> >
>>>> >             # construct a streaming dataframe 'streamingDataFrame'
>>>> that
>>>> > subscribes to topic temperature
>>>> >             streamingDataFrame = self.spark \
>>>> >                 .readStream \
>>>> >                 .format("kafka") \
>>>> >                 .option("kafka.bootstrap.servers",
>>>> > config['MDVariables']['bootstrapServers'],) \
>>>> >                 .option("schema.registry.url",
>>>> > config['MDVariables']['schemaRegistryURL']) \
>>>> >                 .option("group.id", config['common']['appName']) \
>>>> >                 .option("zookeeper.connection.timeout.ms",
>>>> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>> >                 .option("rebalance.backoff.ms",
>>>> > config['MDVariables']['rebalanceBackoffMS']) \
>>>> >                 .option("zookeeper.session.timeout.ms",
>>>> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>> >                 .option("auto.commit.interval.ms",
>>>> > config['MDVariables']['autoCommitIntervalMS']) \
>>>> >                 .option("subscribe", "temperature") \
>>>> >                 .option("failOnDataLoss", "false") \
>>>> >                 .option("includeHeaders", "true") \
>>>> >                 .option("startingOffsets", "earliest") \
>>>> >                 .load() \
>>>> >                 .select(from_json(col("value").cast("string"),
>>>> > schema).alias("parsed_value"))
>>>> >
>>>> >
>>>> >             resultC = streamingDataFrame.select( \
>>>> >                      col("parsed_value.rowkey").alias("rowkey") \
>>>> >                    ,
>>>> col("parsed_value.timestamp").alias("timestamp") \
>>>> >                    ,
>>>> col("parsed_value.temperature").alias("temperature"))
>>>> >
>>>> >             """
>>>> >             We work out the window and the AVG(temperature) in the
>>>> window's
>>>> > timeframe below
>>>> >             This should return back the following Dataframe as struct
>>>> >
>>>> >              root
>>>> >              |-- window: struct (nullable = false)
>>>> >              |    |-- start: timestamp (nullable = true)
>>>> >              |    |-- end: timestamp (nullable = true)
>>>> >              |-- avg(temperature): double (nullable = true)
>>>> >
>>>> >             """
>>>> >             resultM = resultC. \
>>>> >                      withWatermark("timestamp", "5 minutes"). \
>>>> >                      groupBy(window(resultC.timestamp, "5 minutes",
>>>> "5
>>>> > minutes")). \
>>>> >                      avg('temperature')
>>>> >
>>>> >             # We take the above DataFrame and flatten it to get the
>>>> columns
>>>> > aliased as "startOfWindowFrame", "endOfWindowFrame" and
>>>> "AVGTemperature"
>>>> >             resultMF = resultM. \
>>>> >                        select( \
>>>> >
>>>> F.col("window.start").alias("startOfWindow") \
>>>> >                           , F.col("window.end").alias("endOfWindow")
>>>> \
>>>> >                           ,
>>>> > F.col("avg(temperature)").alias("AVGTemperature"))
>>>> >
>>>> >             # Kafka producer requires a key, value pair. We generate
>>>> UUID
>>>> > key as the unique identifier of Kafka record
>>>> >             uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
>>>> >
>>>> >             """
>>>> >             We take DataFrame resultMF containing temperature info
>>>> and
>>>> > write it to Kafka. The uuid is serialized as a string and used as the
>>>> key.
>>>> >             We take all the columns of the DataFrame and serialize
>>>> them as
>>>> > a JSON string, putting the results in the "value" of the record.
>>>> >             """
>>>> >             result = resultMF.withColumn("uuid",uuidUdf()) \
>>>> >                      .selectExpr("CAST(uuid AS STRING) AS key",
>>>> > "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS
>>>> value") \
>>>> >                      .writeStream \
>>>> >                      .outputMode('complete') \
>>>> >                      .format("kafka") \
>>>> >                      .option("kafka.bootstrap.servers",
>>>> > config['MDVariables']['bootstrapServers'],) \
>>>> >                      .option("topic", "avgtemperature") \
>>>> >                      .option('checkpointLocation', checkpoint_path) \
>>>> >                      .queryName("avgtemperature") \
>>>> >                      .start()
>>>> >
>>>> >         except Exception as e:
>>>> >                 print(f"""{e}, quitting""")
>>>> >                 sys.exit(1)
>>>> >
>>>> >         #print(result.status)
>>>> >         #print(result.recentProgress)
>>>> >         #print(result.lastProgress)
>>>> >
>>>> >         result.awaitTermination()
>>>> >
>>>> > Now try to use sql for the entire transformation and aggression
>>>> >
>>>> > #import this and anything else needed
>>>> > from pyspark.sql.functions import from_json, col, window
>>>> > from pyspark.sql.types import StructType, StringType,IntegerType,
>>>> > FloatType, TimestampType
>>>> >
>>>> >
>>>> > # Define the schema for the JSON data
>>>> > schema = ... # Replace with your schema definition
>>>> >
>>>> >         # construct a streaming dataframe 'streamingDataFrame' that
>>>> > subscribes to topic temperature
>>>> >         streamingDataFrame = self.spark \
>>>> >                 .readStream \
>>>> >                 .format("kafka") \
>>>> >                 .option("kafka.bootstrap.servers",
>>>> > config['MDVariables']['bootstrapServers'],) \
>>>> >                 .option("schema.registry.url",
>>>> > config['MDVariables']['schemaRegistryURL']) \
>>>> >                 .option("group.id", config['common']['appName']) \
>>>> >                 .option("zookeeper.connection.timeout.ms",
>>>> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>> >                 .option("rebalance.backoff.ms",
>>>> > config['MDVariables']['rebalanceBackoffMS']) \
>>>> >                 .option("zookeeper.session.timeout.ms",
>>>> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>> >                 .option("auto.commit.interval.ms",
>>>> > config['MDVariables']['autoCommitIntervalMS']) \
>>>> >                 .option("subscribe", "temperature") \
>>>> >                 .option("failOnDataLoss", "false") \
>>>> >                 .option("includeHeaders", "true") \
>>>> >                 .option("startingOffsets", "earliest") \
>>>> >                 .load() \
>>>> >                 .select(from_json(col("value").cast("string"),
>>>> > schema).alias("parsed_value"))
>>>> >               .select("parsed_value.*")
>>>> >               .withWatermark("createTime", "10 seconds"))  # Define
>>>> the
>>>> > watermark here
>>>> >
>>>> > # Create a temporary view from the streaming DataFrame with watermark
>>>> > streaming_df.createOrReplaceTempView("michboy")
>>>> >
>>>> > # Execute SQL queries on the temporary view
>>>> > result_df = (spark.sql("""
>>>> >     SELECT
>>>> >         window.start, window.end, provinceId, sum(payAmount) as
>>>> > totalPayAmount
>>>> >     FROM michboy
>>>> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>> >     ORDER BY window.start
>>>> > """)
>>>> >               .writeStream
>>>> >               .format("kafka")
>>>> >               .option("checkpointLocation", "checkpoint")
>>>> >               .option("kafka.bootstrap.servers", "localhost:9092")
>>>> >               .option("topic", "sink")
>>>> >               .start())
>>>> >
>>>> > Note that the watermark is defined using the withWatermark function
>>>> on the
>>>> > streaming_df before creating the temporary view (michboy 😀). This
>>>> way, the
>>>> > watermark information is correctly propagated to the temporary view,
>>>> > allowing you to execute SQL queries with window functions and
>>>> aggregations
>>>> > on the streaming data.
>>>> >
>>>> > Note that by defining the watermark on the streaming DataFrame before
>>>> > creating the temporary view, Spark will recognize the watermark and
>>>> allow
>>>> > streaming aggregations and window operations in your SQL queries.
>>>> >
>>>> > HTH
>>>> >
>>>> > Mich Talebzadeh,
>>>> > Technologist | Solutions Architect | Data Engineer  | Generative AI
>>>> > London
>>>> > United Kingdom
>>>> >
>>>> >
>>>> >    view my Linkedin profile
>>>> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>> >
>>>> >
>>>> >  https://en.everybodywiki.com/Mich_Talebzadeh
>>>> >
>>>> >
>>>> >
>>>> > *Disclaimer:* The information provided is correct to the best of my
>>>> > knowledge but of course cannot be guaranteed . It is essential to note
>>>> > that, as with any advice, quote "one test result is worth one-thousand
>>>> > expert opinions (Werner  <
>>>> https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
>>>> > Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>> >
>>>> >
>>>> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid>
>>>> wrote:
>>>> >
>>>> > > Hello!
>>>> > >
>>>> > > I am attempting to write a streaming pipeline that would consume
>>>> data from
>>>> > > a Kafka source, manipulate the data, and then write results to a
>>>> downstream
>>>> > > sink (Kafka, Redis, etc). I want to write fully formed SQL instead
>>>> of using
>>>> > > the function API that Spark offers. I read a few guides on how to
>>>> do this
>>>> > > and my understanding is that I need to create a temp view in order
>>>> to
>>>> > > execute my raw SQL queries via spark.sql().
>>>> > >
>>>> > > However, I’m having trouble defining watermarks on my source. It
>>>> doesn’t
>>>> > > seem like there is a way to introduce watermark in the raw SQL that
>>>> Spark
>>>> > > supports, so I’m using the .withWatermark() function. However, this
>>>> > > watermark does not work on the temp view.
>>>> > >
>>>> > > Example code:
>>>> > > ```
>>>> > > streaming_df.select(from_json(col("value").cast("string"),
>>>> > >
>>>> schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
>>>> > > "10 seconds”)
>>>> > >
>>>> > > json_df.createOrReplaceTempView("json_df”)
>>>> > >
>>>> > > session.sql("""
>>>> > > SELECT
>>>> > > window.start, window.end, provinceId, sum(payAmount) as
>>>> totalPayAmount
>>>> > > FROM json_df
>>>> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>> > > ORDER BY window.start
>>>> > > """)\
>>>> > > .writeStream\
>>>> > > .format("kafka") \
>>>> > > .option("checkpointLocation", "checkpoint") \
>>>> > > .option("kafka.bootstrap.servers", "localhost:9092") \
>>>> > > .option("topic", "sink") \
>>>> > > .start()
>>>> > > ```
>>>> > > This throws
>>>> > > ```
>>>> > > AnalysisException: Append output mode not supported when there are
>>>> > > streaming aggregations on streaming DataFrames/DataSets without
>>>> watermark;
>>>> > > ```
>>>> > >
>>>> > > If I switch out the SQL query and write it in the function API
>>>> instead,
>>>> > > everything seems to work fine.
>>>> > >
>>>> > > How can I use .sql() in conjunction with watermarks?
>>>> > >
>>>> > > Best,
>>>> > > Chloe
>>>> > >
>>>> >
>>>>
>>>
