Hi Mich, this doesn't seem to be working for me .. the watermark seems to
be getting ignored !

Here is the data put into Kafka :

```

+---------------------------------------------------------------------------------------------------+----+

|value
                        |key |

+---------------------------------------------------------------------------------------------------+----+

|{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|

|{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|

|{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|

|{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"}
|null|

+---------------------------------------------------------------------------------------------------+----+
```
Note :
insert_ts - specifies when the data was inserted

Here is the output of the Structured Stream:

-------------------------------------------

Batch: 2

-------------------------------------------

+-------------------+-------------------+---------------+

|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|

+-------------------+-------------------+---------------+

|2023-03-15 16:10:00|2023-03-15 16:15:00|27             |

|2023-03-15 15:10:00|2023-03-15 15:15:00|14             |

|2023-03-13 10:10:00|2023-03-13 10:15:00|6              |

+-------------------+-------------------+---------------+

Note: I'm summing up the temperatures (for easy verification)

As per the above - all the 3 'ts' are included in the DataFrame, even when
I added   "ts":"2023-03-13T10:12:00.000-07:00", as the last record.
Since the wattermark is set to "5 minutes" and the max(ts) ==
2023-03-15T16:12:00.000-07:00
record with ts = "2023-03-13T10:12:00.000-07:00" should have got dropped,
it is more than 2 days old (i.e. dated - 2023-03-13)!

Any ideas what needs to be changed to make this work ?

Here is the code (modified for my requirement, but essentially the same)
```

schema = StructType([
            StructField("temparature", LongType(), False),
            StructField("ts", TimestampType(), False),
            StructField("insert_ts", TimestampType(), False)
        ])

streamingDataFrame = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kafkaBrokers) \
                .option("group.id", 'watermark-grp') \
                .option("subscribe", topic) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"),
schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select( col("parsed_value.ts").alias("timestamp") \
                   ,
col("parsed_value.temparature").alias("temparature"),
col("parsed_value.insert_ts").alias("insert_ts"))

resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    agg({'temparature':'sum'})

resultMF = resultM. \
            
select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame")
\
                          , col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF. \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("sum_temparature"). \
                     start()

result.awaitTermination()

```



