Hi Mich,

Thank you so much for your response. I really appreciate your help!

You mentioned "defining the watermark using the withWatermark function on the 
streaming_df before creating the temporary view" - I believe this is what I’m 
doing and it’s not working for me. Here is the exact code snippet that I’m 
running:

```
>>> streaming_df = session.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "payment_msg")\
    .option("startingOffsets","earliest")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
    .select("parsed_value.*")\
    .withWatermark("createTime", "10 seconds")

>>> streaming_df.createOrReplaceTempView("streaming_df")

>>> spark.sql("""
SELECT
    window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
    FROM streaming_df
    GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
    ORDER BY window.start
""")\
  .withWatermark("start", "10 seconds")\
  .writeStream\
  .format("kafka") \
  .option("checkpointLocation", "checkpoint") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "sink") \
  .start()

AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
EventTimeWatermark start#37: timestamp, 10 seconds
```
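
One possible cause, offered as an assumption rather than something confirmed in this thread: in Spark SQL, single-quoted text such as 'createTime' is a string literal, so window('createTime', ...) may not refer to the watermarked createTime column at all, which would leave the aggregation without a watermarked grouping key. The sketch below uses a hypothetical helper, windowed_sum_sql, that only builds the query text with the event-time column written as a backtick-quoted identifier. It also leaves out ORDER BY, since sorting a streaming aggregation is generally only supported in complete output mode. The view and column names are the ones from the snippet above.

```python
def windowed_sum_sql(view: str, time_col: str,
                     size: str = "1 hour", slide: str = "30 minutes") -> str:
    """Build the windowed aggregation query (hypothetical helper).

    The event-time column is emitted as a backtick-quoted identifier, so
    window() is computed over the watermarked column rather than over a
    string literal such as 'createTime'.
    """
    return f"""
SELECT
    window.start, window.end, provinceId, sum(payAmount) AS totalPayAmount
FROM {view}
GROUP BY provinceId, window(`{time_col}`, '{size}', '{slide}')
"""


query = windowed_sum_sql("streaming_df", "createTime")
print(query)
# With a live session and the watermarked view registered, this text would
# be passed to spark.sql(query) in place of the original query string.
```

If this is the cause, the .withWatermark("start", "10 seconds") call after spark.sql(...) should no longer be needed, because the watermark set before createOrReplaceTempView would already apply to the grouping window.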

I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks again!

Best,
Chloe


On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> ok let us take it for a test.
> 
> The original code of mine
> 
>     def fetch_data(self):
>         self.sc.setLogLevel("ERROR")
>         schema = StructType() \
>          .add("rowkey", StringType()) \
>          .add("timestamp", TimestampType()) \
>          .add("temperature", IntegerType())
>         checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>         try:
> 
>             # construct a streaming dataframe 'streamingDataFrame' that
>             # subscribes to topic temperature
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>                 .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", "temperature") \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "earliest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
> 
> 
>             resultC = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.timestamp").alias("timestamp") \
>                    , col("parsed_value.temperature").alias("temperature"))
> 
>             """
>             We work out the window and the AVG(temperature) in the window's
> timeframe below
>             This should return back the following Dataframe as struct
> 
>              root
>              |-- window: struct (nullable = false)
>              |    |-- start: timestamp (nullable = true)
>              |    |-- end: timestamp (nullable = true)
>              |-- avg(temperature): double (nullable = true)
> 
>             """
>             resultM = resultC. \
>                      withWatermark("timestamp", "5 minutes"). \
>                      groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>                      avg('temperature')
> 
>             # We take the above DataFrame and flatten it to get the columns
>             # aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
>             resultMF = resultM. \
>                        select( \
>                             F.col("window.start").alias("startOfWindow") \
>                           , F.col("window.end").alias("endOfWindow") \
>                           ,
> F.col("avg(temperature)").alias("AVGTemperature"))
> 
>             # Kafka producer requires a key, value pair. We generate UUID
>             # key as the unique identifier of Kafka record
>             uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
> 
>             """
>             We take DataFrame resultMF containing temperature info and
> write it to Kafka. The uuid is serialized as a string and used as the key.
>             We take all the columns of the DataFrame and serialize them as
> a JSON string, putting the results in the "value" of the record.
>             """
>             result = resultMF.withColumn("uuid",uuidUdf()) \
>                      .selectExpr("CAST(uuid AS STRING) AS key",
> "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>                      .writeStream \
>                      .outputMode('complete') \
>                      .format("kafka") \
>                      .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>                      .option("topic", "avgtemperature") \
>                      .option('checkpointLocation', checkpoint_path) \
>                      .queryName("avgtemperature") \
>                      .start()
> 
>         except Exception as e:
>                 print(f"""{e}, quitting""")
>                 sys.exit(1)
> 
>         #print(result.status)
>         #print(result.recentProgress)
>         #print(result.lastProgress)
> 
>         result.awaitTermination()
> 
> Now try to use SQL for the entire transformation and aggregation
> 
> #import this and anything else needed
> from pyspark.sql.functions import from_json, col, window
> from pyspark.sql.types import (StructType, StringType, IntegerType,
>                                FloatType, TimestampType)
> 
> 
> # Define the schema for the JSON data
> schema = ... # Replace with your schema definition
> 
> # construct a streaming DataFrame 'streaming_df' that subscribes to
> # topic temperature
> streaming_df = (spark
>                 .readStream
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers",
>                         config['MDVariables']['bootstrapServers'])
>                 .option("schema.registry.url",
>                         config['MDVariables']['schemaRegistryURL'])
>                 .option("group.id", config['common']['appName'])
>                 .option("zookeeper.connection.timeout.ms",
>                         config['MDVariables']['zookeeperConnectionTimeoutMs'])
>                 .option("rebalance.backoff.ms",
>                         config['MDVariables']['rebalanceBackoffMS'])
>                 .option("zookeeper.session.timeout.ms",
>                         config['MDVariables']['zookeeperSessionTimeOutMs'])
>                 .option("auto.commit.interval.ms",
>                         config['MDVariables']['autoCommitIntervalMS'])
>                 .option("subscribe", "temperature")
>                 .option("failOnDataLoss", "false")
>                 .option("includeHeaders", "true")
>                 .option("startingOffsets", "earliest")
>                 .load()
>                 .select(from_json(col("value").cast("string"),
>                                   schema).alias("parsed_value"))
>                 .select("parsed_value.*")
>                 # Define the watermark here
>                 .withWatermark("createTime", "10 seconds"))
> 
> # Create a temporary view from the streaming DataFrame with watermark
> streaming_df.createOrReplaceTempView("michboy")
> 
> # Execute SQL queries on the temporary view
> result_df = (spark.sql("""
>     SELECT
>         window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
>     FROM michboy
>     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
>     ORDER BY window.start
> """)
>               .writeStream
>               .format("kafka")
>               .option("checkpointLocation", "checkpoint")
>               .option("kafka.bootstrap.servers", "localhost:9092")
>               .option("topic", "sink")
>               .start())
> 
> Note that the watermark is defined using the withWatermark function on the
> streaming_df before creating the temporary view (michboy 😀). This way, the
> watermark information is propagated to the temporary view, so Spark should
> recognize it and allow streaming aggregations and window operations in
> your SQL queries.
> 
> HTH
> 
> Mich Talebzadeh,
> Technologist | Solutions Architect | Data Engineer  | Generative AI
> London
> United Kingdom
> 
> 
> On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:
> 
> > Hello!
> >
> > I am attempting to write a streaming pipeline that would consume data from
> > a Kafka source, manipulate the data, and then write results to a downstream
> > sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using
> > the function API that Spark offers. I read a few guides on how to do this
> > and my understanding is that I need to create a temp view in order to
> > execute my raw SQL queries via spark.sql().
> >
> > However, I’m having trouble defining watermarks on my source. It doesn’t
> > seem like there is a way to introduce watermark in the raw SQL that Spark
> > supports, so I’m using the .withWatermark() function. However, this
> > watermark does not work on the temp view.
> >
> > Example code:
> > ```
> > json_df = streaming_df.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))\
> >     .select("parsed_value.*")\
> >     .withWatermark("createTime", "10 seconds")
> >
> > json_df.createOrReplaceTempView("json_df")
> >
> > session.sql("""
> > SELECT
> > window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> > FROM json_df
> > GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> > ORDER BY window.start
> > """)\
> > .writeStream\
> > .format("kafka") \
> > .option("checkpointLocation", "checkpoint") \
> > .option("kafka.bootstrap.servers", "localhost:9092") \
> > .option("topic", "sink") \
> > .start()
> > ```
> > This throws
> > ```
> > AnalysisException: Append output mode not supported when there are
> > streaming aggregations on streaming DataFrames/DataSets without watermark;
> > ```
> >
> > If I switch out the SQL query and write it in the function API instead,
> > everything seems to work fine.
> >
> > How can I use .sql() in conjunction with watermarks?
> >
> > Best,
> > Chloe
> >
> 
