I was looking for something like "Processing Time Temporal Join" in Flink as described here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#processing-time-temporal-join

On Mon, May 3, 2021 at 11:07 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> You are welcome Yuri. However, I stand corrected :)
>
> On Mon, 3 May 2021 at 19:02, "Yuri Oleynikov (יורי אולייניקוב)" <yur...@gmail.com> wrote:
>
>> Always nice to learn something new about jdbc.
>> Thanks, Mich 👍
>>
>> On 3 May 2021, at 20:54, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> I would have assumed that reference data like device_id is pretty static,
>> so a snapshot will do.
>>
>> The JDBC connection is lazy, so it will not materialise until the join uses
>> it. At that point the data will be collected from the underlying RDBMS table
>> for COMMITTED transactions.
>>
>> However, this is something that I discussed in another thread:
>>
>> *Spark Streaming with Files*
>>
>> There is an option to trigger the query once:
>>
>> result = streamingDataFrame.select( \
>>              col("parsed_value.rowkey").alias("rowkey") \
>>            , col("parsed_value.ticker").alias("ticker") \
>>            , col("parsed_value.timeissued").alias("timeissued") \
>>            , col("parsed_value.price").alias("price")). \
>>          writeStream. \
>>          outputMode('append'). \
>>          option("truncate", "false"). \
>>          foreachBatch(sendToSink). \
>>          queryName('trailFiles'). \
>>          trigger(once = True). \
>>          option('checkpointLocation', checkpoint_path). \
>>          start(data_path)
>>
>> This means the streaming job will process all available data and then
>> terminate. The JDBC connection is then re-established on whatever schedule
>> you restart the streaming job to pick up unprocessed data, and critically
>> your JDBC snapshot is refreshed on each read.
>>
>> The restarts can be scheduled through Airflow etc. You won't lose data, as
>> the checkpoint will mark processed records.
>>
>> That might be an option.
>>
>> HTH
>>
>> On Mon, 3 May 2021 at 18:27, "Yuri Oleynikov (יורי אולייניקוב)" <yur...@gmail.com> wrote:
>>
>>> You can do the enrichment with a stream (events) to static (device table)
>>> join when the device table is a slowly changing dimension (say it changes
>>> once a day) and it is in Delta format; then for every micro-batch of the
>>> stream-static join the device table is rescanned and up-to-date device
>>> data is loaded.
>>>
>>> If the device table is not a slowly changing dimension (say it changes
>>> once an hour), then you'd probably need a stream-stream join, but I'm not
>>> sure whether an RDBMS (aka JDBC) source in Spark supports streaming mode.
>>> So I'd rather sync JDBC with Parquet/Delta periodically in order to
>>> emulate a streaming source.
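
A minimal PySpark sketch of the stream-static enrichment Yuri describes above
(the Kafka topic, message schema, broker address and Delta path are made-up
placeholders, not anything from this thread):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.appName("iot-enrichment").getOrCreate()

    # Hypothetical message layout for the IOT events
    event_schema = StructType([
        StructField("device_id", StringType()),
        StructField("payload", StringType()),
    ])

    # Streaming side: IOT events arriving on a Kafka topic
    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "device_events")
              .load()
              .select(from_json(col("value").cast("string"), event_schema).alias("e"))
              .select("e.*"))

    # Static side: the slowly changing device table stored in Delta format.
    # As noted above, the static side is re-read for each micro-batch, so
    # updates to the Delta table are picked up without restarting the query.
    devices = spark.read.format("delta").load("/data/device_table")

    # Enrich each event with its device attributes
    enriched = events.join(devices, on="device_id", how="left")

    query = (enriched.writeStream
             .outputMode("append")
             .format("console")
             .option("checkpointLocation", "/tmp/checkpoints/iot_enrichment")
             .start())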
>>>
>>> On 3 May 2021, at 20:02, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>
>>> 1) Device_id might be different for messages in a batch.
>>> 2) It's a streaming application. The IOT messages are getting read in a
>>> Structured Streaming job as a "Stream". The Dataframe would need to be
>>> updated every hour. Have you done something similar in the past? Do you
>>> have an example to share?
>>>
>>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Can you please clarify:
>>>>
>>>> 1. Do the IOT messages in one batch have the same device_id, or does
>>>>    every row have a different device_id?
>>>> 2. The RDBMS table can be read through JDBC in Spark and a dataframe can
>>>>    be created on it. Does that work for you? You do not really need to
>>>>    stream the reference table.
>>>>
>>>> HTH
>>>>
>>>> On Mon, 3 May 2021 at 17:37, Eric Beabes <mailinglist...@gmail.com> wrote:
>>>>
>>>>> I would like to develop a Spark Structured Streaming job that reads
>>>>> messages from a stream which need to be "joined" with another stream of
>>>>> "reference" data.
>>>>>
>>>>> For example, let's say I'm reading messages from Kafka coming in from
>>>>> (lots of) IOT devices. Each message has a 'device_id'. We have a DEVICE
>>>>> table on a relational database. What I need to do is "join" the
>>>>> 'device_id' in the message with the 'device_id' on the table to enrich
>>>>> the incoming message. Somewhere I read that this can be done by joining
>>>>> two streams. I guess we can create a "Stream" that reads the DEVICE
>>>>> table once every hour or so.
>>>>>
>>>>> Questions:
>>>>> 1) Is this the right way to solve this use case?
>>>>> 2) Should we use a Stateful Stream for reading the DEVICE table, with
>>>>>    the state timeout set to an hour?
>>>>> 3) What would happen while the DEVICE state is getting updated from the
>>>>>    table on the relational database?
>>>>>
>>>>> Guidance would be greatly appreciated. Thanks.
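
To complement the sketch above, a hedged sketch of the periodic JDBC-to-Delta
sync Yuri suggests (the JDBC URL, credentials, table name and Delta path are
placeholders). Scheduled hourly via Airflow or cron, it refreshes the Delta
copy of the DEVICE table that the stream-static join reads on every
micro-batch:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("device-table-sync").getOrCreate()

    # Snapshot the reference table from the RDBMS; only committed rows are read.
    devices = (spark.read
               .format("jdbc")
               .option("url", "jdbc:postgresql://dbhost:5432/iot")
               .option("dbtable", "DEVICE")
               .option("user", "spark_reader")
               .option("password", "***")
               .load())

    # Overwrite the Delta copy; the next micro-batch of the streaming join
    # will pick up the refreshed data.
    (devices.write
     .format("delta")
     .mode("overwrite")
     .save("/data/device_table"))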