BTW, 
We got around the bootstrap problem for a similar use case by using a “nohup” topic as 
the input stream. Our CI/CD pipeline currently passes an initialize option to the app if 
there is a need to bootstrap, waits for X minutes, takes a savepoint, 
and then restarts the app normally, listening to the right topic(s). I believe there is work 
underway to handle this gracefully using side inputs as well. Other than 
determining the X minutes needed for initialization to complete, we haven't had any issues 
with this solution. We have over 40 million state refreshes daily and close 
to 200 Mbps of input streams being joined to state.
Hope this helps!


- Ashish

On Tuesday, July 24, 2018, 11:37 AM, Elias Levy <fearsome.lucid...@gmail.com> 
wrote:

Alas, this suffers from the bootstrap problem. At the moment Flink does not 
allow you to pause a source (the positions), so you can't fully consume and 
preload the accounts or products to perform the join before the positions start 
flowing. Additionally, Flink SQL does not support materializing an upsert table 
for the accounts or products to perform the join, so you have to develop your 
own KeyedProcessFunction, maintain the state, and perform the join on your own 
if you only want to join against the latest value for each key.
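
To make the "maintain the state and perform the join on your own" idea concrete, here is a minimal sketch in plain Python. It is not Flink API code: the class and method names are hypothetical, and the dictionaries only stand in for keyed state. Buffering early events until reference data arrives is an assumption added here as one possible way to soften the bootstrap problem; it is not something Flink does for you.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


class BufferingLatestJoin:
    """Joins events against the latest reference value for each key."""

    def __init__(self) -> None:
        # Stand-in for keyed state: latest reference value per key.
        self.latest: Dict[str, Any] = {}
        # Events that arrived before any reference value for their key.
        self.pending: Dict[str, List[Any]] = defaultdict(list)

    def on_reference(self, key: str, value: Any) -> List[Tuple[Any, Any]]:
        # Keep only the newest value, then release any buffered events.
        self.latest[key] = value
        waiting = self.pending.pop(key, [])
        return [(event, value) for event in waiting]

    def on_event(self, key: str, event: Any) -> List[Tuple[Any, Any]]:
        if key not in self.latest:
            # No reference data yet: hold the event instead of emitting it.
            self.pending[key].append(event)
            return []
        return [(event, self.latest[key])]


joiner = BufferingLatestJoin()
early = joiner.on_event("ACC1", "position-1")
flushed = joiner.on_reference("ACC1", {"owner": "desk-a"})
later = joiner.on_event("ACC1", "position-2")
```

Note that the buffer grows without bound for keys whose reference data never arrives, so a real implementation would need some expiry policy.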
On Tue, Jul 24, 2018 at 7:27 AM Till Rohrmann <trohrm...@apache.org> wrote:

Yes, one solution could be to initialize a Kafka topic with the initial values, 
then feed subsequent changes to that same topic and consume from it.
On Tue, Jul 24, 2018 at 3:58 PM Harshvardhan Agrawal 
<harshvardhan.ag...@gmail.com> wrote:

Hi Till,
How would we do the initial hydration of the Product and Account data since 
it’s currently in a relational DB? Do we have to copy over data to Kafka and 
then use them? 
Regards,
Harsh
On Tue, Jul 24, 2018 at 09:22 Till Rohrmann <trohrm...@apache.org> wrote:

Hi Harshvardhan,
I agree with Ankit that this problem could actually be solved quite elegantly 
with Flink's state. If you can ingest the product/account information changes 
as a stream, you can keep the latest version of it in Flink state by using a 
co-map function [1, 2]. One input of the co-map function would be the 
product/account update stream which updates the respective entries in Flink's 
state and the other input stream is the one to be enriched. When receiving 
input from this stream, one would look up the latest information contained in the 
operator's state and join it with the incoming event.
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/
[2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.html
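
The two-input pattern described above can be illustrated with a small plain-Python sketch. This is not the CoMapFunction API itself: the class, method, and field names are hypothetical, and a dictionary stands in for Flink state. One method handles the product update stream, the other handles the stream to be enriched.

```python
from typing import Dict, Optional, Tuple


class EnrichmentJoin:
    """Two inputs: one updates reference data, the other is enriched."""

    def __init__(self) -> None:
        # Stand-in for Flink state: latest product record per product key.
        self.latest_product: Dict[str, dict] = {}

    def on_product_update(self, product_key: str, product: dict) -> None:
        # Overwrite, so only the most recent version of a product is kept.
        self.latest_product[product_key] = product

    def on_position(self, position: dict) -> Tuple[dict, Optional[dict]]:
        # Pair the position with the latest product, or None if none was seen.
        return position, self.latest_product.get(position["product_key"])


join = EnrichmentJoin()
join.on_product_update("P1", {"name": "Bond A", "version": 1})
join.on_product_update("P1", {"name": "Bond A", "version": 2})
enriched = join.on_position({"id": 7, "product_key": "P1", "qty": 100})
missing = join.on_position({"id": 8, "product_key": "P2", "qty": 5})
```

Here `enriched` carries version 2 of the product, while `missing` carries `None` because no update for `P2` has arrived yet.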
Cheers,
Till
On Tue, Jul 24, 2018 at 2:15 PM Harshvardhan Agrawal 
<harshvardhan.ag...@gmail.com> wrote:

Hi,
Thanks for your responses.
There is no fixed interval for the data being updated. Rather, onboarding a new 
product or a change in mandates will trigger a change in 
the reference data.
It’s not just the enrichment we are doing here. Once we have enriched the data 
we will be performing a bunch of aggregations using the enriched data. 
Which approach would you recommend?
Regards,
Harshvardhan
On Tue, Jul 24, 2018 at 04:04 Jain, Ankit <ankit.j...@here.com> wrote:


How often is the product DB updated? Based on that, you can store product 
metadata as state in Flink, perhaps setting up the state on cluster startup and then 
updating it daily, etc.

 

Also, based on this feature alone, Flink doesn’t seem to add a lot of value on 
top of Kafka. As Jörn said below, you can very well store all the events in an 
external store and then periodically run a cron job to enrich them later, since your 
processing doesn’t seem to require absolute real time.

 

Thanks

Ankit

 

From: Jörn Franke <jornfra...@gmail.com>
Date: Monday, July 23, 2018 at 10:10 PM
To: Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
Cc: <user@flink.apache.org>
Subject: Re: Implement Joins with Lookup Data

 

For the first one (lookup of single entries) you could use a NoSQL DB (e.g., a 
key-value store); a relational database will not scale.

 

Depending on when you need to do the enrichment you could also first store the 
data and enrich it later as part of a batch process. 


On 24. Jul 2018, at 05:25, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> 
wrote:


Hi,

 

We are using Flink for financial data enrichment and aggregations. We have 
Positions data that we are currently receiving from Kafka. We want to enrich 
that data with reference data like Product and Account information that is 
present in a relational database. From my understanding of Flink so far, I think 
there are two ways to achieve this:

 

1) First Approach:

a) Get positions from Kafka and key by product key.

b) Perform lookup from the database for each key and then obtain 
Tuple2<Position, Product>

 

2) Second Approach:

a) Get positions from Kafka and key by product key.

b) Window the keyed stream into windows of, say, 15 seconds each.

c) For each window get the unique product keys and perform a single lookup.

d) Somehow join Positions and Products

 

In the first approach we will be making a lot of calls to the DB and the 
solution is very chatty. It's hard to scale this because the database storing the 
reference data might not be very responsive.

 

In the second approach, I wish to join the WindowedStream with the 
SingleOutputStream, and it turns out I can't join a windowed stream. So I am not 
quite sure how to do that. 
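
As a rough illustration of steps (c) and (d) of the second approach, the plain-Python sketch below collects the unique product keys of one window, performs a single lookup, and then pairs every position with its product. It is not Flink code: `enrich_window`, `fake_lookup`, and `PRODUCT_TABLE` are hypothetical names, and the lookup function only stands in for a real database query.

```python
from typing import Callable, Dict, List, Optional, Set, Tuple

# Hypothetical stand-in for the relational product table.
PRODUCT_TABLE: Dict[str, dict] = {
    "P1": {"name": "Bond A"},
    "P2": {"name": "Equity B"},
}

# Records each lookup so the number of database round trips is visible.
calls: List[Set[str]] = []


def fake_lookup(keys: Set[str]) -> Dict[str, dict]:
    calls.append(set(keys))
    return {k: PRODUCT_TABLE[k] for k in keys if k in PRODUCT_TABLE}


def enrich_window(
    positions: List[dict],
    lookup_products: Callable[[Set[str]], Dict[str, dict]],
) -> List[Tuple[dict, Optional[dict]]]:
    # (c) Unique product keys in this window, fetched with one lookup.
    keys = {p["product_key"] for p in positions}
    products = lookup_products(keys)
    # (d) Pair each position with its product (None if the key is unknown).
    return [(p, products.get(p["product_key"])) for p in positions]


window = [
    {"id": 1, "product_key": "P1"},
    {"id": 2, "product_key": "P1"},
    {"id": 3, "product_key": "P2"},
]
result = enrich_window(window, fake_lookup)
```

Three positions in the window result in one lookup for two distinct keys, instead of three separate calls as in the first approach.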

 

I wanted an opinion on what the right thing to do is. Should I go with the 
first approach or the second one? If the second one, how can I implement the 
join?


 

-- 

Regards,
Harshvardhan Agrawal





