This indeed looks like a bug. I will take some time to look into it.

Mich Talebzadeh <mich.talebza...@gmail.com> wrote on Wed, 3 Apr 2024 at 01:55:

>
> Hm. You are getting the error below:
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
>
> The problem seems to be that you are using the append output mode (the
> default) when writing the streaming query results to Kafka. This mode is
> designed for scenarios where you only ever append new rows at the sink (in
> this case, the "sink" topic in Kafka) and never update rows already
> written. However, your query involves a streaming aggregation: group by
> provinceId, window('createTime', '1 hour', '30 minutes'). Spark Structured
> Streaming requires a watermark for aggregations in append mode so that it
> knows when a window is final and can be emitted. Your code already defines
> a watermark on the "createTime" column with a delay of 10 seconds
> (withWatermark("createTime", "10 seconds")). However, the error message
> indicates it is missing on the "start" column. Try adding a watermark to
> the "start" column: modify your code as below to include a watermark on
> the "start" column generated by the window function:
>
> from pyspark.sql.functions import col, from_json, explode, window, sum
>
> streaming_df = session.readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "payment_msg") \
>   .option("startingOffsets", "earliest") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>   .select("parsed_value.*") \
>   .withWatermark("createTime", "10 seconds")  # Existing watermark on createTime
>
> # Modified section with watermark on 'start' column
> streaming_df = streaming_df.groupBy(
>   col("provinceId"),
>   window(col("createTime"), "1 hour", "30 minutes")
> ).agg(
>   sum(col("payAmount")).alias("totalPayAmount")
> ).withWatermark("start", "10 seconds")  # Watermark on window-generated 'start' (withWatermark takes the column name as a string)
>
> # Rest of the code remains the same
> streaming_df.createOrReplaceTempView("streaming_df")
>
> spark.sql("""
> SELECT
>   window.start, window.end, provinceId, totalPayAmount
> FROM streaming_df
> ORDER BY window.start
> """) \
> .writeStream \
> .format("kafka") \
> .option("checkpointLocation", "checkpoint") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("topic", "sink") \
> .start()
>
> Try and see how it goes
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Solutions Architect | Data Engineer  | Generative AI
>
> London
> United Kingdom
>
>
>    view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner Von Braun)".
>
>
> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid>
> wrote:
>
>> Hi Mich,
>>
>> Thank you so much for your response. I really appreciate your help!
>>
>> You mentioned "defining the watermark using the withWatermark function on
>> the streaming_df before creating the temporary view". I believe this is
>> what I’m doing and it’s not working for me. Here is the exact code snippet
>> that I’m running:
>>
>> ```
>> >>> streaming_df = session.readStream\
>>     .format("kafka")\
>>     .option("kafka.bootstrap.servers", "localhost:9092")\
>>     .option("subscribe", "payment_msg")\
>>     .option("startingOffsets","earliest")\
>>     .load()\
>>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
>>     .select("parsed_value.*")\
>>     .withWatermark("createTime", "10 seconds")
>>
>> >>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> >>> spark.sql("""
>> SELECT
>>     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>     FROM streaming_df
>>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>     ORDER BY window.start
>> """)\
>>   .withWatermark("start", "10 seconds")\
>>   .writeStream\
>>   .format("kafka") \
>>   .option("checkpointLocation", "checkpoint") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("topic", "sink") \
>>   .start()
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>> EventTimeWatermark start#37: timestamp, 10 seconds
>> ```
>>
>> I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks
>> again!
>>
>> Best,
>> Chloe
>>
>>
>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>> > OK, let us put it to the test.
>> >
>> > Here is my original code:
>> >
>> >     def fetch_data(self):
>> >         self.sc.setLogLevel("ERROR")
>> >         schema = StructType() \
>> >          .add("rowkey", StringType()) \
>> >          .add("timestamp", TimestampType()) \
>> >          .add("temperature", IntegerType())
>> >         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>> >         try:
>> >
>> >             # construct a streaming dataframe 'streamingDataFrame' that subscribes to topic temperature
>> >             streamingDataFrame = self.spark \
>> >                 .readStream \
>> >                 .format("kafka") \
>> >                 .option("kafka.bootstrap.servers",
>> > config['MDVariables']['bootstrapServers'],) \
>> >                 .option("schema.registry.url",
>> > config['MDVariables']['schemaRegistryURL']) \
>> >                 .option("group.id", config['common']['appName']) \
>> >                 .option("zookeeper.connection.timeout.ms",
>> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> >                 .option("rebalance.backoff.ms",
>> > config['MDVariables']['rebalanceBackoffMS']) \
>> >                 .option("zookeeper.session.timeout.ms",
>> > config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> >                 .option("auto.commit.interval.ms",
>> > config['MDVariables']['autoCommitIntervalMS']) \
>> >                 .option("subscribe", "temperature") \
>> >                 .option("failOnDataLoss", "false") \
>> >                 .option("includeHeaders", "true") \
>> >                 .option("startingOffsets", "earliest") \
>> >                 .load() \
>> >                 .select(from_json(col("value").cast("string"),
>> > schema).alias("parsed_value"))
>> >
>> >
>> >             resultC = streamingDataFrame.select( \
>> >                      col("parsed_value.rowkey").alias("rowkey") \
>> >                    , col("parsed_value.timestamp").alias("timestamp") \
>> >                    , col("parsed_value.temperature").alias("temperature"))
>> >
>> >             """
>> >             We work out the window and the AVG(temperature) in the window's timeframe below
>> >             This should return back the following Dataframe as struct
>> >
>> >              root
>> >              |-- window: struct (nullable = false)
>> >              |    |-- start: timestamp (nullable = true)
>> >              |    |-- end: timestamp (nullable = true)
>> >              |-- avg(temperature): double (nullable = true)
>> >
>> >             """
>> >             resultM = resultC. \
>> >                      withWatermark("timestamp", "5 minutes"). \
>> >                      groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>> >                      avg('temperature')
>> >
>> >             # We take the above DataFrame and flatten it to get the columns aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
>> >             resultMF = resultM. \
>> >                        select( \
>> >                             F.col("window.start").alias("startOfWindow") \
>> >                           , F.col("window.end").alias("endOfWindow") \
>> >                           , F.col("avg(temperature)").alias("AVGTemperature"))
>> >
>> >             # Kafka producer requires a key, value pair. We generate UUID key as the unique identifier of Kafka record
>> >             uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
>> >
>> >             """
>> >             We take DataFrame resultMF containing temperature info and write it to Kafka. The uuid is serialized as a string and used as the key.
>> >             We take all the columns of the DataFrame and serialize them as a JSON string, putting the results in the "value" of the record.
>> >             """
>> >             result = resultMF.withColumn("uuid",uuidUdf()) \
>> >                      .selectExpr("CAST(uuid AS STRING) AS key", "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>> >                      .writeStream \
>> >                      .outputMode('complete') \
>> >                      .format("kafka") \
>> >                      .option("kafka.bootstrap.servers",
>> > config['MDVariables']['bootstrapServers'],) \
>> >                      .option("topic", "avgtemperature") \
>> >                      .option('checkpointLocation', checkpoint_path) \
>> >                      .queryName("avgtemperature") \
>> >                      .start()
>> >
>> >         except Exception as e:
>> >                 print(f"""{e}, quitting""")
>> >                 sys.exit(1)
>> >
>> >         #print(result.status)
>> >         #print(result.recentProgress)
>> >         #print(result.lastProgress)
>> >
>> >         result.awaitTermination()
>> >
>> > Now try to use SQL for the entire transformation and aggregation:
>> >
>> > # Import this and anything else needed
>> > from pyspark.sql.functions import from_json, col, window
>> > from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
>> >
>> > # Define the schema for the JSON data
>> > schema = ... # Replace with your schema definition
>> >
>> > # Construct a streaming DataFrame 'streaming_df' that subscribes to topic temperature
>> > streaming_df = (spark
>> >                 .readStream
>> >                 .format("kafka")
>> >                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'])
>> >                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL'])
>> >                 .option("group.id", config['common']['appName'])
>> >                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs'])
>> >                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS'])
>> >                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs'])
>> >                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS'])
>> >                 .option("subscribe", "temperature")
>> >                 .option("failOnDataLoss", "false")
>> >                 .option("includeHeaders", "true")
>> >                 .option("startingOffsets", "earliest")
>> >                 .load()
>> >                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>> >                 .select("parsed_value.*")
>> >                 .withWatermark("createTime", "10 seconds"))  # Define the watermark here
>> >
>> > # Create a temporary view from the streaming DataFrame with watermark
>> > streaming_df.createOrReplaceTempView("michboy")
>> >
>> > # Execute SQL queries on the temporary view
>> > result_df = (spark.sql("""
>> >     SELECT
>> >         window.start, window.end, provinceId, sum(payAmount) as
>> > totalPayAmount
>> >     FROM michboy
>> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>> >     ORDER BY window.start
>> > """)
>> >               .writeStream
>> >               .format("kafka")
>> >               .option("checkpointLocation", "checkpoint")
>> >               .option("kafka.bootstrap.servers", "localhost:9092")
>> >               .option("topic", "sink")
>> >               .start())
>> >
>> > Note that the watermark is defined using the withWatermark function on
>> > the streaming_df before creating the temporary view (michboy 😀). This
>> > way, the watermark information is correctly propagated to the temporary
>> > view, allowing you to execute SQL queries with window functions and
>> > aggregations on the streaming data.
>> >
>> > Note that by defining the watermark on the streaming DataFrame before
>> > creating the temporary view, Spark will recognize the watermark and
>> > allow streaming aggregations and window operations in your SQL queries.
>> >
>> > HTH
>> >
>> >
>> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>> >
>> > > Hello!
>> > >
>> > > I am attempting to write a streaming pipeline that would consume data
>> > > from a Kafka source, manipulate the data, and then write results to a
>> > > downstream sink (Kafka, Redis, etc). I want to write fully formed SQL
>> > > instead of using the function API that Spark offers. I read a few guides
>> > > on how to do this and my understanding is that I need to create a temp
>> > > view in order to execute my raw SQL queries via spark.sql().
>> > >
>> > > However, I’m having trouble defining watermarks on my source. It doesn’t
>> > > seem like there is a way to introduce watermark in the raw SQL that Spark
>> > > supports, so I’m using the .withWatermark() function. However, this
>> > > watermark does not work on the temp view.
>> > >
>> > > Example code:
>> > > ```
>> > > json_df = streaming_df.select(from_json(col("value").cast("string"), schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime", "10 seconds")
>> > >
>> > > json_df.createOrReplaceTempView("json_df")
>> > >
>> > > session.sql("""
>> > > SELECT
>> > > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>> > > FROM json_df
>> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>> > > ORDER BY window.start
>> > > """)\
>> > > .writeStream\
>> > > .format("kafka") \
>> > > .option("checkpointLocation", "checkpoint") \
>> > > .option("kafka.bootstrap.servers", "localhost:9092") \
>> > > .option("topic", "sink") \
>> > > .start()
>> > > ```
>> > > This throws
>> > > ```
>> > > AnalysisException: Append output mode not supported when there are
>> > > streaming aggregations on streaming DataFrames/DataSets without watermark;
>> > > ```
>> > >
>> > > If I switch out the SQL query and write it in the function API instead,
>> > > everything seems to work fine.
>> > >
>> > > How can I use .sql() in conjunction with watermarks?
>> > >
>> > > Best,
>> > > Chloe
>> > >
>> >
>>
>
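
For readers following the thread, here is a rough sketch of why append mode and watermarks are tied together in the error quoted above. It is a toy model in plain Python, not Spark's implementation: sliding_windows and AppendModeSketch are hypothetical names invented for this illustration, and real Spark differs in details (for example, in when an advanced watermark takes effect). It assumes windows are aligned to the Unix epoch and that a window is final once the watermark (the maximum event time seen, minus the delay) has passed its end.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time, size, slide):
    """Return all [start, end) windows of length `size`, spaced `slide` apart, that contain event_time."""
    # Latest window start at or before the event, aligned to the slide interval.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


class AppendModeSketch:
    """Toy model of a windowed sum that emits each window once, when it is final."""

    def __init__(self, size, slide, delay=None):
        self.size, self.slide, self.delay = size, slide, delay
        self.max_event_time = None
        self.state = {}  # (window, key) -> running sum

    def watermark(self):
        # Without a delay (no watermark) no window is ever known to be final.
        if self.delay is None or self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process_batch(self, events):
        """events: iterable of (event_time, key, amount). Returns the rows finalized by this batch."""
        watermark = self.watermark()
        for event_time, key, amount in events:
            for win in sliding_windows(event_time, self.size, self.slide):
                if watermark is not None and win[1] <= watermark:
                    continue  # window already finalized: the late event is dropped
                self.state[(win, key)] = self.state.get((win, key), 0) + amount
            if self.max_event_time is None or event_time > self.max_event_time:
                self.max_event_time = event_time
        watermark = self.watermark()
        if watermark is None:
            return []
        # Emit, and forget, every window whose end the watermark has passed.
        done = sorted(k for k in self.state if k[0][1] <= watermark)
        return [(win[0], win[1], key, self.state.pop((win, key))) for win, key in done]
```

With a 1 hour window sliding every 30 minutes, each event lands in two windows, and a row is appended only after the watermark passes the end of its window. Constructed without a delay, the sketch never emits anything, which is the situation the AnalysisException guards against.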
