OK, let us put it to the test.

My original code:

    def fetch_data(self):
        """Stream temperature readings from Kafka and publish windowed averages.

        Reads records from the Kafka topic "temperature", parses each record
        value as JSON with a (rowkey, timestamp, temperature) schema, computes
        the average temperature over 5-minute windows sliding every 5 minutes,
        and writes each result as a JSON string to the Kafka topic
        "avgtemperature", keyed by a freshly generated UUID.

        The call blocks in ``awaitTermination()`` once the query has started.
        If building or starting the query raises, the error is printed and
        the process exits with status 1.
        """
        self.sc.setLogLevel("ERROR")

        # Layout of the JSON payload carried in the Kafka record value.
        schema = (
            StructType()
            .add("rowkey", StringType())
            .add("timestamp", TimestampType())
            .add("temperature", IntegerType())
        )
        checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"

        try:
            # Construct a streaming DataFrame 'streamingDataFrame' that
            # subscribes to topic temperature and parses the value as JSON.
            streamingDataFrame = (
                self.spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers",
                        config['MDVariables']['bootstrapServers'])
                .option("schema.registry.url",
                        config['MDVariables']['schemaRegistryURL'])
                .option("group.id", config['common']['appName'])
                .option("zookeeper.connection.timeout.ms",
                        config['MDVariables']['zookeeperConnectionTimeoutMs'])
                .option("rebalance.backoff.ms",
                        config['MDVariables']['rebalanceBackoffMS'])
                .option("zookeeper.session.timeout.ms",
                        config['MDVariables']['zookeeperSessionTimeOutMs'])
                .option("auto.commit.interval.ms",
                        config['MDVariables']['autoCommitIntervalMS'])
                .option("subscribe", "temperature")
                .option("failOnDataLoss", "false")
                .option("includeHeaders", "true")
                .option("startingOffsets", "earliest")
                .load()
                .select(
                    from_json(col("value").cast("string"), schema)
                    .alias("parsed_value")
                )
            )

            # Flatten the parsed struct into top-level columns.
            resultC = streamingDataFrame.select(
                col("parsed_value.rowkey").alias("rowkey"),
                col("parsed_value.timestamp").alias("timestamp"),
                col("parsed_value.temperature").alias("temperature"),
            )

            # We work out the window and the AVG(temperature) in the window's
            # timeframe below. This should return the following DataFrame
            # as a struct:
            #
            #  root
            #  |-- window: struct (nullable = false)
            #  |    |-- start: timestamp (nullable = true)
            #  |    |-- end: timestamp (nullable = true)
            #  |-- avg(temperature): double (nullable = true)
            resultM = (
                resultC
                .withWatermark("timestamp", "5 minutes")
                .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes"))
                .avg('temperature')
            )

            # We take the above DataFrame and flatten it to get the columns
            # aliased as "startOfWindow", "endOfWindow" and "AVGTemperature".
            resultMF = resultM.select(
                F.col("window.start").alias("startOfWindow"),
                F.col("window.end").alias("endOfWindow"),
                F.col("avg(temperature)").alias("AVGTemperature"),
            )

            # Kafka producer requires a key, value pair. We generate a UUID
            # key as the unique identifier of the Kafka record.
            uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())

            # We take DataFrame resultMF containing temperature info and write
            # it to Kafka. The uuid is serialized as a string and used as the
            # key. We take all the columns of the DataFrame and serialize them
            # as a JSON string, putting the results in the "value" of the
            # record.
            result = (
                resultMF
                .withColumn("uuid", uuidUdf())
                .selectExpr(
                    "CAST(uuid AS STRING) AS key",
                    "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value",
                )
                .writeStream
                .outputMode('complete')
                .format("kafka")
                .option("kafka.bootstrap.servers",
                        config['MDVariables']['bootstrapServers'])
                .option("topic", "avgtemperature")
                .option('checkpointLocation', checkpoint_path)
                .queryName("avgtemperature")
                .start()
            )

        except Exception as e:
            # Any failure while building or starting the query is fatal here.
            print(f"""{e}, quitting""")
            sys.exit(1)

        #print(result.status)
        #print(result.recentProgress)
        #print(result.lastProgress)

        # Block until the streaming query stops.
        result.awaitTermination()

