Hi Mich,

Thank you so much for your response. I really appreciate your help!
You mentioned "defining the watermark using the withWatermark function on the streaming_df before creating the temporary view" - I believe this is what I'm doing and it's not working for me. Here is the exact code snippet that I'm running:

```
>>> streaming_df = session.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", "localhost:9092")\
        .option("subscribe", "payment_msg")\
        .option("startingOffsets", "earliest")\
        .load()\
        .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
        .select("parsed_value.*")\
        .withWatermark("createTime", "10 seconds")

>>> streaming_df.createOrReplaceTempView("streaming_df")

>>> spark.sql("""
        SELECT
            window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
        FROM streaming_df
        GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
        ORDER BY window.start
    """)\
        .withWatermark("start", "10 seconds")\
        .writeStream\
        .format("kafka") \
        .option("checkpointLocation", "checkpoint") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "sink") \
        .start()

AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
EventTimeWatermark start#37: timestamp, 10 seconds
```

I'm using pyspark 3.5.1.
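As a quick sanity check (a minimal sketch, assuming the same spark session, schema and temp view as above), the analyzed plan of a query against the view can be inspected to see whether the EventTimeWatermark node defined on createTime survives createOrReplaceTempView:

```
>>> view_df = spark.sql("SELECT * FROM streaming_df")
>>> view_df.isStreaming             # sanity check: should print True
>>> view_df.explain(extended=True)  # the logical plan should contain an
...                                 # 'EventTimeWatermark createTime#..., 10 seconds' node
```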
Please let me know if I missed something. Thanks again!

Best,
Chloe

On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> ok let us take it for a test.
>
> The original code of mine:
>
> def fetch_data(self):
>     self.sc.setLogLevel("ERROR")
>     schema = StructType() \
>         .add("rowkey", StringType()) \
>         .add("timestamp", TimestampType()) \
>         .add("temperature", IntegerType())
>     checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>     try:
>
>         # construct a streaming dataframe 'streamingDataFrame' that subscribes to topic temperature
>         streamingDataFrame = self.spark \
>             .readStream \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>             .option("group.id", config['common']['appName']) \
>             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>             .option("subscribe", "temperature") \
>             .option("failOnDataLoss", "false") \
>             .option("includeHeaders", "true") \
>             .option("startingOffsets", "earliest") \
>             .load() \
>             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>         resultC = streamingDataFrame.select(
>             col("parsed_value.rowkey").alias("rowkey")
>             , col("parsed_value.timestamp").alias("timestamp")
>             , col("parsed_value.temperature").alias("temperature"))
>
>         """
>         We work out the window and the AVG(temperature) in the window's timeframe below.
>         This should return the following DataFrame as a struct:
>
>         root
>          |-- window: struct (nullable = false)
>          |    |-- start: timestamp (nullable = true)
>          |    |-- end: timestamp (nullable = true)
>          |-- avg(temperature): double (nullable = true)
>         """
>         resultM = resultC. \
>             withWatermark("timestamp", "5 minutes"). \
>             groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>             avg('temperature')
>
>         # We take the above DataFrame and flatten it to get the columns aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
>         resultMF = resultM. \
>             select(
>                 F.col("window.start").alias("startOfWindow")
>                 , F.col("window.end").alias("endOfWindow")
>                 , F.col("avg(temperature)").alias("AVGTemperature"))
>
>         # Kafka producer requires a key, value pair. We generate a UUID key as the unique identifier of the Kafka record
>         uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
>
>         """
>         We take DataFrame resultMF containing the temperature info and write it to Kafka. The uuid is serialized as a string and used as the key.
>         We take all the columns of the DataFrame and serialize them as a JSON string, putting the result in the "value" of the record.
>         """
>         result = resultMF.withColumn("uuid", uuidUdf()) \
>             .selectExpr("CAST(uuid AS STRING) AS key", "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>             .writeStream \
>             .outputMode('complete') \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>             .option("topic", "avgtemperature") \
>             .option('checkpointLocation', checkpoint_path) \
>             .queryName("avgtemperature") \
>             .start()
>
>     except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
>     #print(result.status)
>     #print(result.recentProgress)
>     #print(result.lastProgress)
>
>     result.awaitTermination()
>
> Now try to use SQL for the entire transformation and aggregation:
>
> # import this and anything else needed
> from pyspark.sql.functions import from_json, col, window
> from pyspark.sql.types import StructType, StringType, IntegerType, FloatType, TimestampType
>
> # Define the schema for the JSON data
> schema = ...  # Replace with your schema definition
>
> # construct a streaming dataframe 'streaming_df' that subscribes to topic temperature
> streaming_df = self.spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>     .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>     .option("group.id", config['common']['appName']) \
>     .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>     .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>     .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>     .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>     .option("subscribe", "temperature") \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "earliest") \
>     .load() \
>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
>     .select("parsed_value.*") \
>     .withWatermark("createTime", "10 seconds")  # Define the watermark here
>
> # Create a temporary view from the streaming DataFrame with watermark
> streaming_df.createOrReplaceTempView("michboy")
>
> # Execute SQL queries on the temporary view
> result_df = (spark.sql("""
>     SELECT
>         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>     FROM michboy
>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>     ORDER BY window.start
>     """)
>     .writeStream
>     .format("kafka")
>     .option("checkpointLocation", "checkpoint")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("topic", "sink")
>     .start())
>
> Note that the watermark is defined using the withWatermark function on the streaming_df before creating the temporary view (michboy 😀). This way, the watermark information is correctly propagated to the temporary view, allowing you to execute SQL queries with window functions and aggregations on the streaming data.
>
> In other words, by defining the watermark on the streaming DataFrame before creating the temporary view, Spark will recognize the watermark and allow streaming aggregations and window operations in your SQL queries.
>
> HTH
>
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer | Generative AI
> London
> United Kingdom
>
> view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
>
> > Hello!
> >
> > I am attempting to write a streaming pipeline that would consume data from a Kafka source, manipulate the data, and then write results to a downstream sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using the function API that Spark offers. I read a few guides on how to do this and my understanding is that I need to create a temp view in order to execute my raw SQL queries via spark.sql().
> >
> > However, I'm having trouble defining watermarks on my source.
> > It doesn't seem like there is a way to introduce a watermark in the raw SQL that Spark supports, so I'm using the .withWatermark() function. However, this watermark does not work on the temp view.
> >
> > Example code:
> > ```
> > streaming_df.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
> >     .select("parsed_value.*")\
> >     .withWatermark("createTime", "10 seconds")
> >
> > json_df.createOrReplaceTempView("json_df")
> >
> > session.sql("""
> >     SELECT
> >         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> >     FROM json_df
> >     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> >     ORDER BY window.start
> > """)\
> >     .writeStream\
> >     .format("kafka") \
> >     .option("checkpointLocation", "checkpoint") \
> >     .option("kafka.bootstrap.servers", "localhost:9092") \
> >     .option("topic", "sink") \
> >     .start()
> > ```
> > This throws
> > ```
> > AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
> > ```
> >
> > If I switch out the SQL query and write it in the function API instead, everything seems to work fine.
> >
> > How can I use .sql() in conjunction with watermarks?
> >
> > Best,
> > Chloe
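For reference, a minimal sketch of what the function-API version mentioned above might look like (hypothetical, not the exact code from the thread; it assumes the same session, payment_msg topic, schema and createTime/provinceId/payAmount fields, and it drops the ORDER BY, since sorting is not supported on a streaming aggregation in append mode):

```
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.functions import sum as sum_

# Source with the watermark defined directly on the event-time column
parsed_df = (session.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "payment_msg")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    .withWatermark("createTime", "10 seconds"))

# Windowed aggregation expressed with the function API instead of spark.sql()
agg_df = (parsed_df
    .groupBy(window(col("createTime"), "1 hour", "30 minutes"), col("provinceId"))
    .agg(sum_("payAmount").alias("totalPayAmount"))
    .select("window.start", "window.end", "provinceId", "totalPayAmount"))

# The Kafka sink expects a value column, so serialize each row as JSON
(agg_df
    .selectExpr("CAST(provinceId AS STRING) AS key",
                "to_json(struct(start, end, provinceId, totalPayAmount)) AS value")
    .writeStream
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start())
```

The key/value serialization follows the same to_json pattern as Mich's first example, since the Kafka sink expects the payload in a value column.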