On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> OK
>
> ts is the timestamp right?
>
> This is a similar code that works out the average temperature with time
> frame of 5 minutes.
>
> Note the comments and catch error with try:
>
>         try:
>
>             # construct a streaming dataframe streamingDataFrame that
> subscribes to topic temperature
>             streamingDataFrame = self.spark \
>                 .readStream \
>                 .format("kafka") \
>                 .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>                 .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
>                 .option("group.id", config['common']['appName']) \
>                 .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>                 .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
>                 .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>                 .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
>                 .option("subscribe", "temperature") \
>                 .option("failOnDataLoss", "false") \
>                 .option("includeHeaders", "true") \
>                 .option("startingOffsets", "latest") \
>                 .load() \
>                 .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>
>             resultC = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.timestamp").alias("timestamp") \
>                    , col("parsed_value.temperature").alias("temperature"))
>
>             """
>             We work out the window and the AVG(temperature) in the
> window's timeframe below
>             This should return back the following Dataframe as struct
>
>              root
>              |-- window: struct (nullable = false)
>              |    |-- start: timestamp (nullable = true)
>              |    |-- end: timestamp (nullable = true)
>              |-- avg(temperature): double (nullable = true)
>
>             """
>             resultM = resultC. \
>                      withWatermark("timestamp", "5 minutes"). \
>                      groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>                      avg('temperature')
>
>             # We take the above Dataframe and flatten it to get the
> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
> "AVGTemperature"
>             resultMF = resultM. \
>                        select( \
>
> F.col("window.start").alias("startOfWindowFrame") \
>                           , F.col("window.end").alias("endOfWindowFrame") \
>                           ,
> F.col("avg(temperature)").alias("AVGTemperature"))
>
>             resultMF.printSchema()
>
>             result = resultMF. \
>                      writeStream. \
>                      outputMode('complete'). \
>                      option("numRows", 1000). \
>                      option("truncate", "false"). \
>                      format('console'). \
>                      option('checkpointLocation', checkpoint_path). \
>                      queryName("temperature"). \
>                      start()
>
>         except Exception as e:
>                 print(f"""{e}, quitting""")
>                 sys.exit(1)
>
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 11 Mar 2023 at 04:33, karan alang <karan.al...@gmail.com> wrote:
>
>> Hi Mich -
>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>
>> ldf.printSchema()
>>
>> root
>>  |-- applianceName: string (nullable = true)
>>  |-- timeslot: long (nullable = true)
>>  |-- customer: string (nullable = true)
>>  |-- window: struct (nullable = false)
>>  |    |-- start: timestamp (nullable = true)
>>  |    |-- end: timestamp (nullable = true)
>>  |-- sentOctets: long (nullable = true)
>>  |-- recvdOctets: long (nullable = true)
>>
>>
>>  ldf.show() :
>>
>>
>>  
>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>> |applianceName     |timeslot    |customer|window
>>                        |sentOctets|recvdOctets|
>>
>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>> |abc1              |2797514    |cust1     |{2023-03-11 04:15:00,
>> 2023-03-11 04:30:00}|21459264  |32211859   |
>> |pqrq              |2797513    |cust1     |{2023-03-11 04:15:00,
>> 2023-03-11 04:30:00}|17775527  |31331093   |
>> |xyz                |2797514    |cust1     |{2023-03-11 04:15:00,
>> 2023-03-11 04:30:00}|12808015  |24191707   |
>>
>> +------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
>>
>> Also, any comment on the outputMode ? I've set it to 'update', since I'm
>> using aggregation.
>>
>> thanks!
>>
>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Just looking at the code
>>>
>>>
>>> in here
>>>
>>>
>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>                                                              
>>> window(col("ts"), "15 minutes")) \
>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>                     .fillna(0)
>>>
>>> What does ldf.printSchema() returns
>>>
>>>
>>> HTH
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:
>>>
>>>>
>>>> Hello All -
>>>>
>>>> I've a structured Streaming job which has a trigger of 10 minutes, and
>>>> I'm using watermark to account for late data coming in. However, the
>>>> watermark is not working - and instead of a single record with total
>>>> aggregated value, I see 2 records.
>>>>
>>>> Here is the code :
>>>>
>>>> ```
>>>>
>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>>
>>>>
>>>>         df_stream = self.spark.readStream.format('kafka') \
>>>>             .option("kafka.security.protocol", "SSL") \
>>>>             .option("kafka.ssl.truststore.location", 
>>>> self.ssl_truststore_location) \
>>>>             .option("kafka.ssl.truststore.password", 
>>>> self.ssl_truststore_password) \
>>>>             .option("kafka.ssl.keystore.location", 
>>>> self.ssl_keystore_location_bandwidth_intermediate) \
>>>>             .option("kafka.ssl.keystore.password", 
>>>> self.ssl_keystore_password_bandwidth_intermediate) \
>>>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>             .option("subscribe", topic) \
>>>>             .option("startingOffsets", "latest") \
>>>>             .option("failOnDataLoss", "false") \
>>>>             .option("kafka.metadata.max.age.ms", "1000") \
>>>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>             .load()
>>>>
>>>> 2. calling foreachBatch(self.process)
>>>>         # note - outputMode is set to "update" (tried setting outputMode = 
>>>> append as well)
>>>>
>>>>         # 03/09 ::: outputMode - update instead of append
>>>>         query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", 
>>>> "topic").writeStream \
>>>>             .outputMode("update") \
>>>>             .trigger(processingTime='10 minutes') \
>>>>             .option("truncate", "false") \
>>>>             .option("checkpointLocation", self.checkpoint) \
>>>>             .foreachBatch(self.process) \
>>>>             .start()
>>>>
>>>>
>>>> self.process - where i do the bulk of the  processing, which calls the 
>>>> function  'aggIntfLogs'
>>>>
>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing  
>>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>>
>>>>
>>>>         def aggIntfLogs(ldf):
>>>>             if ldf and ldf.count() > 0:
>>>>
>>>>                 ldf = ldf.select('applianceName', 'timeslot', 
>>>> 'sentOctets', 'recvdOctets','ts', 'customer') \
>>>>                     .withColumn('sentOctets', 
>>>> ldf["sentOctets"].cast(LongType())) \
>>>>                     .withColumn('recvdOctets', 
>>>> ldf["recvdOctets"].cast(LongType())) \
>>>>                     .withWatermark("ts", "15 minutes")
>>>>
>>>>                 ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                                                              
>>>> window(col("ts"), "15 minutes")) \
>>>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>                     .fillna(0)
>>>>                 return ldf
>>>>             return ldf
>>>>
>>>>
>>>>         Dataframe 'ldf' returned from the function aggIntfLogs - is 
>>>> written to Kafka topic
>>>>
>>>> ```
>>>>
>>>> I was expecting that using the watermark will account for late coming
>>>> data .. i.e. the sentOctets & recvdOctets are calculated for the
>>>> consolidated data
>>>> (including late-coming data, since the late coming data comes within 15
>>>> mins), however, I'm seeing 2 records for some of the data (i.e. key -
>>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated
>>>> individually for the records and I see 2 records instead of single record
>>>> accounting for late coming data within watermark.
>>>>
>>>> What needs to be done to fix this & make this work as desired?
>>>>
>>>> tia!
>>>>
>>>>
>>>> Here is the Stackoverflow link as well -
>>>>
>>>>
>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>>
>>>>
>>>>
>>>>

Reply via email to