Hi Maxim,

thanks for the explanation. I think you can keep a ValueState for the price
and a ListState for the purchase events. When a purchase event arrives,
first check the price state. If a price exists, emit the PurchaseTotal
result right away; otherwise, cache the event in the ListState. When a
price event arrives, update the price state first, then check whether any
cached purchase events are waiting and process them.
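
As a rough, untested sketch of that idea (the field types of the case
classes, the state names "price" and "pendingPurchases", and the use of
java.lang.Integer so that a missing price shows up as null are all my
assumptions, not something taken from your code):

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Field types are assumed; the original case classes do not state them.
case class PriceList(productId: String, price: Int)
case class Purchase(productId: String, amount: Int)
case class PurchaseTotal(productId: String, totalAmount: Int)

class CalculateFunction
    extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // Boxed Integer so that "no price yet" is visible as null.
  private var price: ValueState[java.lang.Integer] = _
  // Purchases that arrived before any price for their key.
  private var pending: ListState[Purchase] = _

  override def open(parameters: Configuration): Unit = {
    price = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("price", classOf[java.lang.Integer]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor[Purchase]("pendingPurchases", classOf[Purchase]))
  }

  // Purchase side: emit if the price is known, otherwise buffer.
  override def processElement1(
      purchase: Purchase,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    val current = price.value()
    if (current != null) {
      out.collect(PurchaseTotal(purchase.productId, purchase.amount * current.intValue))
    } else {
      pending.add(purchase)
    }
  }

  // Price side: store the price, then flush anything buffered for this key.
  override def processElement2(
      priceList: PriceList,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    price.update(Int.box(priceList.price))
    val cached = pending.get()
    if (cached != null) {
      cached.asScala.foreach { p =>
        out.collect(PurchaseTotal(p.productId, p.amount * priceList.price))
      }
      pending.clear()
    }
  }
}
```

Both states are keyed state, so the function has to run on the connected
keyed streams, as in your purchases.connect(prices).process(...) pipeline.
Note that purchases are only buffered until the first price for their key
arrives, so the price source still has to emit the existing prices once.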

You can set a boolean flag in the function to avoid checking the purchase
state every time (since the lookup takes extra time). Don't worry about how
the state is distributed: Flink automatically partitions it by key
(productId in your example). For more information about state, you can
refer to this document:
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html>

Hope that helps.

Best,
Xingcan

On Fri, Nov 3, 2017 at 2:11 PM, Maxim Parkachov <lazy.gop...@gmail.com>
wrote:

> Hi Xingcan,
>
> On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Maxim,
>>
>> if I understand correctly, you actually need to JOIN the fast stream with
>> the slow stream. Could you please share more details about your problem?
>>
>
> Sure I can explain more, with some example of pseudo-code. I have external
> DB with price list with following structure:
>
> case class PriceList(productId, price)
>
> My events are purchase events with following structure:
>
> case class Purchase(productId, amount)
>
> I would like to get final stream with TotalAmount = Amount*Price in
> structure like this:
>
> case class PurchaseTotal(productId, totalAmount)
>
> I have 2 corresponding input streams:
>
> val prices = env.addSource(new PriceListSource).keyBy(_.productId)
> val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)
>
> PriceListSource delivers me all CHANGES to external DB table.
>
> Calculate function looks similar to:
>
> class CalculateFunction extends CoProcessFunction[Purchase, PriceList,
> PurchaseTotal] {
>
>   private var price: ValueState[Int] = _
>
>   override def processElement1....... {
>     out.collect(PurchaseTotal(purchase.productId,
>       purchase.amount * price.value))
>   }
>
>   override def processElement2....... {
>     price.update(priceList.price)
>   }
> }
>
> And finally pipeline:
>
> purchases.connect(prices).process(new CalculateFunction).print
>
> The issue is that when I start the program, my price ValueState is empty,
> and it will never be populated for products whose price is not updated in
> the DB.
> BTW, I cannot use AsyncIO to query DB, because of several technical
> restrictions.
>
>> 1. When you mentioned "they have the same key", did you mean all the data
>> get the same key or the logic should be applied with fast.key = slow.key?
>>
>
> I meant here that the productId in a purchase event definitely exists in
> the external price list DB (so it is a kind of inner join).
>
>
>> 2. What should be done to initialize the state?
>>
>
> I need to read external DB table and populate price ValueState before
> processing first purchase event.
>
> Hope this minimal example helps to understand.
> Maxim.
>
>
>>
>> Best,
>> Xingcan
>>
>>
>> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gop...@gmail.com>
>> wrote:
>>
>>> Hi Flink users,
>>>
>>> I'm struggling with some basic concept and would appreciate some help.
>>> I have 2 Input streams, one is fast event stream and one is slow changing
>>> dimension. They have the same key and I use CoProcessFunction to store
>>> slow data in state and enrich fast data from this state. Everything
>>> works as expected.
>>>
>>> Before I start processing the fast stream on the first run, I would like
>>> to completely initialise the state. I thought it could be done in open(),
>>> but I don't understand how it will be re-distributed across parallel
>>> operators.
>>>
>>> Another alternative would be to create a custom source and push all the
>>> slow dimension data downstream, but I could not find how to hold
>>> processing of the fast data until the state is initialised.
>>>
>>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
>>> way to implement it now ?
>>>
>>> Thanks,
>>> Maxim.
>>>
>>>
>>
>
>
