Sorry, this is not a bug but essentially a user error. Spark throws a really
confusing error, which confused me as well. Please see the reply in the ticket
for how to fix it:
https://issues.apache.org/jira/browse/SPARK-47718
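
For readers who do not follow the link, the following is a minimal sketch of one likely correction. It rests on an assumption that is not spelled out in this thread: that the culprit is the quoting in window('createTime', ...), because in Spark SQL a single-quoted token is a string literal rather than a reference to the watermarked column. The function name start_windowed_sum, the AGG_SQL constant and the to_json step are illustrative choices, and ORDER BY is left out because sorting a streaming aggregation is only supported in complete output mode.

```python
# Sketch only: createTime is written WITHOUT quotes so that it refers to the
# watermarked column (assumption: the quoted form was the user error).
AGG_SQL = """
SELECT
    window.start AS window_start,
    window.end AS window_end,
    provinceId,
    sum(payAmount) AS totalPayAmount
FROM json_df
GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
"""


def start_windowed_sum(session, parsed_df, checkpoint="checkpoint"):
    """Start the windowed aggregation from the thread using raw SQL.

    session   -- an existing SparkSession
    parsed_df -- a streaming DataFrame with createTime (timestamp),
                 provinceId and payAmount columns
    """
    # Define the watermark on the DataFrame, then expose it to SQL as a view.
    parsed_df.withWatermark("createTime", "10 seconds") \
        .createOrReplaceTempView("json_df")

    result = session.sql(AGG_SQL)

    # The Kafka sink expects a 'value' column, so each row is packed into JSON.
    return (
        result.selectExpr("to_json(struct(*)) AS value")
        .writeStream
        .outputMode("append")
        .format("kafka")
        .option("checkpointLocation", checkpoint)
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "sink")
        .start()
    )
```

Calling start_windowed_sum(session, parsed_df) returns the running StreamingQuery; the broker address and topic are the ones used in the thread below.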

刘唯 <z920631...@gmail.com> wrote on Sat, Apr 6, 2024 at 11:41:

> This indeed looks like a bug. I will take some time to look into it.
>
> Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Wed, Apr 3, 2024 at 01:55:
>
>>
>> Hm, you are getting the error below:
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>
>> The problem seems to be that you are using the append output mode (the
>> default) when writing the streaming query results to Kafka. In append mode
>> each result row is written to the sink (in this case, the "sink" topic in
>> Kafka) once and is never updated afterwards. However, your query involves a
>> streaming aggregation: group by provinceId, window('createTime',
>> '1 hour', '30 minutes'). For an aggregation in append mode, Spark
>> Structured Streaming requires a watermark on the event-time column so that
>> it knows when a window's result is final and can be emitted. Your code
>> already defines a watermark on the "createTime" column with a delay of 10
>> seconds (withWatermark("createTime", "10 seconds")), yet the analyzer still
>> reports the aggregation as having no watermark. One thing to try is adding
>> a watermark to the "start" column. Modify your code as below to include a
>> watermark on the "start" column generated by the window function:
>>
>> # pyspark.sql.functions has no 'watermark'; watermarks are set with
>> # DataFrame.withWatermark()
>> from pyspark.sql.functions import col, from_json, explode, window, sum
>>
>> streaming_df = session.readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "payment_msg") \
>>   .option("startingOffsets", "earliest") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>>   .select("parsed_value.*") \
>>   .withWatermark("createTime", "10 seconds")  # Existing watermark on createTime
>>
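>> # Modified section with watermark on 'start' column.
>> # withWatermark() takes a column name as a string, so window.start is
>> # first flattened into a top-level 'start' column.
>> streaming_df = streaming_df.groupBy(
>>   col("provinceId"),
>>   window(col("createTime"), "1 hour", "30 minutes")
>> ).agg(
>>   sum(col("payAmount")).alias("totalPayAmount")
>> ).select(
>>   col("window.start").alias("start"),
>>   col("window.end").alias("end"),
>>   col("provinceId"),
>>   col("totalPayAmount")
>> ).withWatermark("start", "10 seconds")  # Watermark on window-generated 'start'
>>
>> # Rest of the code remains the same, apart from the flattened column names
>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> # No ORDER BY: sorting a streaming aggregation needs complete output mode.
>> # The Kafka sink needs a 'value' column, hence to_json().
>> session.sql("""
>> SELECT
>>   to_json(struct(`start`, `end`, provinceId, totalPayAmount)) AS value
>> FROM streaming_df
>> """) \
>> .writeStream \
>> .format("kafka") \
>> .option("checkpointLocation", "checkpoint") \
>> .option("kafka.bootstrap.servers", "localhost:9092") \
>> .option("topic", "sink") \
>> .start()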
>>
>> Try and see how it goes
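
As a rough illustration of why append mode needs a watermark for windowed aggregations, here is a minimal pure-Python sketch. It is a simplified model and not Spark's implementation: the helper names sliding_windows and finalized_windows are made up for this example, windows are assumed to be aligned to the Unix epoch, and a window is treated as final once its end is at or before the latest event time seen minus the delay.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time, size, slide):
    """Return every [start, end) window of length `size`, stepped by `slide`
    and aligned to the epoch, that contains `event_time`."""
    # Latest slide-aligned start at or before the event.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def finalized_windows(windows, max_event_time, delay):
    """Windows whose end is at or before the watermark (max event time - delay).
    In this model these are the only rows append mode may emit."""
    watermark = max_event_time - delay
    return [w for w in windows if w[1] <= watermark]


size = timedelta(hours=1)
slide = timedelta(minutes=30)
delay = timedelta(seconds=10)

# A 12:10 event falls into two overlapping windows: 11:30-12:30 and 12:00-13:00.
windows = sliding_windows(datetime(2024, 4, 2, 12, 10), size, slide)

# Latest event seen is 12:30:05, so the watermark is 12:29:55: nothing is final.
early = finalized_windows(windows, datetime(2024, 4, 2, 12, 30, 5), delay)

# Latest event seen is 12:30:10, so the watermark is 12:30:00: 11:30-12:30 is final.
late = finalized_windows(windows, datetime(2024, 4, 2, 12, 30, 10), delay)
```

Without a watermark there is no point at which any window becomes final in this model, which is the situation the AnalysisException rejects.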
>>
>> HTH
>>
>> Mich Talebzadeh,
>>
>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> Disclaimer: The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions (Werner Von Braun)".
>>
>> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid>
>> wrote:
>>
>>> Hi Mich,
>>>
>>> Thank you so much for your response. I really appreciate your help!
>>>
>>> You mentioned "defining the watermark using the withWatermark function
>>> on the streaming_df before creating the temporary view" - I believe this is
>>> what I'm doing and it's not working for me. Here is the exact code snippet
>>> that I'm running:
>>>
>>> ```
>>> >>> streaming_df = session.readStream\
>>>     .format("kafka")\
>>>     .option("kafka.bootstrap.servers", "localhost:9092")\
>>>     .option("subscribe", "payment_msg")\
>>>     .option("startingOffsets","earliest")\
>>>     .load()\
>>>     .select(from_json(col("value").cast("string"),
>>> schema).alias("parsed_value"))\
>>>     .select("parsed_value.*")\
>>>     .withWatermark("createTime", "10 seconds")
>>>
>>> >>> streaming_df.createOrReplaceTempView("streaming_df")
>>>
>>> >>> spark.sql("""
>>> SELECT
>>>     window.start, window.end, provinceId, sum(payAmount) as
>>> totalPayAmount
>>>     FROM streaming_df
>>>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>     ORDER BY window.start
>>> """)\
>>>   .withWatermark("start", "10 seconds")\
>>>   .writeStream\
>>>   .format("kafka") \
>>>   .option("checkpointLocation", "checkpoint") \
>>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>>   .option("topic", "sink") \
>>>   .start()
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>> EventTimeWatermark start#37: timestamp, 10 seconds
>>> ```
>>>
>>> I’m using pyspark 3.5.1. Please let me know if I missed something.
>>> Thanks again!
>>>
>>> Best,
>>> Chloe
>>>
>>>
>>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>>> > ok let us take it for a test.
>>> >
>>> > The original code of mine
>>> >
>>> >     def fetch_data(self):
>>> >         self.sc.setLogLevel("ERROR")
>>> >         schema = StructType() \
>>> >          .add("rowkey", StringType()) \
>>> >          .add("timestamp", TimestampType()) \
>>> >          .add("temperature", IntegerType())
>>> >         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>> >         try:
>>> >
>>> >             # construct a streaming dataframe 'streamingDataFrame' that
>>> >             # subscribes to topic temperature
>>> >             streamingDataFrame = self.spark \
>>> >                 .readStream \
>>> >                 .format("kafka") \
>>> >                 .option("kafka.bootstrap.servers",
>>> > config['MDVariables']['bootstrapServers'],) \
>>> >                 .option("schema.registry.url",
>>> > config['MDVariables']['schemaRegistryURL']) \
>>> >                 .option("group.id", config['common']['appName']) \
>>> >                 .option("zookeeper.connection.timeout.ms",
>>> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>> >                 .option("rebalance.backoff.ms",
>>> > config['MDVariables']['rebalanceBackoffMS']) \
>>> >                 .option("zookeeper.session.timeout.ms",
>>> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>> >                 .option("auto.commit.interval.ms",
>>> > config['MDVariables']['autoCommitIntervalMS']) \
>>> >                 .option("subscribe", "temperature") \
>>> >                 .option("failOnDataLoss", "false") \
>>> >                 .option("includeHeaders", "true") \
>>> >                 .option("startingOffsets", "earliest") \
>>> >                 .load() \
>>> >                 .select(from_json(col("value").cast("string"),
>>> > schema).alias("parsed_value"))
>>> >
>>> >
>>> >             resultC = streamingDataFrame.select( \
>>> >                      col("parsed_value.rowkey").alias("rowkey") \
>>> >                    , col("parsed_value.timestamp").alias("timestamp") \
>>> >                    , col("parsed_value.temperature").alias("temperature"))
>>> >
>>> >             """
>>> >             We work out the window and the AVG(temperature) in the window's
>>> >             timeframe below
>>> >             This should return back the following Dataframe as struct
>>> >
>>> >              root
>>> >              |-- window: struct (nullable = false)
>>> >              |    |-- start: timestamp (nullable = true)
>>> >              |    |-- end: timestamp (nullable = true)
>>> >              |-- avg(temperature): double (nullable = true)
>>> >
>>> >             """
>>> >             resultM = resultC. \
>>> >                      withWatermark("timestamp", "5 minutes"). \
>>> >                      groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>>> >                      avg('temperature')
>>> >
>>> >             # We take the above DataFrame and flatten it to get the columns
>>> >             # aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
>>> >             resultMF = resultM. \
>>> >                        select( \
>>> >                             F.col("window.start").alias("startOfWindow") \
>>> >                           , F.col("window.end").alias("endOfWindow") \
>>> >                           , F.col("avg(temperature)").alias("AVGTemperature"))
>>> >
>>> >             # Kafka producer requires a key, value pair. We generate UUID
>>> >             # key as the unique identifier of Kafka record
>>> >             uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
>>> >
>>> >             """
>>> >             We take DataFrame resultMF containing temperature info and
>>> >             write it to Kafka. The uuid is serialized as a string and used
>>> >             as the key.
>>> >             We take all the columns of the DataFrame and serialize them as
>>> >             a JSON string, putting the results in the "value" of the record.
>>> >             """
>>> >             result = resultMF.withColumn("uuid",uuidUdf()) \
>>> >                      .selectExpr("CAST(uuid AS STRING) AS key", "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>>> >                      .writeStream \
>>> >                      .outputMode('complete') \
>>> >                      .format("kafka") \
>>> >                      .option("kafka.bootstrap.servers",
>>> > config['MDVariables']['bootstrapServers'],) \
>>> >                      .option("topic", "avgtemperature") \
>>> >                      .option('checkpointLocation', checkpoint_path) \
>>> >                      .queryName("avgtemperature") \
>>> >                      .start()
>>> >
>>> >         except Exception as e:
>>> >                 print(f"""{e}, quitting""")
>>> >                 sys.exit(1)
>>> >
>>> >         #print(result.status)
>>> >         #print(result.recentProgress)
>>> >         #print(result.lastProgress)
>>> >
>>> >         result.awaitTermination()
>>> >
>>> > Now try to use SQL for the entire transformation and aggregation
>>> >
>>> > # import this and anything else needed
>>> > from pyspark.sql.functions import from_json, col, window
>>> > from pyspark.sql.types import (StructType, StringType, IntegerType,
>>> >                                FloatType, TimestampType)
>>> >
>>> >
>>> > # Define the schema for the JSON data
>>> > schema = ... # Replace with your schema definition
>>> >
>>> > # construct a streaming dataframe 'streaming_df' that subscribes to topic
>>> > # temperature
>>> > streaming_df = (self.spark
>>> >                 .readStream
>>> >                 .format("kafka")
>>> >                 .option("kafka.bootstrap.servers",
>>> >                         config['MDVariables']['bootstrapServers'],)
>>> >                 .option("schema.registry.url",
>>> >                         config['MDVariables']['schemaRegistryURL'])
>>> >                 .option("group.id", config['common']['appName'])
>>> >                 .option("zookeeper.connection.timeout.ms",
>>> >                         config['MDVariables']['zookeeperConnectionTimeoutMs'])
>>> >                 .option("rebalance.backoff.ms",
>>> >                         config['MDVariables']['rebalanceBackoffMS'])
>>> >                 .option("zookeeper.session.timeout.ms",
>>> >                         config['MDVariables']['zookeeperSessionTimeOutMs'])
>>> >                 .option("auto.commit.interval.ms",
>>> >                         config['MDVariables']['autoCommitIntervalMS'])
>>> >                 .option("subscribe", "temperature")
>>> >                 .option("failOnDataLoss", "false")
>>> >                 .option("includeHeaders", "true")
>>> >                 .option("startingOffsets", "earliest")
>>> >                 .load()
>>> >                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>> >                 .select("parsed_value.*")
>>> >                 .withWatermark("createTime", "10 seconds"))  # Define the watermark here
>>> >
>>> > # Create a temporary view from the streaming DataFrame with watermark
>>> > streaming_df.createOrReplaceTempView("michboy")
>>> >
>>> > # Execute SQL queries on the temporary view
>>> > result_df = (spark.sql("""
>>> >     SELECT
>>> >         window.start, window.end, provinceId, sum(payAmount) as
>>> > totalPayAmount
>>> >     FROM michboy
>>> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>> >     ORDER BY window.start
>>> > """)
>>> >               .writeStream
>>> >               .format("kafka")
>>> >               .option("checkpointLocation", "checkpoint")
>>> >               .option("kafka.bootstrap.servers", "localhost:9092")
>>> >               .option("topic", "sink")
>>> >               .start())
>>> >
>>> > Note that the watermark is defined using the withWatermark function on the
>>> > streaming_df before creating the temporary view (michboy 😀). This way, the
>>> > watermark information should be propagated to the temporary view, so that
>>> > Spark recognizes the watermark and allows streaming aggregations and window
>>> > operations in your SQL queries.
>>> >
>>> > HTH
>>> >
>>> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>> >
>>> > > Hello!
>>> > >
>>> > > I am attempting to write a streaming pipeline that would consume data from
>>> > > a Kafka source, manipulate the data, and then write results to a downstream
>>> > > sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using
>>> > > the function API that Spark offers. I read a few guides on how to do this
>>> > > and my understanding is that I need to create a temp view in order to
>>> > > execute my raw SQL queries via spark.sql().
>>> > >
>>> > > However, I’m having trouble defining watermarks on my source. It doesn’t
>>> > > seem like there is a way to introduce watermark in the raw SQL that Spark
>>> > > supports, so I’m using the .withWatermark() function. However, this
>>> > > watermark does not work on the temp view.
>>> > >
>>> > > Example code:
>>> > > ```
>>> > > json_df = streaming_df.select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>>> > >     .select("parsed_value.*") \
>>> > >     .withWatermark("createTime", "10 seconds")
>>> > >
>>> > > json_df.createOrReplaceTempView("json_df")
>>> > >
>>> > > session.sql("""
>>> > > SELECT
>>> > > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>> > > FROM json_df
>>> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>> > > ORDER BY window.start
>>> > > """)\
>>> > > .writeStream\
>>> > > .format("kafka") \
>>> > > .option("checkpointLocation", "checkpoint") \
>>> > > .option("kafka.bootstrap.servers", "localhost:9092") \
>>> > > .option("topic", "sink") \
>>> > > .start()
>>> > > ```
>>> > > This throws
>>> > > ```
>>> > > AnalysisException: Append output mode not supported when there are
>>> > > streaming aggregations on streaming DataFrames/DataSets without watermark;
>>> > > ```
>>> > >
>>> > > If I switch out the SQL query and write it in the function API instead,
>>> > > everything seems to work fine.
>>> > >
>>> > > How can I use .sql() in conjunction with watermarks?
>>> > >
>>> > > Best,
>>> > > Chloe
>>> > >
>>> >
>>>
>>
