FYI - the Apache Spark version is 3.1.3
On Wed, Mar 15, 2023 at 4:34 PM karan alang <[email protected]> wrote:
> Hi Mich, this doesn't seem to be working for me; the watermark seems to
> be getting ignored!
>
> Here is the data put into Kafka :
>
> ```
>
>
> +---------------------------------------------------------------------------------------------------+----+
> |value                                                                                               |key |
> +---------------------------------------------------------------------------------------------------+----+
> |{"temparature":14,"insert_ts":"2023-03-15T16:04:33.003-07:00","ts":"2023-03-15T15:12:00.000-07:00"}|null|
> |{"temparature":10,"insert_ts":"2023-03-15T16:05:58.816-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
> |{"temparature":17,"insert_ts":"2023-03-15T16:07:55.222-07:00","ts":"2023-03-15T16:12:00.000-07:00"}|null|
> |{"temparature":6,"insert_ts":"2023-03-15T16:11:41.759-07:00","ts":"2023-03-13T10:12:00.000-07:00"} |null|
> +---------------------------------------------------------------------------------------------------+----+
> ```
> Note: insert_ts specifies when the data was inserted; ts is the
> event-time column used for the watermark and window.
>
> Here is the output of the Structured Stream:
>
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +-------------------+-------------------+---------------+
> |startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
> +-------------------+-------------------+---------------+
> |2023-03-15 16:10:00|2023-03-15 16:15:00|27             |
> |2023-03-15 15:10:00|2023-03-15 15:15:00|14             |
> |2023-03-13 10:10:00|2023-03-13 10:15:00|6              |
> +-------------------+-------------------+---------------+
>
> Note: I'm summing up the temperatures (for easy verification)
>
> As per the above, all 3 'ts' values are included in the DataFrame, even
> though I added "ts":"2023-03-13T10:12:00.000-07:00" as the last record.
> Since the watermark is set to "5 minutes" and the max(ts) is
> 2023-03-15T16:12:00.000-07:00, the record with ts =
> "2023-03-13T10:12:00.000-07:00" should have been dropped; it is more than
> 2 days old (i.e. dated 2023-03-13)!
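>
> For reference, a small sketch (plain Python, just spelling out the
> arithmetic) of the cutoff I was expecting, assuming the watermark works
> out to roughly the max event time seen minus the delay threshold:
>
> ```
> # rough sketch of the expected watermark cutoff
> # assumption: watermark ~= max event time seen - delay threshold
> from datetime import datetime, timedelta
>
> max_ts = datetime.fromisoformat("2023-03-15T16:12:00-07:00")   # latest 'ts' in the data above
> cutoff = max_ts - timedelta(minutes=5)                         # 2023-03-15 16:07:00-07:00
>
> late_ts = datetime.fromisoformat("2023-03-13T10:12:00-07:00")  # the 2-day-old record
> print(late_ts < cutoff)                                        # True -> far behind the expected cutoff
> ```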
>
> Any ideas what needs to be changed to make this work?
>
> Here is the code (modified for my requirement, but essentially the same)
> ```
>
> from pyspark.sql.functions import col, from_json, window
> from pyspark.sql.types import StructType, StructField, LongType, TimestampType
>
> schema = StructType([
>     StructField("temparature", LongType(), False),
>     StructField("ts", TimestampType(), False),
>     StructField("insert_ts", TimestampType(), False)
> ])
>
> streamingDataFrame = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", kafkaBrokers) \
>     .option("group.id", 'watermark-grp') \
>     .option("subscribe", topic) \
>     .option("failOnDataLoss", "false") \
>     .option("includeHeaders", "true") \
>     .option("startingOffsets", "latest") \
>     .load() \
>     .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
>
> resultC = streamingDataFrame.select(
>     col("parsed_value.ts").alias("timestamp"),
>     col("parsed_value.temparature").alias("temparature"),
>     col("parsed_value.insert_ts").alias("insert_ts"))
>
> # 5-minute watermark on the event-time column, 5-minute tumbling window
> resultM = resultC. \
>     withWatermark("timestamp", "5 minutes"). \
>     groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>     agg({'temparature': 'sum'})
>
> # flatten the window struct into start/end columns
> resultMF = resultM. \
>     select(col("window.start").alias("startOfWindowFrame"),
>            col("window.end").alias("endOfWindowFrame"),
>            col("sum(temparature)").alias("Sum_Temperature"))
>
> result = resultMF. \
>     writeStream. \
>     outputMode('complete'). \
>     option("numRows", 1000). \
>     option("truncate", "false"). \
>     format('console'). \
>     option('checkpointLocation', checkpoint_path). \
>     queryName("sum_temparature"). \
>     start()
>
> result.awaitTermination()
>
> ```
>
>
>
> On Sun, Mar 12, 2023 at 3:13 AM Mich Talebzadeh <[email protected]>
> wrote:
>
>> OK
>>
>> ts is the timestamp, right?
>>
>> Here is similar code that works out the average temperature over a
>> 5-minute time frame.
>>
>> Note the comments, and the error handling with try/except:
>>
>> try:
>>
>>     # construct a streaming dataframe streamingDataFrame that subscribes to topic temperature
>>     streamingDataFrame = self.spark \
>>         .readStream \
>>         .format("kafka") \
>>         .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
>>         .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
>>         .option("group.id", config['common']['appName']) \
>>         .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>         .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
>>         .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>         .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
>>         .option("subscribe", "temperature") \
>>         .option("failOnDataLoss", "false") \
>>         .option("includeHeaders", "true") \
>>         .option("startingOffsets", "latest") \
>>         .load() \
>>         .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>>
>>     resultC = streamingDataFrame.select(
>>         col("parsed_value.rowkey").alias("rowkey"),
>>         col("parsed_value.timestamp").alias("timestamp"),
>>         col("parsed_value.temperature").alias("temperature"))
>>
>>     """
>>     We work out the window and the AVG(temperature) in the window's timeframe below.
>>     This should return back the following Dataframe as struct
>>
>>     root
>>      |-- window: struct (nullable = false)
>>      |    |-- start: timestamp (nullable = true)
>>      |    |-- end: timestamp (nullable = true)
>>      |-- avg(temperature): double (nullable = true)
>>
>>     """
>>     resultM = resultC. \
>>         withWatermark("timestamp", "5 minutes"). \
>>         groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
>>         avg('temperature')
>>
>>     # We take the above Dataframe and flatten it to get the columns aliased as
>>     # "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
>>     resultMF = resultM. \
>>         select(
>>             F.col("window.start").alias("startOfWindowFrame"),
>>             F.col("window.end").alias("endOfWindowFrame"),
>>             F.col("avg(temperature)").alias("AVGTemperature"))
>>
>>     resultMF.printSchema()
>>
>>     result = resultMF. \
>>         writeStream. \
>>         outputMode('complete'). \
>>         option("numRows", 1000). \
>>         option("truncate", "false"). \
>>         format('console'). \
>>         option('checkpointLocation', checkpoint_path). \
>>         queryName("temperature"). \
>>         start()
>>
>> except Exception as e:
>>     print(f"""{e}, quitting""")
>>     sys.exit(1)
>>
>>
>>
>> HTH
>>
>>
>> view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>> https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sat, 11 Mar 2023 at 04:33, karan alang <[email protected]> wrote:
>>
>>> Hi Mich -
>>> Here is the output of the ldf.printSchema() & ldf.show() commands.
>>>
>>> ldf.printSchema()
>>>
>>> root
>>> |-- applianceName: string (nullable = true)
>>> |-- timeslot: long (nullable = true)
>>> |-- customer: string (nullable = true)
>>> |-- window: struct (nullable = false)
>>> | |-- start: timestamp (nullable = true)
>>> | |-- end: timestamp (nullable = true)
>>> |-- sentOctets: long (nullable = true)
>>> |-- recvdOctets: long (nullable = true)
>>>
>>>
>>> ldf.show():
>>>
>>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>>> |applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
>>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>>> |abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
>>> |pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
>>> |xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
>>> +-------------+--------+--------+------------------------------------------+----------+-----------+
>>>
>>> Also, any comment on the outputMode? I've set it to 'update', since I'm
>>> using aggregation.
>>>
>>> thanks!
>>>
>>> On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Just looking at the code in here:
>>>>
>>>> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>                   window(col("ts"), "15 minutes")) \
>>>>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>     .fillna(0)
>>>>
>>>> What does ldf.printSchema() return?
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>> view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, 10 Mar 2023 at 07:16, karan alang <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> Hello All -
>>>>>
>>>>> I have a Structured Streaming job with a trigger of 10 minutes, and
>>>>> I'm using a watermark to account for late-arriving data. However, the
>>>>> watermark is not working: instead of a single record with the total
>>>>> aggregated value, I see 2 records.
>>>>>
>>>>> Here is the code :
>>>>>
>>>>> ```
>>>>>
>>>>> 1) StructuredStreaming - Reading from Kafka every 10 mins
>>>>>
>>>>> df_stream = self.spark.readStream.format('kafka') \
>>>>>     .option("kafka.security.protocol", "SSL") \
>>>>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>>>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>>>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>>>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>>>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>>>>     .option("subscribe", topic) \
>>>>>     .option("startingOffsets", "latest") \
>>>>>     .option("failOnDataLoss", "false") \
>>>>>     .option("kafka.metadata.max.age.ms", "1000") \
>>>>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>>>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>>>>     .load()
>>>>>
>>>>> 2) calling foreachBatch(self.process)
>>>>> # note - outputMode is set to "update" (tried setting outputMode = append as well)
>>>>>
>>>>> # 03/09 ::: outputMode - update instead of append
>>>>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
>>>>>     .outputMode("update") \
>>>>>     .trigger(processingTime='10 minutes') \
>>>>>     .option("truncate", "false") \
>>>>>     .option("checkpointLocation", self.checkpoint) \
>>>>>     .foreachBatch(self.process) \
>>>>>     .start()
>>>>>
>>>>>
>>>>> self.process - where I do the bulk of the processing; it calls the
>>>>> function 'aggIntfLogs'
>>>>>
>>>>> In the function aggIntfLogs, I'm using a watermark of 15 mins and doing a
>>>>> groupBy to calculate the sum of sentOctets & recvdOctets
>>>>>
>>>>>
>>>>> def aggIntfLogs(ldf):
>>>>>     if ldf and ldf.count() > 0:
>>>>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>>>>                          'recvdOctets', 'ts', 'customer') \
>>>>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>>>>>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>>>>>             .withWatermark("ts", "15 minutes")
>>>>>
>>>>>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>>>>                           window(col("ts"), "15 minutes")) \
>>>>>             .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>>>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>>>>             .fillna(0)
>>>>>         return ldf
>>>>>     return ldf
>>>>>
>>>>>
>>>>> The Dataframe 'ldf' returned from the function aggIntfLogs is written to a
>>>>> Kafka topic
>>>>>
>>>>> ```
>>>>>
>>>>> I was expecting that using the watermark would account for late-arriving
>>>>> data, i.e. the sentOctets & recvdOctets would be calculated over the
>>>>> consolidated data (including the late-arriving data, since it comes within
>>>>> 15 mins). However, for some keys (applianceName/timeslot/customer) the
>>>>> aggregates are calculated separately, so I see 2 records instead of a
>>>>> single record that accounts for the late data within the watermark.
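>>>>>
>>>>> For reference, a minimal sketch of the kind of windowed aggregation I'm
>>>>> aiming for, written directly against a parsed streaming DataFrame rather
>>>>> than inside foreachBatch; the DataFrame name (df_parsed), the console sink
>>>>> and the checkpoint path below are placeholders for illustration, not my
>>>>> actual job:
>>>>>
>>>>> ```
>>>>> # hypothetical sketch, not the actual job: watermark + 15-minute window
>>>>> # applied on the streaming DataFrame itself ('ts' assumed to already be a timestamp column)
>>>>> from pyspark.sql.functions import col, window
>>>>>
>>>>> agg_stream = df_parsed \
>>>>>     .withWatermark("ts", "15 minutes") \
>>>>>     .groupBy("applianceName", "timeslot", "customer",
>>>>>              window(col("ts"), "15 minutes")) \
>>>>>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>>>>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>>>>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets')
>>>>>
>>>>> # placeholder sink and checkpoint path, just to make the sketch complete
>>>>> query = agg_stream.writeStream \
>>>>>     .outputMode("update") \
>>>>>     .trigger(processingTime='10 minutes') \
>>>>>     .option("checkpointLocation", "/tmp/checkpoint-agg") \
>>>>>     .format("console") \
>>>>>     .start()
>>>>> ```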
>>>>>
>>>>> What needs to be done to fix this & make this work as desired?
>>>>>
>>>>> tia!
>>>>>
>>>>>
>>>>> Here is the Stackoverflow link as well -
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>>>>
>>>>>
>>>>>
>>>>>