https://en.everybodywiki.com/Mich_Talebzadeh
On Fri, 10 Mar 2023 at 07:16, karan alang wrote:

Hello All -

I have a Structured Streaming job with a 10-minute trigger, and I'm using a watermark to account for late-arriving data. However, the watermark is not working: instead of a single record with the total aggregated value, I see 2 records.

Here is the code:
1) Structured Streaming - reading from Kafka every 10 minutes:

```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()
```

2) Calling foreachBatch(self.process). Note - outputMode is set to "update" (I tried "append" as well):

```
# 03/09 ::: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
    .writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```

self.process is where I do the bulk of the processing; it calls the function aggIntfLogs.

3) In aggIntfLogs I use a watermark of 15 minutes and group by applianceName, timeslot, and customer to sum sentOctets and recvdOctets:

```
from pyspark.sql.functions import col, window
from pyspark.sql.types import LongType

def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
                         'recvdOctets', 'ts', 'customer') \
            .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
            .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
            .withWatermark("ts", "15 minutes")

        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
            .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
            .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
            .fillna(0)
    return ldf
```

The DataFrame returned from aggIntfLogs is written to a Kafka topic.
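For intuition, the grouping above assigns each event to a fixed 15-minute event-time window and sums the octet counters per (applianceName, timeslot, customer, window). Here is a minimal pure-Python sketch of that bucketing, with made-up sample values rather than the job's actual data:

```
from collections import defaultdict
from datetime import datetime

def window_start(ts):
    # Floor the event time to the start of its 15-minute window,
    # mirroring window(col("ts"), "15 minutes") in the query above.
    return ts.replace(minute=ts.minute - ts.minute % 15, second=0, microsecond=0)

def agg_intf_logs(rows):
    # rows: (applianceName, timeslot, customer, ts, sentOctets, recvdOctets)
    totals = defaultdict(lambda: [0, 0])
    for appliance, timeslot, customer, ts, sent, recvd in rows:
        key = (appliance, timeslot, customer, window_start(ts))
        totals[key][0] += sent
        totals[key][1] += recvd
    return totals

rows = [
    ("app1", "slot1", "custA", datetime(2023, 3, 9, 10, 2), 100, 50),
    ("app1", "slot1", "custA", datetime(2023, 3, 9, 10, 9), 200, 75),   # same window
    ("app1", "slot1", "custA", datetime(2023, 3, 9, 10, 20), 10, 5),    # next window
]
result = agg_intf_logs(rows)
# One aggregated row per key per 15-minute window:
#   ("app1", "slot1", "custA", 10:00) -> [300, 125]
#   ("app1", "slot1", "custA", 10:15) -> [10, 5]
```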

I was expecting that the watermark would account for late-arriving data, i.e. that sentOctets and recvdOctets would be summed over the consolidated data (including the late data, since it arrives within 15 minutes). Instead, for some keys (applianceName/timeslot/customer) the aggregation is computed separately for each set of records, and I see 2 records instead of a single record that accounts for the late data within the watermark.

What needs to be done to fix this and make it work as desired?

tia!
Here is the Stack Overflow link as well:
https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
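The symptom described above - two output records for one key - is what you get when each set of records is aggregated independently rather than as one consolidated group. A small pure-Python illustration with made-up batches (not the job's actual data):

```
def sum_octets(rows):
    # Aggregate (key, sent, recvd) rows into one (sent, recvd) total per key.
    totals = {}
    for key, sent, recvd in rows:
        s, r = totals.get(key, (0, 0))
        totals[key] = (s + sent, r + recvd)
    return totals

key = ("app1", "slot1", "custA")
batch1 = [(key, 100, 50)]   # on-time data for a given key/window
batch2 = [(key, 25, 10)]    # late data for the same key/window

# Aggregating each batch on its own emits two records for the same key:
per_batch = [sum_octets(batch1)[key], sum_octets(batch2)[key]]

# Aggregating across both batches emits the single consolidated record:
consolidated = sum_octets(batch1 + batch2)[key]
```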