Hi Mich, this doesn't seem to be working for me ... the watermark seems to be getting ignored!
Here is the data put into Kafka:

```
+---------------------------------------------------------------------------------------------------+----+
|value                                                                                               |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"} |null|
|{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}  |null|
+---------------------------------------------------------------------------------------------------+----+
```

Note: insert_ts specifies when the data was inserted.

Here is the output of the Structured Stream:

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
|2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
|2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
+-------------------+-------------------+---------------+

Note: I'm summing up the temperatures (for easy verification).

As per the above, all 3 'ts' values are included in the DataFrame, even though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record. Since the watermark is set to "5 minutes" and max(ts) == 2023-03-15T16:12:00.000-07:00, the record with ts = "2023-03-13T10:12:00.000-07:00" should have been dropped: it is more than 2 days old (i.e. dated 2023-03-13)!

Any ideas what needs to be changed to make this work?

Here is the code (modified for my requirement, but essentially the same):

```
# imports needed by this snippet
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

schema = StructType([
    StructField("temparature", LongType(), False),
    StructField("ts", TimestampType(), False),
    StructField("insert_ts", TimestampType(), False)
])

# read the JSON payloads off Kafka and parse them against the schema
streamingDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("group.id", 'watermark-grp') \
    .option("subscribe", topic) \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

# 5-minute watermark on event time, tumbling 5-minute windows, SUM(temparature)
resultM = resultC \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
    .agg({'temparature': 'sum'})

resultMF = resultM \
    .select(col("window.start").alias("startOfWindowFrame"),
            col("window.end").alias("endOfWindowFrame"),
            col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF \
    .writeStream \
    .outputMode('complete') \
    .option("numRows", 1000) \
    .option("truncate", "false") \
    .format('console') \
    .option('checkpointLocation', checkpoint_path) \
    .queryName("sum_temparature") \
    .start()

result.awaitTermination()
```
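For reference, this is the cutoff arithmetic I am expecting the watermark to apply, as I understand the documented semantics (plain Python just to spell out the numbers; the variable names are mine, not Spark internals):

```
from datetime import datetime, timedelta

# watermark cutoff as I understand it: max event time seen so far, minus the delay
delay = timedelta(minutes=5)
max_event_ts = datetime.fromisoformat("2023-03-15T16:12:00-07:00")  # max(ts) in the data above
cutoff = max_event_ts - delay                                       # 2023-03-15 16:07:00-07:00

late_ts = datetime.fromisoformat("2023-03-13T10:12:00-07:00")       # the last record's ts
print(late_ts < cutoff)  # True -> over two days behind the cutoff, so I expected it to be dropped
```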
On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> OK
>
> ts is the timestamp, right?
>
> This is similar code that works out the average temperature within a time
> frame of 5 minutes.
> Note the comments and the try/except error handling:
>
> try:
>
>     # construct a streaming dataframe streamingDataFrame that
>     # subscribes to topic temperature
>     streamingDataFrame = self.spark \
>         .readStream \
>         .format("kafka") \
>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>         .option("group.id", config['common']['appName']) \
>         .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>         .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>         .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>         .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>         .option("subscribe", "temperature") \
>         .option("failOnDataLoss", "false") \
>         .option("includeHeaders", "true") \
>         .option("startingOffsets", "latest") \
>         .load() \
>         .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>     resultC = streamingDataFrame.select(
>         col("parsed_value.rowkey").alias("rowkey"),
>         col("parsed_value.timestamp").alias("timestamp"),
>         col("parsed_value.temperature").alias("temperature"))
>
>     """
>     We work out the window and the AVG(temperature) in the
>     window's timeframe below.
>     This should return the following DataFrame as a struct:
>
>     root
>      |-- window: struct (nullable = false)
>      |    |-- start: timestamp (nullable = true)
>      |    |-- end: timestamp (nullable = true)
>      |-- avg(temperature): double (nullable = true)
>
>     """
>     resultM = resultC. \
>         withWatermark("timestamp", "5 minutes"). \
>         groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>         avg('temperature')
>
>     # We take the above DataFrame and flatten it to get the columns aliased
>     # as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>     resultMF = resultM. \
>         select(
>             F.col("window.start").alias("startOfWindowFrame"),
>             F.col("window.end").alias("endOfWindowFrame"),
>             F.col("avg(temperature)").alias("AVGTemperature"))
>
>     resultMF.printSchema()
>
>     result = resultMF. \
>         writeStream. \
>         outputMode('complete'). \
>         option("numRows", 1000). \
>         option("truncate", "false"). \
>         format('console'). \
>         option('checkpointLocation', checkpoint_path). \
>         queryName("temperature"). \
>         start()
>
> except Exception as e:
>     print(f"""{e}, quitting""")
>     sys.exit(1)
>
> HTH
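> PS as a sanity check you can watch the watermark the engine is tracking
> through the query progress. A minimal sketch, assuming the field names from
> the monitoring section of the docs (result is the handle returned by start()
> above; lastProgress stays None until the first trigger completes):
>
>     # event-time watermark reported for the most recent completed trigger
>     if result.lastProgress is not None:
>         print(result.lastProgress.get("eventTime", {}).get("watermark"))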
> On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:
>
>> Hi Mich -
>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>
>> ldf.printSchema():
>>
>> root
>>  |-- applianceName: string (nullable = true)
>>  |-- timeslot: long (nullable = true)
>>  |-- customer: string (nullable = true)
>>  |-- window: struct (nullable = false)
>>  |    |-- start: timestamp (nullable = true)
>>  |    |-- end: timestamp (nullable = true)
>>  |-- sentOctets: long (nullable = true)
>>  |-- recvdOctets: long (nullable = true)
>>
>> ldf.show():
>>
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>> |applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>> |abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
>> |pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
>> |xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>>
>> Also, any comment on the outputMode? I've set it to 'update', since I'm
>> using aggregation.
>>
>> thanks!
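>> PS - to make the outputMode question concrete, here is the sink again, with
>> my reading of the three modes for an aggregation query (paraphrasing the
>> programming guide) as comments:
>>
>> ```
>> # 'update'   - emit only the windows that changed in this trigger
>> # 'complete' - re-emit every window each trigger; all aggregation state is
>> #              retained, so the watermark cannot drop old windows
>> # 'append'   - emit a window only once, after the watermark passes its end
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
>>     .writeStream \
>>     .outputMode("update") \
>>     .trigger(processingTime='10 minutes') \
>>     .option("checkpointLocation", self.checkpoint) \
>>     .foreachBatch(self.process) \
>>     .start()
>> ```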
>>
>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Just looking at the code
>>>
>>> in here:
>>>
>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>                   window(col("ts"), "15 minutes")) \
>>>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>     .fillna(0)
>>>
>>> What does ldf.printSchema() return?
>>>
>>> HTH
>>>
>>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:
>>>
>>>> Hello All -
>>>>
>>>> I've a Structured Streaming job which has a trigger of 10 minutes, and
>>>> I'm using a watermark to account for late data coming in. However, the
>>>> watermark is not working: instead of a single record with the total
>>>> aggregated value, I see 2 records.
>>>>
>>>> Here is the code:
>>>>
>>>> ```
>>>> 1) Structured Streaming - reading from Kafka every 10 mins:
>>>>
>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>     .option("kafka.security.protocol", "SSL") \
>>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>     .option("subscribe", topic) \
>>>>     .option("startingOffsets", "latest") \
>>>>     .option("failOnDataLoss", "false") \
>>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>     .load()
>>>>
>>>> 2) Calling foreachBatch(self.process)
>>>> # note - outputMode is set to "update" (tried setting outputMode = append as well)
>>>>
>>>> # 03/09 ::: outputMode - update instead of append
>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>     .outputMode("update") \
>>>>     .trigger(processingTime='10 minutes') \
>>>>     .option("truncate", "false") \
>>>>     .option("checkpointLocation", self.checkpoint) \
>>>>     .foreachBatch(self.process) \
>>>>     .start()
>>>>
>>>> self.process - where I do the bulk of the processing, which calls the
>>>> function 'aggIntfLogs'.
>>>>
>>>> In the function aggIntfLogs, I'm using a watermark of 15 mins and doing a
>>>> groupBy to calculate the sum of sentOctets & recvdOctets:
>>>>
>>>> def aggIntfLogs(ldf):
>>>>     if ldf and ldf.count() > 0:
>>>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>>>                          'recvdOctets', 'ts', 'customer') \
>>>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>>>>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>>>>             .withWatermark("ts", "15 minutes")
>>>>
>>>>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                           window(col("ts"), "15 minutes")) \
>>>>             .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>             .fillna(0)
>>>>         return ldf
>>>>     return ldf
>>>>
>>>> The DataFrame 'ldf' returned from the function aggIntfLogs is written to
>>>> a Kafka topic.
>>>> ```
>>>>
>>>> I was expecting that using the watermark would account for late-coming
>>>> data, i.e. that sentOctets & recvdOctets would be calculated over the
>>>> consolidated data (including the late-coming data, since it arrives
>>>> within 15 mins). However, I'm seeing 2 records for some of the keys
>>>> (applianceName/timeslot/customer): the aggregates are calculated
>>>> individually per record, so I see 2 records instead of a single record
>>>> accounting for late-coming data within the watermark.
>>>>
>>>> What needs to be done to fix this & make this work as desired?
>>>>
>>>> tia!
>>>>
>>>> Here is the Stackoverflow link as well -
>>>>
>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
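>>>>
>>>> PS: for completeness, the same aggregation written with explicit aliases,
>>>> which I believe is equivalent to the agg + withColumnRenamed chain above
>>>> (a sketch only):
>>>>
>>>> ```
>>>> from pyspark.sql import functions as F
>>>>
>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                   window(col("ts"), "15 minutes")) \
>>>>     .agg(F.sum("sentOctets").alias("sentOctets"),
>>>>          F.sum("recvdOctets").alias("recvdOctets")) \
>>>>     .fillna(0)
>>>> ```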