Re: Spark StructuredStreaming - watermark not working as expected

2023-03-17 Thread karan alang

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-17 Thread Mich Talebzadeh

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-16 Thread karan alang
t;>> .option("kafka.ssl.truststore.password", >>>>> self.ssl_truststore_password) \ >>>>> .option("kafka.ssl.keystore.location", >>>>> self.ssl_keystore_location_bandwidth_intermediate) \ >>>>> .option("kafka.ssl.keystore.password", >>>>> self.ssl_keystore_password_bandwidth_intermediate) \ >>>>> .option("kafka.bootstrap.servers", self.kafkaBrokers) \ >>>>> .option("subscribe", topic) \ >>>>> .option("startingOffsets", "latest") \ >>>>> .option("failOnDataLoss", "false") \ >>>>> .option("kafka.metadata.max.age.ms", "1000") \ >>>>> .option("kafka.ssl.keystore.type", "PKCS12") \ >>>>> .option("kafka.ssl.truststore.type", "PKCS12") \ >>>>> .load() >>>>> >>>>> 2. calling foreachBatch(self.process) >>>>> # note - outputMode is set to "update" (tried setting outputMode >>>>> = append as well) >>>>> >>>>> # 03/09 ::: outputMode - update instead of append >>>>> query = df_stream.selectExpr("CAST(value AS STRING)", >>>>> "timestamp", "topic").writeStream \ >>>>> .outputMode("update") \ >>>>> .trigger(processingTime='10 minutes') \ >>>>> .option("truncate", "false") \ >>>>> .option("checkpointLocation", self.checkpoint) \ >>>>> .foreachBatch(self.process) \ >>>>> .start() >>>>> >>>>> >>>>> self.process - where i do the bulk of the processing, which calls the >>>>> function 'aggIntfLogs' >>>>> >>>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing >>>>> groupBy to calculate the sum of sentOctets & recvdOctets >>>>> >>>>> >>>>> def aggIntfLogs(ldf): >>>>> if ldf and ldf.count() > 0: >>>>> >>>>> ldf = ldf.select('applianceName', 'timeslot', >>>>> 'sentOctets', 'recvdOctets','ts', 'customer') \ >>>>> .withColumn('sentOctets', >>>>> ldf["sentOctets"].cast(LongType())) \ >>>>> .withColumn('recvdOctets', >>>>> ldf["recvdOctets"].cast(LongType())) \ >>>>> .withWatermark("ts", "15 minutes") >>>>> >>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer", >>>>> >>>>> window(col("ts"), "15 minutes")) \ >>>>> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \ >>>>> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \ >>>>> .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') >>>>> \ >>>>> .fillna(0) >>>>> return ldf >>>>> return ldf >>>>> >>>>> >>>>> Dataframe 'ldf' returned from the function aggIntfLogs - is >>>>> written to Kafka topic >>>>> >>>>> ``` >>>>> >>>>> I was expecting that using the watermark will account for late coming >>>>> data .. i.e. the sentOctets & recvdOctets are calculated for the >>>>> consolidated data >>>>> (including late-coming data, since the late coming data comes within >>>>> 15 mins), however, I'm seeing 2 records for some of the data (i.e. key - >>>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated >>>>> individually for the records and I see 2 records instead of single record >>>>> accounting for late coming data within watermark. >>>>> >>>>> What needs to be done to fix this & make this work as desired? >>>>> >>>>> tia! >>>>> >>>>> >>>>> Here is the Stackoverflow link as well - >>>>> >>>>> >>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected >>>>> >>>>> >>>>> >>>>>

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-15 Thread karan alang
.option("kafka.ssl.truststore.type", "PKCS12") \ >>>> .load() >>>> >>>> 2. calling foreachBatch(self.process) >>>> # note - outputMode is set to "update" (tried setting outputMode = >>>> append as well) >>>> >>>> # 03/09 ::: outputMode - update instead of append >>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", >>>> "topic").writeStream \ >>>> .outputMode("update") \ >>>> .trigger(processingTime='10 minutes') \ >>>> .option("truncate", "false") \ >>>> .option("checkpointLocation", self.checkpoint) \ >>>> .foreachBatch(self.process) \ >>>> .start() >>>> >>>> >>>> self.process - where i do the bulk of the processing, which calls the >>>> function 'aggIntfLogs' >>>> >>>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing >>>> groupBy to calculate the sum of sentOctets & recvdOctets >>>> >>>> >>>> def aggIntfLogs(ldf): >>>> if ldf and ldf.count() > 0: >>>> >>>> ldf = ldf.select('applianceName', 'timeslot', >>>> 'sentOctets', 'recvdOctets','ts', 'customer') \ >>>> .withColumn('sentOctets', >>>> ldf["sentOctets"].cast(LongType())) \ >>>> .withColumn('recvdOctets', >>>> ldf["recvdOctets"].cast(LongType())) \ >>>> .withWatermark("ts", "15 minutes") >>>> >>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer", >>>> >>>> window(col("ts"), "15 minutes")) \ >>>> .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \ >>>> .withColumnRenamed('sum(sentOctets)', 'sentOctets') \ >>>> .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \ >>>> .fillna(0) >>>> return ldf >>>> return ldf >>>> >>>> >>>> Dataframe 'ldf' returned from the function aggIntfLogs - is >>>> written to Kafka topic >>>> >>>> ``` >>>> >>>> I was expecting that using the watermark will account for late coming >>>> data .. i.e. the sentOctets & recvdOctets are calculated for the >>>> consolidated data >>>> (including late-coming data, since the late coming data comes within 15 >>>> mins), however, I'm seeing 2 records for some of the data (i.e. key - >>>> applianceName/timeslot/customer) i.e. the aggregated data is calculated >>>> individually for the records and I see 2 records instead of single record >>>> accounting for late coming data within watermark. >>>> >>>> What needs to be done to fix this & make this work as desired? >>>> >>>> tia! >>>> >>>> >>>> Here is the Stackoverflow link as well - >>>> >>>> >>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected >>>> >>>> >>>> >>>>

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-12 Thread Mich Talebzadeh

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-10 Thread karan alang

Re: Spark StructuredStreaming - watermark not working as expected

2023-03-10 Thread Mich Talebzadeh

Spark StructuredStreaming - watermark not working as expected

2023-03-09 Thread karan alang
Hello All -

I have a Structured Streaming job with a 10-minute trigger, and I'm using a watermark to account for late-arriving data. However, the watermark is not working as expected: instead of a single record with the total aggregated value, I see 2 records.

Here is the code:

1) Structured Streaming - reading from Kafka every 10 minutes:

```
df_stream = self.spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
    .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
    .option("kafka.bootstrap.servers", self.kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .load()
```

2) Calling foreachBatch(self.process). Note: outputMode is set to "update" (I tried "append" as well):

```
# 03/09 ::: outputMode - update instead of append
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("update") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", self.checkpoint) \
    .foreachBatch(self.process) \
    .start()
```

self.process is where I do the bulk of the processing; it calls the function aggIntfLogs. In aggIntfLogs I'm using a watermark of 15 minutes and doing a groupBy to calculate the sums of sentOctets and recvdOctets:

```
def aggIntfLogs(ldf):
    if ldf and ldf.count() > 0:
        ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 'recvdOctets', 'ts', 'customer') \
            .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
            .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
            .withWatermark("ts", "15 minutes")

        ldf = ldf.groupBy("applianceName", "timeslot", "customer",
                          window(col("ts"), "15 minutes")) \
            .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
            .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
            .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
            .fillna(0)
        return ldf
    return ldf
```

The DataFrame ldf returned from aggIntfLogs is then written to a Kafka topic.

I was expecting the watermark to account for late-arriving data, i.e. that sentOctets and recvdOctets would be aggregated over the consolidated data (including the late records, since they arrive within the 15-minute watermark). Instead, for some keys (applianceName/timeslot/customer) the aggregate is computed separately for each arrival, and I see 2 records where there should be a single record covering the late data.

What needs to be done to fix this and make it work as desired?

tia!

Here is the Stack Overflow link as well:

https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
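
The write-to-Kafka step is mentioned but not shown in the post; for reference, a minimal sketch of what it could look like using the batch Kafka sink, assuming the aggregated rows are serialized as JSON and assuming a hypothetical output topic name ("agg-intf-logs"); the SSL options from the reader above are omitted for brevity:

```
# Sketch of writing an aggregated batch DataFrame to Kafka from inside
# foreachBatch. Topic name and JSON serialization are assumptions, not
# taken from the original post.
from pyspark.sql.functions import to_json, struct

def write_to_kafka(ldf, bootstrap_servers):
    # Pack each row into a JSON string in the Kafka 'value' column.
    ldf.select(to_json(struct(*ldf.columns)).alias("value")) \
        .write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("topic", "agg-intf-logs") \
        .save()
```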
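
One detail worth noting when reading the code above: withWatermark is only meaningful on a streaming Dataset, and the DataFrame a foreachBatch function receives is a plain batch DataFrame, so each 10-minute micro-batch is aggregated on its own, with no aggregation state kept across triggers. For comparison, a minimal sketch of the pattern in the Structured Streaming programming guide, where the watermark and windowed aggregation are applied to the streaming DataFrame before writeStream; the JSON schema and parse step are assumptions (the message format isn't shown in the post), and the other names mirror the post:

```
# A sketch of the documented streaming-aggregation pattern (not the original
# author's code): watermark + window on the streaming DataFrame, so Spark
# keeps per-window state across micro-batches and folds late rows into the
# existing aggregate instead of producing a second record.
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.functions import sum as sum_
from pyspark.sql.types import StructType, StringType, LongType, TimestampType

# Assumed schema for the Kafka message payload.
schema = (StructType()
          .add("applianceName", StringType())
          .add("timeslot", StringType())
          .add("customer", StringType())
          .add("sentOctets", LongType())
          .add("recvdOctets", LongType())
          .add("ts", TimestampType()))

parsed = (df_stream
          .selectExpr("CAST(value AS STRING) AS value")
          .select(from_json(col("value"), schema).alias("r"))
          .select("r.*"))

agg = (parsed
       .withWatermark("ts", "15 minutes")
       .groupBy("applianceName", "timeslot", "customer",
                window(col("ts"), "15 minutes"))
       .agg(sum_("sentOctets").alias("sentOctets"),
            sum_("recvdOctets").alias("recvdOctets")))

# foreachBatch now only has to write the already-aggregated rows out.
query = (agg.writeStream
         .outputMode("update")
         .trigger(processingTime="10 minutes")
         .option("checkpointLocation", self.checkpoint)
         .foreachBatch(self.process)
         .start())
```

Note also that with outputMode "update", a window is re-emitted on every trigger that changes it, so a downstream Kafka topic can still receive more than one record per key; with "append", each window is emitted once, only after the watermark passes the end of the window.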