Always nice to learn something new about JDBC. Thanks, Mich 👍
> On 3 May 2021, at 20:54, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> I would have assumed that reference data like device_id are pretty static, so
> a snapshot will do.
>
> The JDBC connection is lazy, so it will not materialise until the join uses it.
> Then data will be collected from the underlying RDBMS table for COMMITTED
> transactions.
>
> However, this is something that I discussed in another thread,
> "Spark Streaming with Files".
>
> There is an option that one can trigger once:
>
>     result = streamingDataFrame.select(
>                 col("parsed_value.rowkey").alias("rowkey"),
>                 col("parsed_value.ticker").alias("ticker"),
>                 col("parsed_value.timeissued").alias("timeissued"),
>                 col("parsed_value.price").alias("price")). \
>              writeStream. \
>              outputMode('append'). \
>              option("truncate", "false"). \
>              foreachBatch(sendToSink). \
>              queryName('trailFiles'). \
>              trigger(once = True). \
>              option('checkpointLocation', checkpoint_path). \
>              start(data_path)
>
> This means that the streaming job will run for all available data and then
> terminate. In that case the JDBC connection will be refreshed on whatever
> schedule you use to restart the streaming process for unprocessed data and,
> critically, your JDBC snapshot will be updated on each read.
>
> This can be done through Airflow etc. You won't lose data, as the checkpoint
> will mark processed records.
>
> That might be an option.
>
> HTH
>
>> On Mon, 3 May 2021 at 18:27, "Yuri Oleynikov (יורי אולייניקוב)"
>> <yur...@gmail.com> wrote:
>>
>> You can do the enrichment with a stream (events) to static (device table) join
>> when the device table is a slowly changing dimension (say it changes once a
>> day) and it is in Delta format; then for every micro-batch of the stream-static
>> join the device table will be rescanned and up-to-date device data will be
>> loaded.
>>
>> If the device table is not a slow dimension (it changes once an hour), then
>> you'd probably need a stream-stream join, but I'm not sure whether an RDBMS
>> (aka JDBC) source in Spark supports streaming mode. So I'd rather sync the JDBC
>> table to Parquet/Delta periodically in order to emulate a streaming source.
>>
>>> On 3 May 2021, at 20:02, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>
>>> 1) Device_id might be different for messages in a batch.
>>> 2) It's a streaming application. The IoT messages are getting read in a
>>> Structured Streaming job as a "stream". The DataFrame would need to be
>>> updated every hour. Have you done something similar in the past? Do you
>>> have an example to share?
>>>
>>>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>
>>>> Can you please clarify:
>>>>
>>>> Do the IoT messages in one batch have the same device_id, or does every row
>>>> have a different device_id?
>>>>
>>>> The RDBMS table can be read through JDBC in Spark and a DataFrame can be
>>>> created on it. Does that work for you? You do not really need to stream the
>>>> reference table.
>>>>
>>>> HTH
>>>>
>>>>> On Mon, 3 May 2021 at 17:37, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>>
>>>>> I would like to develop a Spark Structured Streaming job that reads
>>>>> messages in a stream which needs to be "joined" with another stream of
>>>>> "reference" data.
>>>>>
>>>>> For example, let's say I'm reading messages from Kafka coming in from
>>>>> (lots of) IoT devices. Each message has a 'device_id'. We have a DEVICE
>>>>> table in a relational database. What I need to do is "join" the
>>>>> 'device_id' in the message with the 'device_id' in the table to enrich
>>>>> the incoming message. Somewhere I read that this can be done by joining
>>>>> two streams. I guess we can create a "stream" that reads the DEVICE
>>>>> table once every hour or so.
>>>>>
>>>>> Questions:
>>>>> 1) Is this the right way to solve this use case?
>>>>> 2) Should we use a stateful stream for reading the DEVICE table, with the
>>>>>    state timeout set to an hour?
>>>>> 3) What would happen while the DEVICE state is getting updated from the
>>>>>    table in the relational database?
>>>>>
>>>>> Guidance would be greatly appreciated. Thanks.
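
For reference, a minimal PySpark sketch of the stream-static enrichment discussed above: the Kafka event stream is joined against a DEVICE DataFrame read through JDBC. The broker address, topic, JDBC URL, credentials, sink and event schema are placeholders rather than details from the thread, and how often the JDBC snapshot is actually re-read during a long-running query depends on plan caching, which is the point debated above.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import (DoubleType, StringType, StructField,
                                   StructType, TimestampType)

    spark = SparkSession.builder.appName("iot_enrichment").getOrCreate()

    # Schema of the IoT messages (illustrative only).
    event_schema = StructType([
        StructField("device_id", StringType()),
        StructField("reading", DoubleType()),
        StructField("event_time", TimestampType()),
    ])

    # Streaming side: IoT events from Kafka (placeholder broker and topic).
    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker1:9092")
              .option("subscribe", "iot_events")
              .load()
              .select(from_json(col("value").cast("string"), event_schema).alias("e"))
              .select("e.*"))

    # Static side: the DEVICE reference table read through JDBC (placeholder URL/credentials).
    devices = (spark.read
               .format("jdbc")
               .option("url", "jdbc:postgresql://dbhost:5432/iot")
               .option("dbtable", "DEVICE")
               .option("user", "app_user")
               .option("password", "***")
               .load())

    # Stream-static join: each micro-batch is enriched against the reference DataFrame.
    enriched = events.join(devices, on="device_id", how="left")

    query = (enriched.writeStream
             .outputMode("append")
             .format("console")
             .option("checkpointLocation", "/tmp/checkpoints/iot_enrichment")
             .start())
    query.awaitTermination()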
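
And a sketch of the per-batch refresh idea behind Mich's foreachBatch / trigger-once suggestion: the JDBC table is re-read inside foreachBatch, so every micro-batch (or every scheduled run of a trigger-once job) joins against a fresh snapshot of committed rows. It reuses the events DataFrame and spark session from the sketch above; the sink path and connection details are again assumptions.

    def enrich_and_write(batch_df, batch_id):
        # Re-read the DEVICE table for this micro-batch so the join below sees a
        # current snapshot of the reference data (placeholder connection details).
        devices = (spark.read
                   .format("jdbc")
                   .option("url", "jdbc:postgresql://dbhost:5432/iot")
                   .option("dbtable", "DEVICE")
                   .option("user", "app_user")
                   .option("password", "***")
                   .load())

        enriched = batch_df.join(devices, on="device_id", how="left")

        # Write the enriched batch to the sink of your choice, e.g. Parquet.
        enriched.write.mode("append").parquet("/data/enriched_iot")

    query = (events.writeStream
             .foreachBatch(enrich_and_write)
             .option("checkpointLocation", "/tmp/checkpoints/iot_enrichment_batch")
             .trigger(once=True)  # or trigger(processingTime="1 minute") for a long-running query
             .start())
    query.awaitTermination()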