This indeed looks like a bug. I will take some time to look into it.

On Wed, 3 Apr 2024 at 01:55, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> hm. you are getting below
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
>
> The problem seems to be that you are using the append output mode when
> writing the streaming query results to Kafka. This mode is designed for
> scenarios where you want to append new data to an existing dataset at the
> sink (in this case, the "sink" topic in Kafka). However, your query
> involves a streaming aggregation: group by provinceId,
> window('createTime', '1 hour', '30 minutes'). Spark Structured Streaming
> requires a watermark on such aggregations in append mode so that it knows
> when a window is complete and its result can be emitted. Your code already
> defines a watermark on the "createTime" column with a delay of 10 seconds
> (withWatermark("createTime", "10 seconds")). However, the error message
> indicates it is missing on the "start" column. Try adding a watermark to
> the "start" column: modify your code as below to flatten the window struct
> and put a watermark on the "start" column generated by the window function:
>
> ```
> from pyspark.sql.functions import col, from_json, window, sum
>
> streaming_df = session.readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("subscribe", "payment_msg") \
>     .option("startingOffsets", "earliest") \
>     .load() \
>     .select(from_json(col("value").cast("string"),
>             schema).alias("parsed_value")) \
>     .select("parsed_value.*") \
>     .withWatermark("createTime", "10 seconds")  # existing watermark on createTime
>
> # Modified section with a watermark on the window-generated 'start'
> streaming_df = streaming_df.groupBy(
>     col("provinceId"),
>     window(col("createTime"), "1 hour", "30 minutes")
> ).agg(
>     sum(col("payAmount")).alias("totalPayAmount")
> ).select(
>     col("window.start").alias("start"),
>     col("window.end").alias("end"),
>     col("provinceId"),
>     col("totalPayAmount")
> ).withWatermark("start", "10 seconds")  # watermark on window-generated 'start'
>
> # Rest of the code remains the same
> streaming_df.createOrReplaceTempView("streaming_df")
>
> spark.sql("""
> SELECT
>   start, `end`, provinceId, totalPayAmount
> FROM streaming_df
> ORDER BY start
> """) \
>     .writeStream \
>     .format("kafka") \
>     .option("checkpointLocation", "checkpoint") \
>     .option("kafka.bootstrap.servers", "localhost:9092") \
>     .option("topic", "sink") \
>     .start()
> ```
>
> Try and see how it goes
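>
> For comparison, here is a minimal, self-contained sketch of the same job
> written purely with the DataFrame API, with the watermark applied before
> the aggregation, which is the pattern the append-mode check accepts. The
> schema below is my assumption from your snippet (createTime, provinceId,
> payAmount), so adjust it to your real message format. Two things to note:
> sorting is not supported on streaming queries outside complete output
> mode, so there is no ORDER BY equivalent here, and the Kafka sink requires
> an explicit string "value" column:
>
> ```
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.functions import sum as sum_
> from pyspark.sql.types import (StructType, IntegerType, DoubleType,
>                                TimestampType)
>
> spark = SparkSession.builder.appName("payment_agg").getOrCreate()
>
> # Assumed schema of the payment_msg topic -- adjust to the real one
> schema = (StructType()
>           .add("createTime", TimestampType())
>           .add("provinceId", IntegerType())
>           .add("payAmount", DoubleType()))
>
> parsed = (spark.readStream
>           .format("kafka")
>           .option("kafka.bootstrap.servers", "localhost:9092")
>           .option("subscribe", "payment_msg")
>           .option("startingOffsets", "earliest")
>           .load()
>           .select(from_json(col("value").cast("string"), schema)
>                   .alias("parsed_value"))
>           .select("parsed_value.*")
>           # watermark BEFORE the groupBy: this is what append mode checks
>           .withWatermark("createTime", "10 seconds"))
>
> agg = (parsed
>        .groupBy(col("provinceId"),
>                 window(col("createTime"), "1 hour", "30 minutes"))
>        .agg(sum_("payAmount").alias("totalPayAmount"))
>        .select(col("window.start").alias("startOfWindow"),
>                col("window.end").alias("endOfWindow"),
>                col("provinceId"),
>                col("totalPayAmount")))
>
> # Kafka records are key/value pairs; serialize each row as JSON
> query = (agg.selectExpr(
>              "CAST(provinceId AS STRING) AS key",
>              "to_json(struct(startOfWindow, endOfWindow, provinceId, "
>              "totalPayAmount)) AS value")
>          .writeStream
>          .outputMode("append")
>          .format("kafka")
>          .option("checkpointLocation", "checkpoint")
>          .option("kafka.bootstrap.servers", "localhost:9092")
>          .option("topic", "sink")
>          .start())
>
> query.awaitTermination()
> ```
>
> In append mode each window is emitted once, when the watermark passes the
> window end plus the 10-second delay, so expect the output to lag event
> time by at least that much.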
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer | Generative AI
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed. It is essential to note
> that, as with any advice, "one test result is worth one-thousand expert
> opinions" (Wernher von Braun
> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>
> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid> wrote:
>
>> Hi Mich,
>>
>> Thank you so much for your response. I really appreciate your help!
>>
>> You mentioned "defining the watermark using the withWatermark function
>> on the streaming_df before creating the temporary view" - I believe this
>> is what I'm doing and it's not working for me. Here is the exact code
>> snippet that I'm running:
>>
>> ```
>> streaming_df = session.readStream \
>>     .format("kafka") \
>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>     .option("subscribe", "payment_msg") \
>>     .option("startingOffsets", "earliest") \
>>     .load() \
>>     .select(from_json(col("value").cast("string"),
>>             schema).alias("parsed_value")) \
>>     .select("parsed_value.*") \
>>     .withWatermark("createTime", "10 seconds")
>>
>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> spark.sql("""
>> SELECT
>>   window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>> FROM streaming_df
>> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>> ORDER BY window.start
>> """) \
>>     .withWatermark("start", "10 seconds") \
>>     .writeStream \
>>     .format("kafka") \
>>     .option("checkpointLocation", "checkpoint") \
>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>     .option("topic", "sink") \
>>     .start()
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>> EventTimeWatermark start#37: timestamp, 10 seconds
>> ```
>>
>> I'm using pyspark 3.5.1. Please let me know if I missed something.
>> Thanks again!
>>
>> Best,
>> Chloe
>>
>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>>> ok let us take it for a test.
>>>
>>> The original code of mine:
>>>
>>> ```
>>> def fetch_data(self):
>>>     self.sc.setLogLevel("ERROR")
>>>     schema = StructType() \
>>>         .add("rowkey", StringType()) \
>>>         .add("timestamp", TimestampType()) \
>>>         .add("temperature", IntegerType())
>>>     checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>>     try:
>>>         # construct a streaming dataframe 'streamingDataFrame' that
>>>         # subscribes to topic temperature
>>>         streamingDataFrame = self.spark \
>>>             .readStream \
>>>             .format("kafka") \
>>>             .option("kafka.bootstrap.servers",
>>>                     config['MDVariables']['bootstrapServers']) \
>>>             .option("schema.registry.url",
>>>                     config['MDVariables']['schemaRegistryURL']) \
>>>             .option("group.id", config['common']['appName']) \
>>>             .option("zookeeper.connection.timeout.ms",
>>>                     config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>             .option("rebalance.backoff.ms",
>>>                     config['MDVariables']['rebalanceBackoffMS']) \
>>>             .option("zookeeper.session.timeout.ms",
>>>                     config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>             .option("auto.commit.interval.ms",
>>>                     config['MDVariables']['autoCommitIntervalMS']) \
>>>             .option("subscribe", "temperature") \
>>>             .option("failOnDataLoss", "false") \
>>>             .option("includeHeaders", "true") \
>>>             .option("startingOffsets", "earliest") \
>>>             .load() \
>>>             .select(from_json(col("value").cast("string"),
>>>                     schema).alias("parsed_value"))
>>>
>>>         resultC = streamingDataFrame.select(
>>>             col("parsed_value.rowkey").alias("rowkey"),
>>>             col("parsed_value.timestamp").alias("timestamp"),
>>>             col("parsed_value.temperature").alias("temperature"))
>>>
>>>         # We work out the window and the AVG(temperature) in the
>>>         # window's timeframe below. This should return the following
>>>         # dataframe as struct:
>>>         #
>>>         # root
>>>         #  |-- window: struct (nullable = false)
>>>         #  |    |-- start: timestamp (nullable = true)
>>>         #  |    |-- end: timestamp (nullable = true)
>>>         #  |-- avg(temperature): double (nullable = true)
>>>         resultM = resultC \
>>>             .withWatermark("timestamp", "5 minutes") \
>>>             .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
>>>             .avg('temperature')
>>>
>>>         # Flatten the above DataFrame to get the columns aliased as
>>>         # "startOfWindow", "endOfWindow" and "AVGTemperature"
>>>         resultMF = resultM \
>>>             .select(
>>>                 F.col("window.start").alias("startOfWindow"),
>>>                 F.col("window.end").alias("endOfWindow"),
>>>                 F.col("avg(temperature)").alias("AVGTemperature"))
>>>
>>>         # Kafka producer requires a key, value pair. We generate a UUID
>>>         # key as the unique identifier of the Kafka record
>>>         uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>>>
>>>         # We take DataFrame resultMF containing temperature info and
>>>         # write it to Kafka. The uuid is serialized as a string and
>>>         # used as the key. We take all the columns of the DataFrame
>>>         # and serialize them as a JSON string, putting the results in
>>>         # the "value" of the record.
>>>         result = resultMF.withColumn("uuid", uuidUdf()) \
>>>             .selectExpr("CAST(uuid AS STRING) AS key",
>>>                         "to_json(struct(startOfWindow, endOfWindow, "
>>>                         "AVGTemperature)) AS value") \
>>>             .writeStream \
>>>             .outputMode('complete') \
>>>             .format("kafka") \
>>>             .option("kafka.bootstrap.servers",
>>>                     config['MDVariables']['bootstrapServers']) \
>>>             .option("topic", "avgtemperature") \
>>>             .option('checkpointLocation', checkpoint_path) \
>>>             .queryName("avgtemperature") \
>>>             .start()
>>>
>>>     except Exception as e:
>>>         print(f"""{e}, quitting""")
>>>         sys.exit(1)
>>>
>>>     # print(result.status)
>>>     # print(result.recentProgress)
>>>     # print(result.lastProgress)
>>>
>>>     result.awaitTermination()
>>> ```
>>>
>>> Now try to use sql for the entire transformation and aggregation:
>>>
>>> ```
>>> # import this and anything else needed
>>> from pyspark.sql.functions import from_json, col, window
>>> from pyspark.sql.types import StructType, StringType, IntegerType, \
>>>     FloatType, TimestampType
>>>
>>> # Define the schema for the JSON data
>>> schema = ...  # replace with your schema definition
>>>
>>> # construct a streaming dataframe 'streaming_df' that subscribes to
>>> # the topic
>>> streaming_df = (spark
>>>     .readStream
>>>     .format("kafka")
>>>     .option("kafka.bootstrap.servers",
>>>             config['MDVariables']['bootstrapServers'])
>>>     .option("schema.registry.url",
>>>             config['MDVariables']['schemaRegistryURL'])
>>>     .option("group.id", config['common']['appName'])
>>>     .option("zookeeper.connection.timeout.ms",
>>>             config['MDVariables']['zookeeperConnectionTimeoutMs'])
>>>     .option("rebalance.backoff.ms",
>>>             config['MDVariables']['rebalanceBackoffMS'])
>>>     .option("zookeeper.session.timeout.ms",
>>>             config['MDVariables']['zookeeperSessionTimeOutMs'])
>>>     .option("auto.commit.interval.ms",
>>>             config['MDVariables']['autoCommitIntervalMS'])
>>>     .option("subscribe", "temperature")  # swap in your payment_msg topic
>>>     .option("failOnDataLoss", "false")
>>>     .option("includeHeaders", "true")
>>>     .option("startingOffsets", "earliest")
>>>     .load()
>>>     .select(from_json(col("value").cast("string"),
>>>             schema).alias("parsed_value"))
>>>     .select("parsed_value.*")
>>>     .withWatermark("createTime", "10 seconds"))  # define the watermark here
>>>
>>> # Create a temporary view from the streaming DataFrame with watermark
>>> streaming_df.createOrReplaceTempView("michboy")
>>>
>>> # Execute SQL queries on the temporary view
>>> result_df = (spark.sql("""
>>>     SELECT
>>>       window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>>     FROM michboy
>>>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>     ORDER BY window.start
>>> """)
>>>     .writeStream
>>>     .format("kafka")
>>>     .option("checkpointLocation", "checkpoint")
>>>     .option("kafka.bootstrap.servers", "localhost:9092")
>>>     .option("topic", "sink")
>>>     .start())
>>> ```
>>>
>>> Note that the watermark is defined using the withWatermark function on
>>> the streaming_df before creating the temporary view (michboy 😀). This
>>> way, the watermark information is propagated to the temporary view, so
>>> Spark should recognize it and allow streaming aggregations and window
>>> operations in your SQL queries.
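>>>
>>> As a quick sanity check (a sketch, assuming the michboy view from
>>> above), you can verify whether the watermark actually survived the
>>> temporary view by inspecting the analyzed plan for an
>>> EventTimeWatermark node:
>>>
>>> ```
>>> check_df = spark.sql("SELECT * FROM michboy")
>>>
>>> # The view should still be a streaming source ...
>>> print(check_df.isStreaming)  # expect: True
>>>
>>> # ... and its logical plan should carry the watermark defined on the
>>> # underlying DataFrame, e.g. 'EventTimeWatermark createTime#...'
>>> check_df.explain(extended=True)
>>> ```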
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer | Generative AI
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, "one test result is worth one-thousand expert
>>> opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>> On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>>
>>>> Hello!
>>>>
>>>> I am attempting to write a streaming pipeline that would consume data
>>>> from a Kafka source, manipulate the data, and then write results to a
>>>> downstream sink (Kafka, Redis, etc). I want to write fully formed SQL
>>>> instead of using the function API that Spark offers. I read a few
>>>> guides on how to do this and my understanding is that I need to
>>>> create a temp view in order to execute my raw SQL queries via
>>>> spark.sql().
>>>>
>>>> However, I'm having trouble defining watermarks on my source. It
>>>> doesn't seem like there is a way to introduce a watermark in the raw
>>>> SQL that Spark supports, so I'm using the .withWatermark() function.
>>>> However, this watermark does not work on the temp view.
>>>>
>>>> Example code:
>>>> ```
>>>> json_df = streaming_df.select(from_json(col("value").cast("string"),
>>>>     schema).alias("parsed_value")) \
>>>>     .select("parsed_value.*") \
>>>>     .withWatermark("createTime", "10 seconds")
>>>>
>>>> json_df.createOrReplaceTempView("json_df")
>>>>
>>>> session.sql("""
>>>> SELECT
>>>>   window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>>> FROM json_df
>>>> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>> ORDER BY window.start
>>>> """) \
>>>>     .writeStream \
>>>>     .format("kafka") \
>>>>     .option("checkpointLocation", "checkpoint") \
>>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>     .option("topic", "sink") \
>>>>     .start()
>>>> ```
>>>> This throws
>>>> ```
>>>> AnalysisException: Append output mode not supported when there are
>>>> streaming aggregations on streaming DataFrames/DataSets without
>>>> watermark;
>>>> ```
>>>>
>>>> If I switch out the SQL query and write it in the function API
>>>> instead, everything seems to work fine.
>>>>
>>>> How can I use .sql() in conjunction with watermarks?
>>>>
>>>> Best,
>>>> Chloe
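>>>>
>>>> P.S. For reference, the function-API version that does work for me
>>>> looks roughly like this (a sketch, not my exact code; same json_df as
>>>> above, and the JSON packing is only there because the Kafka sink
>>>> needs a string "value" column):
>>>>
>>>> ```
>>>> from pyspark.sql.functions import col, sum, window
>>>>
>>>> result = json_df \
>>>>     .groupBy(col("provinceId"),
>>>>              window(col("createTime"), "1 hour", "30 minutes")) \
>>>>     .agg(sum(col("payAmount")).alias("totalPayAmount")) \
>>>>     .select("window.start", "window.end", "provinceId", "totalPayAmount")
>>>>
>>>> # Note: no ORDER BY equivalent here; sorting a streaming query is
>>>> # only allowed in complete output mode, so append mode leaves it out
>>>> result.selectExpr("to_json(struct(*)) AS value") \
>>>>     .writeStream \
>>>>     .format("kafka") \
>>>>     .option("checkpointLocation", "checkpoint") \
>>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>     .option("topic", "sink") \
>>>>     .start()
>>>> ```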