Now try using SQL for the entire transformation and aggregation:

# Import this and anything else needed
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import (
    StructType,
    StringType,
    IntegerType,
    FloatType,
    TimestampType,
)

# NOTE: `spark` (the SparkSession) and `config` are assumed to be provided by
# the surrounding application; adjust the names to match your code.

# Define the schema for the JSON data
schema = ...  # Replace with your schema definition

# Construct a streaming DataFrame that subscribes to topic temperature,
# flattens the parsed JSON into top-level columns and defines the watermark
# on the event-time column.
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",
            config['MDVariables']['bootstrapServers'])
    .option("schema.registry.url",
            config['MDVariables']['schemaRegistryURL'])
    .option("group.id", config['common']['appName'])
    .option("zookeeper.connection.timeout.ms",
            config['MDVariables']['zookeeperConnectionTimeoutMs'])
    .option("rebalance.backoff.ms",
            config['MDVariables']['rebalanceBackoffMS'])
    .option("zookeeper.session.timeout.ms",
            config['MDVariables']['zookeeperSessionTimeOutMs'])
    .option("auto.commit.interval.ms",
            config['MDVariables']['autoCommitIntervalMS'])
    .option("subscribe", "temperature")
    .option("failOnDataLoss", "false")
    .option("includeHeaders", "true")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
    .select("parsed_value.*")
    .withWatermark("createTime", "10 seconds")  # Define the watermark here
)

# Create a temporary view from the streaming DataFrame with watermark
streaming_df.createOrReplaceTempView("michboy")

# Execute SQL queries on the temporary view.
#
# * createTime is deliberately NOT quoted inside window(): in Spark SQL single
#   quotes produce a string literal, so window('createTime', ...) would not
#   reference the watermarked event-time column at all.
# * There is no ORDER BY: sorting a streaming aggregation is only supported in
#   Complete output mode, and this query runs in the default Append mode.
# * The Kafka sink requires a "value" column, so the result row is serialized
#   to a JSON string before writing.
result_df = (
    spark.sql("""
        SELECT
            window.start, window.end, provinceId,
            sum(payAmount) AS totalPayAmount
        FROM michboy
        GROUP BY provinceId, window(createTime, '1 hour', '30 minutes')
    """)
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("checkpointLocation", "checkpoint")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "sink")
    .start()
)

Note that the watermark is defined using the withWatermark function on the
streaming_df before creating the temporary view (michboy 😀). This way, the
watermark information is correctly propagated to the temporary view,
allowing you to execute SQL queries with window functions and aggregations
on the streaming data.

Note that by defining the watermark on the streaming DataFrame before
creating the temporary view, Spark will recognize the watermark and allow
streaming aggregations and window operations in your SQL queries.

HTH

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Tue, 2 Apr 2024 at 20:24, Chloe He <ch...@voltrondata.com.invalid> wrote:

> Hello!
>
> I am attempting to write a streaming pipeline that would consume data from
> a Kafka source, manipulate the data, and then write results to a downstream
> sink (Kafka, Redis, etc). I want to write fully formed SQL instead of using
> the function API that Spark offers. I read a few guides on how to do this
> and my understanding is that I need to create a temp view in order to
> execute my raw SQL queries via spark.sql().
>
> However, I’m having trouble defining watermarks on my source. It doesn’t
> seem like there is a way to introduce watermark in the raw SQL that Spark
> supports, so I’m using the .withWatermark() function. However, this
> watermark does not work on the temp view.
>
> Example code:
> ```
> streaming_df.select(from_json(col("value").cast("string"),
> schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
> "10 seconds”)
>
> json_df.createOrReplaceTempView("json_df”)
>
> session.sql("""
> SELECT
> window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> FROM json_df
> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ORDER BY window.start
> """)\
> .writeStream\
> .format("kafka") \
> .option("checkpointLocation", "checkpoint") \
> .option("kafka.bootstrap.servers", "localhost:9092") \
> .option("topic", "sink") \
> .start()
> ```
> This throws
> ```
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
> ```
>
> If I switch out the SQL query and write it in the function API instead,
> everything seems to work fine.
>
> How can I use .sql() in conjunction with watermarks?
>
> Best,
> Chloe
>

Reply via email to