Hi Mich, this doesn't seem to be working for me; the watermark seems to be getting ignored!
Here is the data put into Kafka:
```
+----------------------------------------------------------------------------------------------------+----+
|value                                                                                               |key |
+----------------------------------------------------------------------------------------------------+----+
|{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"} |null|
|{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"} |null|
|{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}  |null|
+----------------------------------------------------------------------------------------------------+----+
```
Note: insert_ts specifies when the data was inserted (wall-clock time); ts is the event timestamp the windows are built on.
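For reference, the test records were pushed in one at a time. Here is a rough sketch of the producer side (assuming the kafka-python package; the broker address and topic name are placeholders, not my actual setup):
```
# Rough sketch of how the test records were produced.
# kafka-python assumed; bootstrap servers and topic name are placeholders.
import json
from datetime import datetime, timezone
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

def send_reading(topic, temparature, ts):
    # insert_ts = wall-clock time of the send, ts = the event time used for windowing
    record = {
        "temparature": temparature,
        "insert_ts": datetime.now(timezone.utc).isoformat(),
        "ts": ts
    }
    producer.send(topic, json.dumps(record).encode("utf-8"))
    producer.flush()

# e.g. the deliberately old event that should fall behind the watermark:
send_reading("temperature", 6, "2023-03-13T10:12:00.000-07:00")
```
The insert_ts is just the wall-clock time of the send, so you can see how far behind each event time ('ts') is.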
Here is the output of the Structured Stream:
```
-------------------------------------------
Batch: 2
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
|2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
|2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
+-------------------+-------------------+---------------+
```
Note: I'm summing up the temperatures (for easy verification)
As per the above, all 3 'ts' values are included in the DataFrame, even though
I added the record with "ts":"2023-03-13T10:12:00.000-07:00" last.
Since the watermark is set to "5 minutes" and max(ts) ==
2023-03-15T16:12:00.000-07:00 (so the watermark should be around
2023-03-15T16:07:00), the record with ts = "2023-03-13T10:12:00.000-07:00"
should have been dropped - it is more than 2 days older than that (dated 2023-03-13)!
Any ideas what needs to be changed to make this work?
Here is the code (modified for my requirement, but essentially the same as yours):
```
# kafkaBrokers, topic and checkpoint_path are defined elsewhere in the job
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

schema = StructType([
    StructField("temparature", LongType(), False),    # field name matches the JSON payload
    StructField("ts", TimestampType(), False),         # event time used for windowing
    StructField("insert_ts", TimestampType(), False)   # wall-clock insert time
])

streamingDataFrame = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("group.id", 'watermark-grp') \
    .option("subscribe", topic) \
    .option("failOnDataLoss", "false") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

# 5-minute watermark on the event time, 5-minute tumbling windows, SUM(temparature)
resultM = resultC \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
    .agg({'temparature': 'sum'})

# Flatten the window struct into start/end columns
resultMF = resultM.select(
    col("window.start").alias("startOfWindowFrame"),
    col("window.end").alias("endOfWindowFrame"),
    col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF \
    .writeStream \
    .outputMode('complete') \
    .option("numRows", 1000) \
    .option("truncate", "false") \
    .format('console') \
    .option('checkpointLocation', checkpoint_path) \
    .queryName("sum_temparature") \
    .start()

result.awaitTermination()
```
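One thing I wasn't sure about is whether the output mode matters here. For comparison, this is a minimal sketch of the same aggregation written with append output mode rather than complete ('events' stands in for the parsed Kafka stream above, and the checkpoint path is just a placeholder):
```
# Same 5-minute window / 5-minute watermark aggregation, but in append mode.
# 'events' is a placeholder for the parsed stream (resultC above); the
# checkpoint path is also a placeholder.
from pyspark.sql.functions import col, window

windowed = events \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes")) \
    .agg({"temparature": "sum"}) \
    .select(col("window.start").alias("startOfWindowFrame"),
            col("window.end").alias("endOfWindowFrame"),
            col("sum(temparature)").alias("Sum_Temperature"))

query = windowed.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .option("checkpointLocation", "/tmp/chk-append-sketch") \
    .start()
```
In that variant a window should only show up on the console after the watermark passes its end, so it is easier to see whether the late 2023-03-13 record is actually being dropped or still being counted.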
On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <[email protected]>
wrote:
> OK
>
> ts is the timestamp right?
>
> This is a similar code that works out the average temperature with time
> frame of 5 minutes.
>
> Note the comments and catch error with try:
>
> try:
>
>         # construct a streaming dataframe streamingDataFrame that subscribes to topic temperature
>         streamingDataFrame = self.spark \
>             .readStream \
>             .format("kafka") \
>             .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
>             .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>             .option("group.id", config['common']['appName']) \
>             .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>             .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>             .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>             .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>             .option("subscribe", "temperature") \
>             .option("failOnDataLoss", "false") \
>             .option("includeHeaders", "true") \
>             .option("startingOffsets", "latest") \
>             .load() \
>             .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>
>         resultC = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.timestamp").alias("timestamp") \
>                    , col("parsed_value.temperature").alias("temperature"))
>
>         """
>         We work out the window and the AVG(temperature) in the window's timeframe below
>         This should return back the following Dataframe as struct
>
>         root
>          |-- window: struct (nullable = false)
>          |    |-- start: timestamp (nullable = true)
>          |    |-- end: timestamp (nullable = true)
>          |-- avg(temperature): double (nullable = true)
>
>         """
>         resultM = resultC. \
>             withWatermark("timestamp", "5 minutes"). \
>             groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>             avg('temperature')
>
>         # We take the above Dataframe and flatten it to get the columns aliased as
>         # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>         resultMF = resultM. \
>             select( \
>                   F.col("window.start").alias("startOfWindowFrame") \
>                 , F.col("window.end").alias("endOfWindowFrame") \
>                 , F.col("avg(temperature)").alias("AVGTemperature"))
>
>         resultMF.printSchema()
>
>         result = resultMF. \
>             writeStream. \
>             outputMode('complete'). \
>             option("numRows", 1000). \
>             option("truncate", "false"). \
>             format('console'). \
>             option('checkpointLocation', checkpoint_path). \
>             queryName("temperature"). \
>             start()
>
> except Exception as e:
>         print(f"""{e}, quitting""")
>         sys.exit(1)
>
>
>
> HTH
>
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 11 Mar 2023 at 04:33, karan alang <[email protected]> wrote:
>
>> Hi Mich -
>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>
>> ldf.printSchema()
>>
>> root
>> |-- applianceName: string (nullable = true)
>> |-- timeslot: long (nullable = true)
>> |-- customer: string (nullable = true)
>> |-- window: struct (nullable = false)
>> | |-- start: timestamp (nullable = true)
>> | |-- end: timestamp (nullable = true)
>> |-- sentOctets: long (nullable = true)
>> |-- recvdOctets: long (nullable = true)
>>
>>
>> ldf.show() :
>>
>>
>>
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>> |applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>> |abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
>> |pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
>> |xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>>
>> Also, any comment on the outputMode ? I've set it to 'update', since I'm
>> using aggregation.
>>
>> thanks!
>>
>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
>> [email protected]> wrote:
>>
>>>
>>> Just looking at the code
>>>
>>>
>>> in here
>>>
>>>
>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>
>>> window(col("ts"), "15 minutes")) \
>>> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>> .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>> .fillna(0)
>>>
>>> What does ldf.printSchema() returns
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>> On Fri, 10 Mar 2023 at 07:16, karan alang <[email protected]> wrote:
>>>
>>>>
>>>> Hello All -
>>>>
>>>> I've a structured Streaming job which has a trigger of 10 minutes, and
>>>> I'm using watermark to account for late data coming in. However, the
>>>> watermark is not working - and instead of a single record with total
>>>> aggregated value, I see 2 records.
>>>>
>>>> Here is the code :
>>>>
>>>> ```
>>>>
>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>>
>>>>
>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>     .option("kafka.security.protocol", "SSL") \
>>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>     .option("subscribe", topic) \
>>>>     .option("startingOffsets", "latest") \
>>>>     .option("failOnDataLoss", "false") \
>>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>     .load()
>>>>
>>>> 2. calling foreachBatch(self.process)
>>>> # note - outputMode is set to "update" (tried setting outputMode =
>>>> append as well)
>>>>
>>>> # 03/09 ::: outputMode - update instead of append
>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>     .outputMode("update") \
>>>>     .trigger(processingTime='10 minutes') \
>>>>     .option("truncate", "false") \
>>>>     .option("checkpointLocation", self.checkpoint) \
>>>>     .foreachBatch(self.process) \
>>>>     .start()
>>>>
>>>>
>>>> self.process - where i do the bulk of the processing, which calls the
>>>> function 'aggIntfLogs'
>>>>
>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing
>>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>>
>>>>
>>>> def aggIntfLogs(ldf):
>>>>     if ldf and ldf.count() > 0:
>>>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>>>                          'recvdOctets', 'ts', 'customer') \
>>>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>>>>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>>>>             .withWatermark("ts", "15 minutes")
>>>>
>>>>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                           window(col("ts"), "15 minutes")) \
>>>>             .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>             .fillna(0)
>>>>         return ldf
>>>>     return ldf
>>>>
>>>>
>>>> Dataframe 'ldf' returned from the function aggIntfLogs - is
>>>> written to Kafka topic
>>>>
>>>> ```
>>>>
>>>> I was expecting that using the watermark would account for late-arriving
>>>> data, i.e. the sentOctets & recvdOctets would be calculated over the
>>>> consolidated data (including the late data, since it arrives within 15
>>>> minutes). However, for some keys (applianceName/timeslot/customer) I'm
>>>> seeing 2 records, i.e. the aggregation is calculated separately for the
>>>> on-time and the late records, so I get 2 records instead of a single
>>>> record that accounts for the late data within the watermark.
>>>>
>>>> What needs to be done to fix this & make this work as desired?
>>>>
>>>> tia!
>>>>
>>>>
>>>> Here is the Stackoverflow link as well -
>>>>
>>>>
>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>>
>>>>
>>>>
>>>>