Always nice to learn something new about JDBC.
Thanks, Mich **thumbsup**

> On 3 May 2021, at 20:54, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> 
> 
> I would have assumed that reference data like device_id is fairly static, so 
> a snapshot will do.
> 
> A JDBC connection is lazy, so it will not materialise until the join uses it. 
> The data will then be read from the underlying RDBMS table for COMMITTED 
> transactions.
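
A minimal PySpark sketch of the snapshot approach described above. It assumes a hypothetical DEVICE table, placeholder connection settings, and a streaming DataFrame that carries a `device_id` column; the function names are illustrative and do not come from the thread.

```python
def jdbc_options(url, table, user, password, driver):
    """Collect the options for spark.read.format("jdbc").

    Every value passed in is a placeholder chosen by the caller.
    """
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": driver,
    }


def load_device_snapshot(spark, options):
    """Define a static DataFrame over the RDBMS table.

    The rows are not fetched here; Spark reads them when a query
    that uses this DataFrame actually runs.
    """
    return spark.read.format("jdbc").options(**options).load()


def enrich(events_df, device_df):
    """Stream-static left join on device_id; unmatched events are kept."""
    return events_df.join(device_df, on="device_id", how="left")
```

With a live session this would be used roughly as `enrich(events_df, load_device_snapshot(spark, jdbc_options(...)))`, where `events_df` is the parsed Kafka stream.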
> 
> However, this is something that I discussed in another thread:
> 
> Spark Streaming with Files
> 
> There is an option to trigger the query once:
> 
>               result = streamingDataFrame.select( \
>                      col("parsed_value.rowkey").alias("rowkey") \
>                    , col("parsed_value.ticker").alias("ticker") \
>                    , col("parsed_value.timeissued").alias("timeissued") \
>                    , col("parsed_value.price").alias("price")). \
>                      writeStream. \
>                      outputMode('append'). \
>                      option("truncate", "false"). \
>                      foreachBatch(sendToSink). \
>                      queryName('trailFiles'). \
>                      trigger(once = True). \
>                      option('checkpointLocation', checkpoint_path). \
>                      start(data_path)
> 
> This means that the streaming job will process all the data available and 
> then terminate. The JDBC connection is then refreshed each time the job is 
> restarted, at whatever interval you schedule it, to pick up unprocessed data; 
> critically, your JDBC snapshot is re-read on each run.
> 
> The restarts can be scheduled through Airflow etc. You won't lose data, as 
> the checkpoint marks the records already processed.
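
One way to make the refresh explicit is to read the reference table inside the function passed to `foreachBatch`, so the table is re-read for every micro-batch and therefore on every scheduled run. This is a variation on the description above, not the code behind `sendToSink`, which the thread does not show. The two callables and the `device_id` column are assumptions.

```python
def make_send_to_sink(load_devices, write_batch):
    """Build a callback suitable for DataStreamWriter.foreachBatch.

    load_devices: zero-argument callable returning the reference DataFrame,
                  for example a fresh JDBC read (hypothetical helper).
    write_batch:  callable(enriched_df, batch_id) that persists the result
                  (hypothetical helper).
    """
    def send_to_sink(batch_df, batch_id):
        # Re-create the reference DataFrame for each micro-batch so the
        # join sees whatever is in the RDBMS table at that moment.
        devices = load_devices()
        enriched = batch_df.join(devices, on="device_id", how="left")
        write_batch(enriched, batch_id)

    return send_to_sink
```

The returned function takes the place of `sendToSink` in the `foreachBatch(...)` call.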
> 
> That might be an option.
> 
> HTH
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> On Mon, 3 May 2021 at 18:27, "Yuri Oleynikov (יורי אולייניקוב)" 
> <yur...@gmail.com> wrote:
>> You can do the enrichment with a stream (events) to static (device table) 
>> join when the device table is a slowly changing dimension (let’s say it 
>> changes once a day) and is in Delta format. Then, for every micro-batch, the 
>> stream-static join rescans the device table and loads up-to-date device data.
>> 
>> If the device table is not a slowly changing dimension (say it changes once 
>> an hour), then you’d probably need a stream-stream join, but I’m not sure 
>> whether the RDBMS (i.e. JDBC) source in Spark supports streaming mode.
>> So I’d rather sync the JDBC table to Parquet/Delta periodically in order to 
>> emulate a streaming source.
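
The periodic sync suggested above could look roughly like the sketch below. The function name, its parameters and the target path are hypothetical, and writing with `fmt="delta"` assumes the Delta Lake package is available on the cluster.

```python
def sync_device_table(spark, jdbc_opts, target_path, fmt="delta"):
    """Copy the current contents of the JDBC table to target_path.

    Intended to be run on a schedule (cron, Airflow, ...). Each run
    overwrites the previous copy with a fresh snapshot.
    """
    # Batch read of the reference table through the JDBC data source.
    snapshot = spark.read.format("jdbc").options(**jdbc_opts).load()
    # Replace the previous copy in one batch write.
    snapshot.write.format(fmt).mode("overwrite").save(target_path)
    return target_path
```

The streaming job would then join against `spark.read.format("delta").load(target_path)` instead of the JDBC table.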
>> 
>> 
>>>> On 3 May 2021, at 20:02, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>> 
>>> 
>>> 1) Device_id might be different for messages in a batch.
>>> 2) It's a Streaming application. The IOT messages are getting read in a 
>>> Structured Streaming job in a "Stream". The Dataframe would need to be 
>>> updated every hour. Have you done something similar in the past? Do you 
>>> have an example to share?
>>> 
>>>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh <mich.talebza...@gmail.com> 
>>>> wrote:
>>>> Can you please clarify:
>>>> 
>>>> Do the IoT messages in one batch all have the same device_id, or does 
>>>> every row have a different device_id?
>>>> The RDBMS table can be read through JDBC in Spark and a DataFrame created 
>>>> on it. Does that work for you? You do not really need to stream the 
>>>> reference table. 
>>>> 
>>>> HTH
>>>> 
>>>>> On Mon, 3 May 2021 at 17:37, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>> I would like to develop a Spark Structured Streaming job that reads 
>>>>> messages in a Stream which needs to be “joined” with another Stream of 
>>>>> “Reference” data.
>>>>> 
>>>>> For example, let’s say I’m reading messages from Kafka coming in from 
>>>>> (lots of) IoT devices. Each message has a ‘device_id’. We have a DEVICE 
>>>>> table on a relational database. What I need to do is “join” the 
>>>>> ‘device_id’ in the message with the ‘device_id’ on the table to enrich 
>>>>> the incoming message. Somewhere I read that this can be done by joining 
>>>>> two streams. I guess we can create a “Stream” that reads the DEVICE 
>>>>> table once every hour or so. 
>>>>> 
>>>>> Questions:
>>>>> 1) Is this the right way to solve this use case? 
>>>>> 2) Should we use a Stateful Stream for reading DEVICE table with State 
>>>>> timeout set to an hour?
>>>>> 3) What would happen while the DEVICE state is getting updated from the 
>>>>> table on the relational database?
>>>>> 
>>>>> Guidance would be greatly appreciated. Thanks.
