Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.

ldf.printSchema()

root
 |-- applianceName: string (nullable = true)
 |-- timeslot: long (nullable = true)
 |-- customer: string (nullable = true)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sentOctets: long (nullable = true)
 |-- recvdOctets: long (nullable = true)


 ldf.show() :

 
+------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
|applianceName     |timeslot    |customer|window
                     |sentOctets|recvdOctets|
+------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+
|abc1              |2797514    |cust1     |{2023-03-11 04:15:00, 2023-03-11
04:30:00}|21459264  |32211859   |
|pqrq              |2797513    |cust1     |{2023-03-11 04:15:00, 2023-03-11
04:30:00}|17775527  |31331093   |
|xyz                |2797514    |cust1     |{2023-03-11 04:15:00,
2023-03-11 04:30:00}|12808015  |24191707   |
+------------------+-----------+--------+----------+--------+------------------------------------------+----------+-----------+

Also, any comment on the outputMode ? I've set it to 'update', since I'm
using aggregation.

thanks!

On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Just looking at the code
>
>
> in here
>
>
> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                                                              
> window(col("ts"), "15 minutes")) \
>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>                     .fillna(0)
>
> What does ldf.printSchema() returns
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 10 Mar 2023 at 07:16, karan alang <karan.al...@gmail.com> wrote:
>
>>
>> Hello All -
>>
>> I've a structured Streaming job which has a trigger of 10 minutes, and
>> I'm using watermark to account for late data coming in. However, the
>> watermark is not working - and instead of a single record with total
>> aggregated value, I see 2 records.
>>
>> Here is the code :
>>
>> ```
>>
>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>
>>
>>         df_stream = self.spark.readStream.format('kafka') \
>>             .option("kafka.security.protocol", "SSL") \
>>             .option("kafka.ssl.truststore.location", 
>> self.ssl_truststore_location) \
>>             .option("kafka.ssl.truststore.password", 
>> self.ssl_truststore_password) \
>>             .option("kafka.ssl.keystore.location", 
>> self.ssl_keystore_location_bandwidth_intermediate) \
>>             .option("kafka.ssl.keystore.password", 
>> self.ssl_keystore_password_bandwidth_intermediate) \
>>             .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>             .option("subscribe", topic) \
>>             .option("startingOffsets", "latest") \
>>             .option("failOnDataLoss", "false") \
>>             .option("kafka.metadata.max.age.ms", "1000") \
>>             .option("kafka.ssl.keystore.type", "PKCS12") \
>>             .option("kafka.ssl.truststore.type", "PKCS12") \
>>             .load()
>>
>> 2. calling foreachBatch(self.process)
>>         # note - outputMode is set to "update" (tried setting outputMode = 
>> append as well)
>>
>>         # 03/09 ::: outputMode - update instead of append
>>         query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", 
>> "topic").writeStream \
>>             .outputMode("update") \
>>             .trigger(processingTime='10 minutes') \
>>             .option("truncate", "false") \
>>             .option("checkpointLocation", self.checkpoint) \
>>             .foreachBatch(self.process) \
>>             .start()
>>
>>
>> self.process - where i do the bulk of the  processing, which calls the 
>> function  'aggIntfLogs'
>>
>> In function aggIntfLogs - i'm using watermark of 15 mins, and doing  groupBy 
>> to calculate the sum of sentOctets & recvdOctets
>>
>>
>>         def aggIntfLogs(ldf):
>>             if ldf and ldf.count() > 0:
>>
>>                 ldf = ldf.select('applianceName', 'timeslot', 'sentOctets', 
>> 'recvdOctets','ts', 'customer') \
>>                     .withColumn('sentOctets', 
>> ldf["sentOctets"].cast(LongType())) \
>>                     .withColumn('recvdOctets', 
>> ldf["recvdOctets"].cast(LongType())) \
>>                     .withWatermark("ts", "15 minutes")
>>
>>                 ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>                                                              
>> window(col("ts"), "15 minutes")) \
>>                     .agg({'sentOctets':"sum", 'recvdOctets':"sum"}) \
>>                     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>                     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>                     .fillna(0)
>>                 return ldf
>>             return ldf
>>
>>
>>         Dataframe 'ldf' returned from the function aggIntfLogs - is written 
>> to Kafka topic
>>
>> ```
>>
>> I was expecting that using the watermark will account for late coming
>> data .. i.e. the sentOctets & recvdOctets are calculated for the
>> consolidated data
>> (including late-coming data, since the late coming data comes within 15
>> mins), however, I'm seeing 2 records for some of the data (i.e. key -
>> applianceName/timeslot/customer) i.e. the aggregated data is calculated
>> individually for the records and I see 2 records instead of single record
>> accounting for late coming data within watermark.
>>
>> What needs to be done to fix this & make this work as desired?
>>
>> tia!
>>
>>
>> Here is the Stackoverflow link as well -
>>
>>
>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>
>>
>>
>>

Reply via email to