Sorry, this turns out not to be a bug but essentially a user error. Spark throws a really confusing error message, and it confused me as well. Please see the reply in the ticket for how to correct the query: https://issues.apache.org/jira/browse/SPARK-47718
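To spare you a click, here is a minimal sketch of what the corrected query looks like. This is my reading of the ticket reply, and an assumption rather than a quote from it: in the SQL, window('createTime', ...) passes the string literal 'createTime' instead of the createTime column, so the watermark defined on createTime never attaches to the aggregation; referencing the column unquoted fixes that. Two further changes reflected below: the ORDER BY is dropped, because sorting on a streaming aggregation is only supported in complete output mode, and the rows are serialized to JSON because the Kafka sink requires a value column.

```
# Sketch under the assumptions above; streaming_df is the watermarked
# temp view from the thread below.
result = spark.sql("""
    SELECT window.start, window.end, provinceId,
           sum(payAmount) AS totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
""")

query = (result
    # The Kafka sink requires a string or binary 'value' column
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start())
```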
On Sat, 6 Apr 2024 at 11:41, 刘唯 <z920631...@gmail.com> wrote:

> This indeed looks like a bug. I will take some time to look into it.
>
> On Wed, 3 Apr 2024 at 01:55, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
>> Hm, you are getting the below:
>>
>> AnalysisException: Append output mode not supported when there are
>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>
>> The problem seems to be that you are using the append output mode when
>> writing the streaming query results to Kafka. This mode appends newly
>> finalized rows to an existing dataset at the sink (in this case, the
>> "sink" topic in Kafka). However, your query involves a streaming
>> aggregation: group by provinceId, window('createTime', '1 hour', '30
>> minutes'). Spark Structured Streaming requires a watermark on the
>> aggregation's event-time column so that it knows when a window is
>> complete and its result can be emitted in append mode. Your code already
>> defines a watermark on the "createTime" column with a delay of 10 seconds
>> (withWatermark("createTime", "10 seconds")). However, the error message
>> indicates it is missing on the "start" column. Try adding a watermark on
>> the "start" column generated by the window function. Modify your code as
>> below:
>>
>> from pyspark.sql.functions import col, from_json, window, sum
>>
>> streaming_df = session.readStream \
>>     .format("kafka") \
>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>     .option("subscribe", "payment_msg") \
>>     .option("startingOffsets", "earliest") \
>>     .load() \
>>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>>     .select("parsed_value.*") \
>>     .withWatermark("createTime", "10 seconds")  # existing watermark on createTime
>>
>> # Modified section: surface the window-generated 'start' column and put a
>> # watermark on it (withWatermark takes the column name as a plain string)
>> streaming_df = streaming_df.groupBy(
>>     col("provinceId"),
>>     window(col("createTime"), "1 hour", "30 minutes")
>> ).agg(
>>     sum(col("payAmount")).alias("totalPayAmount")
>> ).select(
>>     col("window.start").alias("start"),
>>     col("window.end").alias("end"),
>>     col("provinceId"),
>>     col("totalPayAmount")
>> ).withWatermark("start", "10 seconds")  # watermark on window-generated 'start'
>>
>> # Rest of the code remains the same
>> streaming_df.createOrReplaceTempView("streaming_df")
>>
>> spark.sql("""
>>     SELECT start, end, provinceId, totalPayAmount
>>     FROM streaming_df
>>     ORDER BY start
>> """) \
>>     .writeStream \
>>     .format("kafka") \
>>     .option("checkpointLocation", "checkpoint") \
>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>     .option("topic", "sink") \
>>     .start()
>>
>> Try it and see how it goes.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Technologist | Solutions Architect | Data Engineer | Generative AI
>> London
>> United Kingdom
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
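An aside on the snippet above, since it is easy to trip on: in the DataFrame API the watermark for a windowed aggregation is normally declared before the groupBy, and on the same event-time column that window() uses. A minimal sketch of that canonical shape, reusing the column names from this thread and starting from the parsed stream (the first streaming_df above, before the groupBy):

```
# Canonical append-mode pattern: watermark first, on the same event-time
# column (createTime) that the window uses. Names are the ones from the thread.
from pyspark.sql import functions as F

agg_df = (streaming_df
    .withWatermark("createTime", "10 seconds")
    .groupBy(F.col("provinceId"),
             F.window("createTime", "1 hour", "30 minutes"))
    .agg(F.sum("payAmount").alias("totalPayAmount")))
```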
>> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>
>>> Hi Mich,
>>>
>>> Thank you so much for your response. I really appreciate your help!
>>>
>>> You mentioned "defining the watermark using the withWatermark function
>>> on the streaming_df before creating the temporary view". I believe this
>>> is what I'm doing, and it's not working for me. Here is the exact code
>>> snippet that I'm running:
>>>
>>> ```
>>> streaming_df = session.readStream\
>>>     .format("kafka")\
>>>     .option("kafka.bootstrap.servers", "localhost:9092")\
>>>     .option("subscribe", "payment_msg")\
>>>     .option("startingOffsets", "earliest")\
>>>     .load()\
>>>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
>>>     .select("parsed_value.*")\
>>>     .withWatermark("createTime", "10 seconds")
>>>
>>> streaming_df.createOrReplaceTempView("streaming_df")
>>>
>>> spark.sql("""
>>> SELECT
>>>     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>> FROM streaming_df
>>> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>> ORDER BY window.start
>>> """)\
>>>     .withWatermark("start", "10 seconds")\
>>>     .writeStream\
>>>     .format("kafka") \
>>>     .option("checkpointLocation", "checkpoint") \
>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>     .option("topic", "sink") \
>>>     .start()
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>> EventTimeWatermark start#37: timestamp, 10 seconds
>>> ```
>>>
>>> I'm using pyspark 3.5.1. Please let me know if I missed something.
>>> Thanks again!
>>>
>>> Best,
>>> Chloe
>>>
>>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>>> > OK, let us take it for a test.
>>> > The original code of mine:
>>> >
>>> >     def fetch_data(self):
>>> >         self.sc.setLogLevel("ERROR")
>>> >         schema = StructType() \
>>> >             .add("rowkey", StringType()) \
>>> >             .add("timestamp", TimestampType()) \
>>> >             .add("temperature", IntegerType())
>>> >         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>> >         try:
>>> >             # construct a streaming dataframe 'streamingDataFrame' that
>>> >             # subscribes to topic temperature
>>> >             streamingDataFrame = self.spark \
>>> >                 .readStream \
>>> >                 .format("kafka") \
>>> >                 .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>>> >                 .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>> >                 .option("group.id", config['common']['appName']) \
>>> >                 .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>> >                 .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>> >                 .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>> >                 .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>> >                 .option("subscribe", "temperature") \
>>> >                 .option("failOnDataLoss", "false") \
>>> >                 .option("includeHeaders", "true") \
>>> >                 .option("startingOffsets", "earliest") \
>>> >                 .load() \
>>> >                 .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>> >
>>> >             resultC = streamingDataFrame.select(
>>> >                   col("parsed_value.rowkey").alias("rowkey")
>>> >                 , col("parsed_value.timestamp").alias("timestamp")
>>> >                 , col("parsed_value.temperature").alias("temperature"))
>>> >
>>> >             """
>>> >             We work out the window and the AVG(temperature) in the
>>> >             window's timeframe below. This should return the following
>>> >             DataFrame as struct:
>>> >
>>> >             root
>>> >              |-- window: struct (nullable = false)
>>> >              |    |-- start: timestamp (nullable = true)
>>> >              |    |-- end: timestamp (nullable = true)
>>> >              |-- avg(temperature): double (nullable = true)
>>> >             """
>>> >             resultM = resultC \
>>> >                 .withWatermark("timestamp", "5 minutes") \
>>> >                 .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
>>> >                 .avg('temperature')
>>> >
>>> >             # Flatten the above DataFrame to get the columns aliased as
>>> >             # "startOfWindow", "endOfWindow" and "AVGTemperature"
>>> >             resultMF = resultM \
>>> >                 .select(
>>> >                       F.col("window.start").alias("startOfWindow")
>>> >                     , F.col("window.end").alias("endOfWindow")
>>> >                     , F.col("avg(temperature)").alias("AVGTemperature"))
>>> >
>>> >             # The Kafka producer requires a key, value pair. We generate a
>>> >             # UUID key as the unique identifier of the Kafka record
>>> >             uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>>> >
>>> >             """
>>> >             We take DataFrame resultMF containing temperature info and
>>> >             write it to Kafka. The uuid is serialized as a string and used
>>> >             as the key. We take all the columns of the DataFrame and
>>> >             serialize them as a JSON string, putting the results in the
>>> >             "value" of the record.
>>> > """ >>> > result = resultMF.withColumn("uuid",uuidUdf()) \ >>> > .selectExpr("CAST(uuid AS STRING) AS key", >>> > "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS >>> value") \ >>> > .writeStream \ >>> > .outputMode('complete') \ >>> > .format("kafka") \ >>> > .option("kafka.bootstrap.servers", >>> > config['MDVariables']['bootstrapServers'],) \ >>> > .option("topic", "avgtemperature") \ >>> > .option('checkpointLocation', checkpoint_path) \ >>> > .queryName("avgtemperature") \ >>> > .start() >>> > >>> > except Exception as e: >>> > print(f"""{e}, quitting""") >>> > sys.exit(1) >>> > >>> > #print(result.status) >>> > #print(result.recentProgress) >>> > #print(result.lastProgress) >>> > >>> > result.awaitTermination() >>> > >>> > Now try to use sql for the entire transformation and aggression >>> > >>> > #import this and anything else needed >>> > from pyspark.sql.functions import from_json, col, window >>> > from pyspark.sql.types import StructType, StringType,IntegerType, >>> > FloatType, TimestampType >>> > >>> > >>> > # Define the schema for the JSON data >>> > schema = ... # Replace with your schema definition >>> > >>> > # construct a streaming dataframe 'streamingDataFrame' that >>> > subscribes to topic temperature >>> > streamingDataFrame = self.spark \ >>> > .readStream \ >>> > .format("kafka") \ >>> > .option("kafka.bootstrap.servers", >>> > config['MDVariables']['bootstrapServers'],) \ >>> > .option("schema.registry.url", >>> > config['MDVariables']['schemaRegistryURL']) \ >>> > .option("group.id", config['common']['appName']) \ >>> > .option("zookeeper.connection.timeout.ms", >>> > config['MDVariables']['zookeeperConnectionTimeoutMs']) \ >>> > .option("rebalance.backoff.ms", >>> > config['MDVariables']['rebalanceBackoffMS']) \ >>> > .option("zookeeper.session.timeout.ms", >>> > config['MDVariables']['zookeeperSessionTimeOutMs']) \ >>> > .option("auto.commit.interval.ms", >>> > config['MDVariables']['autoCommitIntervalMS']) \ >>> > .option("subscribe", "temperature") \ >>> > .option("failOnDataLoss", "false") \ >>> > .option("includeHeaders", "true") \ >>> > .option("startingOffsets", "earliest") \ >>> > .load() \ >>> > .select(from_json(col("value").cast("string"), >>> > schema).alias("parsed_value")) >>> > .select("parsed_value.*") >>> > .withWatermark("createTime", "10 seconds")) # Define >>> the >>> > watermark here >>> > >>> > # Create a temporary view from the streaming DataFrame with watermark >>> > streaming_df.createOrReplaceTempView("michboy") >>> > >>> > # Execute SQL queries on the temporary view >>> > result_df = (spark.sql(""" >>> > SELECT >>> > window.start, window.end, provinceId, sum(payAmount) as >>> > totalPayAmount >>> > FROM michboy >>> > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes') >>> > ORDER BY window.start >>> > """) >>> > .writeStream >>> > .format("kafka") >>> > .option("checkpointLocation", "checkpoint") >>> > .option("kafka.bootstrap.servers", "localhost:9092") >>> > .option("topic", "sink") >>> > .start()) >>> > >>> > Note that the watermark is defined using the withWatermark function on >>> the >>> > streaming_df before creating the temporary view (michboy 😀). This >>> way, the >>> > watermark information is correctly propagated to the temporary view, >>> > allowing you to execute SQL queries with window functions and >>> aggregations >>> > on the streaming data. 
>>> > Note that by defining the watermark on the streaming DataFrame before
>>> > creating the temporary view, Spark will recognize the watermark and allow
>>> > streaming aggregations and window operations in your SQL queries.
>>> >
>>> > HTH
>>> >
>>> > Mich Talebzadeh,
>>> > Technologist | Solutions Architect | Data Engineer | Generative AI
>>> > London
>>> > United Kingdom
>>> >
>>> > view my Linkedin profile
>>> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>> >
>>> > https://en.everybodywiki.com/Mich_Talebzadeh
>>> >
>>> > *Disclaimer:* The information provided is correct to the best of my
>>> > knowledge but of course cannot be guaranteed. It is essential to note
>>> > that, as with any advice, quote "one test result is worth one-thousand
>>> > expert opinions" (Wernher von Braun
>>> > <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>> >
>>> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>> >
>>> > > Hello!
>>> > >
>>> > > I am attempting to write a streaming pipeline that would consume data
>>> > > from a Kafka source, manipulate the data, and then write results to a
>>> > > downstream sink (Kafka, Redis, etc.). I want to write fully formed SQL
>>> > > instead of using the function API that Spark offers. I read a few guides
>>> > > on how to do this, and my understanding is that I need to create a temp
>>> > > view in order to execute my raw SQL queries via spark.sql().
>>> > >
>>> > > However, I'm having trouble defining watermarks on my source. It doesn't
>>> > > seem like there is a way to introduce a watermark in the raw SQL that
>>> > > Spark supports, so I'm using the .withWatermark() function. However,
>>> > > this watermark does not work on the temp view.
>>> > >
>>> > > Example code:
>>> > > ```
>>> > > json_df = streaming_df.select(from_json(col("value").cast("string"),
>>> > >     schema).alias("parsed_value")).select("parsed_value.*")\
>>> > >     .withWatermark("createTime", "10 seconds")
>>> > >
>>> > > json_df.createOrReplaceTempView("json_df")
>>> > >
>>> > > session.sql("""
>>> > > SELECT
>>> > >     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>> > > FROM json_df
>>> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>> > > ORDER BY window.start
>>> > > """)\
>>> > >     .writeStream\
>>> > >     .format("kafka") \
>>> > >     .option("checkpointLocation", "checkpoint") \
>>> > >     .option("kafka.bootstrap.servers", "localhost:9092") \
>>> > >     .option("topic", "sink") \
>>> > >     .start()
>>> > > ```
>>> > > This throws
>>> > > ```
>>> > > AnalysisException: Append output mode not supported when there are
>>> > > streaming aggregations on streaming DataFrames/DataSets without watermark;
>>> > > ```
>>> > >
>>> > > If I switch out the SQL query and write it in the function API instead,
>>> > > everything seems to work fine.
>>> > >
>>> > > How can I use .sql() in conjunction with watermarks?
>>> > >
>>> > > Best,
>>> > > Chloe
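For completeness, since Chloe notes above that the function API works where the temp-view SQL fails, here is a sketch of that working variant. It is not her exact code: parsed_df is a hypothetical name standing in for the parsed stream from her snippet (before the watermark is applied), and the ORDER BY is omitted because a global sort is not supported in append mode.

```
# Function-API equivalent of the SQL query; parsed_df is a hypothetical
# name for the parsed Kafka stream from Chloe's snippet (pre-watermark).
from pyspark.sql import functions as F

query = (parsed_df
    .withWatermark("createTime", "10 seconds")
    .groupBy("provinceId", F.window("createTime", "1 hour", "30 minutes"))
    .agg(F.sum("payAmount").alias("totalPayAmount"))
    # The Kafka sink needs a 'value' column; serialize each row as JSON
    .select(F.to_json(F.struct("window.start", "window.end",
                               "provinceId", "totalPayAmount")).alias("value"))
    .writeStream
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start())
```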