Interesting. So below should be the corrected code, with the suggestion in [SPARK-47718] ".sql() does not recognize watermark defined upstream" - ASF JIRA <https://issues.apache.org/jira/browse/SPARK-47718>. As I read the reply in the ticket, the key change is to reference the timestamp column itself in window() (i.e. window(createTime, ...)) rather than the string literal 'createTime', so that the watermark defined upstream is actually tied to the grouping.
# Imports and session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (StructType, StructField, TimestampType,
                               LongType, DoubleType, IntegerType)

session = SparkSession.builder.getOrCreate()

# Define schema for parsing Kafka messages
schema = StructType([
    StructField('createTime', TimestampType(), True),
    StructField('orderId', LongType(), True),
    StructField('payAmount', DoubleType(), True),
    StructField('payPlatform', IntegerType(), True),
    StructField('provinceId', IntegerType(), True),
])

# Read streaming data from Kafka source
streaming_df = session.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "payment_msg") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
    .select("parsed_value.*") \
    .withWatermark("createTime", "10 seconds")

# Create temporary view for SQL queries
streaming_df.createOrReplaceTempView("streaming_df")

# Define SQL query with correct window function usage
# (createTime is referenced as a column, not as the string literal 'createTime')
query = """
SELECT
    window(createTime, '1 hour', '30 minutes') AS window,
    provinceId,
    sum(payAmount) AS totalPayAmount
FROM streaming_df
GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
"""
# Note: no ORDER BY here; sorting a streaming aggregation is only supported
# in complete output mode.

# Write the aggregated results to the Kafka sink. The Kafka sink expects a
# string/binary 'value' column, so each row is serialised to JSON first.
stream = session.sql(query) \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()
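
For comparison, roughly the same aggregation can be expressed directly with the DataFrame API, without the temporary view. This is only a sketch (not tested); it reuses streaming_df, schema and session from above, and the checkpoint path checkpoint_df_api is just a placeholder:

from pyspark.sql.functions import sum as sum_, window

agg_df = streaming_df \
    .groupBy(col("provinceId"),
             window(col("createTime"), "1 hour", "30 minutes")) \
    .agg(sum_("payAmount").alias("totalPayAmount"))

# Serialise each row to JSON for the Kafka sink and start the query
stream_df_api = agg_df \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("checkpointLocation", "checkpoint_df_api") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sink") \
    .start()

With the watermark defined on createTime upstream, this runs in the default append output mode; a window's row is emitted once the watermark passes the end of that window.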

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer | Generative AI
London
United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with any
advice, quote "one test result is worth one-thousand expert opinions (Wernher
von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Tue, 9 Apr 2024 at 21:45, 刘唯 <z920631...@gmail.com> wrote:

> Sorry, this is not a bug but essentially a user error. Spark throws a
> really confusing error and I'm also confused. Please see the reply in the
> ticket for how to make things correct.
> https://issues.apache.org/jira/browse/SPARK-47718
>
> On Sat, 6 Apr 2024 at 11:41, 刘唯 <z920631...@gmail.com> wrote:
>
>> This indeed looks like a bug. I will take some time to look into it.
>>
>> On Wed, 3 Apr 2024 at 01:55, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> hm. You are getting the below:
>>>
>>> AnalysisException: Append output mode not supported when there are
>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>
>>> The problem seems to be that you are using the append output mode when
>>> writing the streaming query results to Kafka. This mode is designed for
>>> scenarios where you want to append new data to an existing dataset at the
>>> sink (in this case, the "sink" topic in Kafka). However, your query
>>> involves a streaming aggregation: GROUP BY provinceId,
>>> window('createTime', '1 hour', '30 minutes'). Spark Structured Streaming
>>> requires a watermark on such an aggregation so that it knows when a
>>> window is complete and can be emitted in append mode. Your code already
>>> defines a watermark on the "createTime" column with a delay of 10 seconds
>>> (withWatermark("createTime", "10 seconds")). However, the error message
>>> indicates it is missing on the "start" column.
>>>
>>> Try adding a watermark to the "start" column. Modify your code as below
>>> to include a watermark on the "start" column generated by the window
>>> function:
>>>
>>> from pyspark.sql.functions import col, expr, from_json, window, sum
>>>
>>> streaming_df = session.readStream \
>>>     .format("kafka") \
>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>     .option("subscribe", "payment_msg") \
>>>     .option("startingOffsets", "earliest") \
>>>     .load() \
>>>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>>>     .select("parsed_value.*") \
>>>     .withWatermark("createTime", "10 seconds")  # Existing watermark on createTime
>>>
>>> # Modified section with watermark on 'start' column
>>> streaming_df = streaming_df.groupBy(
>>>     col("provinceId"),
>>>     window(col("createTime"), "1 hour", "30 minutes")
>>> ).agg(
>>>     sum(col("payAmount")).alias("totalPayAmount")
>>> ).withWatermark(expr("start"), "10 seconds")  # Watermark on window-generated 'start'
>>>
>>> # Rest of the code remains the same
>>> streaming_df.createOrReplaceTempView("streaming_df")
>>>
>>> spark.sql("""
>>> SELECT
>>>     window.start, window.end, provinceId, totalPayAmount
>>> FROM streaming_df
>>> ORDER BY window.start
>>> """) \
>>>     .writeStream \
>>>     .format("kafka") \
>>>     .option("checkpointLocation", "checkpoint") \
>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>     .option("topic", "sink") \
>>>     .start()
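>>>
>>> As an aside, and not a fix for the watermark itself: while debugging, the
>>> same aggregation can be run in complete output mode, which does not
>>> require a watermark for streaming aggregations and also tolerates the
>>> ORDER BY, at the cost of re-emitting the full result table on every
>>> trigger. A rough, untested sketch (checkpoint_debug is a placeholder path):
>>>
>>> debug_query = spark.sql("""
>>> SELECT
>>>     window.start, window.end, provinceId, totalPayAmount
>>> FROM streaming_df
>>> ORDER BY window.start
>>> """) \
>>>     .selectExpr("to_json(struct(*)) AS value") \
>>>     .writeStream \
>>>     .outputMode("complete") \
>>>     .format("kafka") \
>>>     .option("checkpointLocation", "checkpoint_debug") \
>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>     .option("topic", "sink") \
>>>     .start()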
>>>
>>> Try and see how it goes.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer | Generative AI
>>> London
>>> United Kingdom
>>>
>>> view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> Disclaimer: The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>
>>> On Tue, 2 Apr 2024 at 22:43, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>>
>>>> Hi Mich,
>>>>
>>>> Thank you so much for your response. I really appreciate your help!
>>>>
>>>> You mentioned "defining the watermark using the withWatermark function
>>>> on the streaming_df before creating the temporary view" - I believe this
>>>> is what I'm doing and it's not working for me. Here is the exact code
>>>> snippet that I'm running:
>>>>
>>>> ```
>>>> >>> streaming_df = session.readStream\
>>>>     .format("kafka")\
>>>>     .option("kafka.bootstrap.servers", "localhost:9092")\
>>>>     .option("subscribe", "payment_msg")\
>>>>     .option("startingOffsets", "earliest")\
>>>>     .load()\
>>>>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
>>>>     .select("parsed_value.*")\
>>>>     .withWatermark("createTime", "10 seconds")
>>>>
>>>> >>> streaming_df.createOrReplaceTempView("streaming_df")
>>>>
>>>> >>> spark.sql("""
>>>>     SELECT
>>>>         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>>>     FROM streaming_df
>>>>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>>     ORDER BY window.start
>>>>     """)\
>>>>     .withWatermark("start", "10 seconds")\
>>>>     .writeStream\
>>>>     .format("kafka") \
>>>>     .option("checkpointLocation", "checkpoint") \
>>>>     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>>     .option("topic", "sink") \
>>>>     .start()
>>>>
>>>> AnalysisException: Append output mode not supported when there are
>>>> streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>> EventTimeWatermark start#37: timestamp, 10 seconds
>>>> ```
>>>>
>>>> I'm using pyspark 3.5.1. Please let me know if I missed something.
>>>> Thanks again!
>>>>
>>>> Best,
>>>> Chloe
>>>>
>>>> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
>>>> > ok let us take it for a test.
>>>> >
>>>> > The original code of mine:
>>>> >
>>>> > def fetch_data(self):
>>>> >     self.sc.setLogLevel("ERROR")
>>>> >     schema = StructType() \
>>>> >         .add("rowkey", StringType()) \
>>>> >         .add("timestamp", TimestampType()) \
>>>> >         .add("temperature", IntegerType())
>>>> >     checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>>> >     try:
>>>> >
>>>> >         # construct a streaming dataframe 'streamingDataFrame' that
>>>> >         # subscribes to topic temperature
>>>> >         streamingDataFrame = self.spark \
>>>> >             .readStream \
>>>> >             .format("kafka") \
>>>> >             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>> >             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>> >             .option("group.id", config['common']['appName']) \
>>>> >             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>> >             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>> >             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>> >             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>> >             .option("subscribe", "temperature") \
>>>> >             .option("failOnDataLoss", "false") \
>>>> >             .option("includeHeaders", "true") \
>>>> >             .option("startingOffsets", "earliest") \
>>>> >             .load() \
>>>> >             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>>> >
>>>> >         resultC = streamingDataFrame.select( \
>>>> >                      col("parsed_value.rowkey").alias("rowkey") \
>>>> >                    , col("parsed_value.timestamp").alias("timestamp") \
>>>> >                    , col("parsed_value.temperature").alias("temperature"))
>>>> >
>>>> >         """
>>>> >         We work out the window and the AVG(temperature) in the window's
>>>> >         timeframe below.
>>>> >         This should return back the following Dataframe as struct
>>>> >
>>>> >         root
>>>> >          |-- window: struct (nullable = false)
>>>> >          |    |-- start: timestamp (nullable = true)
>>>> >          |    |-- end: timestamp (nullable = true)
>>>> >          |-- avg(temperature): double (nullable = true)
>>>> >
>>>> >         """
>>>> >         resultM = resultC. \
>>>> >                     withWatermark("timestamp", "5 minutes"). \
>>>> >                     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>>>> >                     avg('temperature')
>>>> >
>>>> >         # We take the above DataFrame and flatten it to get the columns
>>>> >         # aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>>>> >         resultMF = resultM. \
>>>> >                     select( \
>>>> >                          F.col("window.start").alias("startOfWindow") \
>>>> >                        , F.col("window.end").alias("endOfWindow") \
>>>> >                        , F.col("avg(temperature)").alias("AVGTemperature"))
>>>> >
>>>> >         # Kafka producer requires a key, value pair. We generate a UUID
>>>> >         # key as the unique identifier of the Kafka record
>>>> >         uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>>>> >
>>>> >         """
>>>> >         We take DataFrame resultMF containing temperature info and
>>>> >         write it to Kafka. The uuid is serialized as a string and used
>>>> >         as the key. We take all the columns of the DataFrame and
>>>> >         serialize them as a JSON string, putting the results in the
>>>> >         "value" of the record.
>>>> >         """
>>>> >         result = resultMF.withColumn("uuid", uuidUdf()) \
>>>> >                     .selectExpr("CAST(uuid AS STRING) AS key",
>>>> >                                 "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>>>> >                     .writeStream \
>>>> >                     .outputMode('complete') \
>>>> >                     .format("kafka") \
>>>> >                     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>> >                     .option("topic", "avgtemperature") \
>>>> >                     .option('checkpointLocation', checkpoint_path) \
>>>> >                     .queryName("avgtemperature") \
>>>> >                     .start()
>>>> >
>>>> >     except Exception as e:
>>>> >         print(f"""{e}, quitting""")
>>>> >         sys.exit(1)
>>>> >
>>>> >     #print(result.status)
>>>> >     #print(result.recentProgress)
>>>> >     #print(result.lastProgress)
>>>> >
>>>> >     result.awaitTermination()
>>>> >
>>>> > Now try to use SQL for the entire transformation and aggregation:
>>>> >
>>>> > # import this and anything else needed
>>>> > from pyspark.sql.functions import from_json, col, window
>>>> > from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
>>>> >
>>>> > # Define the schema for the JSON data
>>>> > schema = ...  # Replace with your schema definition
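>>>> > # For illustration only (an assumption on my part, not part of the
>>>> > # original code): a schema matching the columns used in the SQL below
>>>> > # could look like this:
>>>> > #   schema = StructType() \
>>>> > #       .add("createTime", TimestampType()) \
>>>> > #       .add("provinceId", IntegerType()) \
>>>> > #       .add("payAmount", FloatType())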
>>>> >
>>>> > # construct a streaming dataframe 'streamingDataFrame' that
>>>> > # subscribes to topic temperature
>>>> > streamingDataFrame = self.spark \
>>>> >     .readStream \
>>>> >     .format("kafka") \
>>>> >     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>>> >     .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>>> >     .option("group.id", config['common']['appName']) \
>>>> >     .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>> >     .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>>> >     .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>> >     .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>>> >     .option("subscribe", "temperature") \
>>>> >     .option("failOnDataLoss", "false") \
>>>> >     .option("includeHeaders", "true") \
>>>> >     .option("startingOffsets", "earliest") \
>>>> >     .load() \
>>>> >     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>>>> >     .select("parsed_value.*") \
>>>> >     .withWatermark("createTime", "10 seconds")  # Define the watermark here
>>>> >
>>>> > # Create a temporary view from the streaming DataFrame with watermark
>>>> > streamingDataFrame.createOrReplaceTempView("michboy")
>>>> >
>>>> > # Execute SQL queries on the temporary view
>>>> > result_df = (spark.sql("""
>>>> >     SELECT
>>>> >         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>>> >     FROM michboy
>>>> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>> >     ORDER BY window.start
>>>> >     """)
>>>> >     .writeStream
>>>> >     .format("kafka")
>>>> >     .option("checkpointLocation", "checkpoint")
>>>> >     .option("kafka.bootstrap.servers", "localhost:9092")
>>>> >     .option("topic", "sink")
>>>> >     .start())
>>>> >
>>>> > Note that the watermark is defined using the withWatermark function on
>>>> > the streaming DataFrame before creating the temporary view (michboy 😀).
>>>> > This way, the watermark information is correctly propagated to the
>>>> > temporary view, allowing you to execute SQL queries with window
>>>> > functions and aggregations on the streaming data.
>>>> >
>>>> > In other words, by defining the watermark on the streaming DataFrame
>>>> > before creating the temporary view, Spark will recognize the watermark
>>>> > and allow streaming aggregations and window operations in your SQL
>>>> > queries.
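>>>> >
>>>> > One way (a sketch on my part, untested) to confirm that the watermark
>>>> > is actually in effect once the query is running is to look at the
>>>> > streaming query progress, which reports the current event-time
>>>> > watermark for queries that use one:
>>>> >
>>>> > import time
>>>> > time.sleep(30)  # give the query time to complete at least one micro-batch
>>>> > progress = result_df.lastProgress  # result_df is the handle returned by start() above
>>>> > if progress is not None:
>>>> >     print(progress.get("eventTime", {}).get("watermark"))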
>>>> >
>>>> > HTH
>>>> >
>>>> > Mich Talebzadeh,
>>>> > Technologist | Solutions Architect | Data Engineer | Generative AI
>>>> > London
>>>> > United Kingdom
>>>> >
>>>> > view my Linkedin profile
>>>> > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>> >
>>>> > https://en.everybodywiki.com/Mich_Talebzadeh
>>>> >
>>>> > Disclaimer: The information provided is correct to the best of my
>>>> > knowledge but of course cannot be guaranteed. It is essential to note
>>>> > that, as with any advice, quote "one test result is worth one-thousand
>>>> > expert opinions (Wernher von Braun
>>>> > <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>>>> >
>>>> > On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>>>> >
>>>> > > Hello!
>>>> > >
>>>> > > I am attempting to write a streaming pipeline that would consume data
>>>> > > from a Kafka source, manipulate the data, and then write results to a
>>>> > > downstream sink (Kafka, Redis, etc). I want to write fully formed SQL
>>>> > > instead of using the function API that Spark offers. I read a few
>>>> > > guides on how to do this and my understanding is that I need to create
>>>> > > a temp view in order to execute my raw SQL queries via spark.sql().
>>>> > >
>>>> > > However, I'm having trouble defining watermarks on my source. It
>>>> > > doesn't seem like there is a way to introduce watermark in the raw SQL
>>>> > > that Spark supports, so I'm using the .withWatermark() function.
>>>> > > However, this watermark does not work on the temp view.
>>>> > >
>>>> > > Example code:
>>>> > > ```
>>>> > > streaming_df.select(from_json(col("value").cast("string"),
>>>> > >     schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime", "10 seconds")
>>>> > >
>>>> > > json_df.createOrReplaceTempView("json_df")
>>>> > >
>>>> > > session.sql("""
>>>> > > SELECT
>>>> > >     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>>>> > > FROM json_df
>>>> > > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>>>> > > ORDER BY window.start
>>>> > > """)\
>>>> > >     .writeStream\
>>>> > >     .format("kafka") \
>>>> > >     .option("checkpointLocation", "checkpoint") \
>>>> > >     .option("kafka.bootstrap.servers", "localhost:9092") \
>>>> > >     .option("topic", "sink") \
>>>> > >     .start()
>>>> > > ```
>>>> > > This throws
>>>> > > ```
>>>> > > AnalysisException: Append output mode not supported when there are
>>>> > > streaming aggregations on streaming DataFrames/DataSets without watermark;
>>>> > > ```
>>>> > >
>>>> > > If I switch out the SQL query and write it in the function API
>>>> > > instead, everything seems to work fine.
>>>> > >
>>>> > > How can I use .sql() in conjunction with watermarks?
>>>> > >
>>>> > > Best,
>>>> > > Chloe