It’s more like I have a window and I create a side input for that window.
Once I am done processing the window, I want to discard that side input
and create new ones for subsequent windows.

On Tue, May 15, 2018 at 21:54 Harshvardhan Agrawal <
[email protected]> wrote:

> In my case, since I am performing a lookup for reference data that can
> change periodically, I can’t really use a global window. I would want to
> update or re-look up the data per window.
>
> On Tue, May 15, 2018 at 21:06 Raghu Angadi <[email protected]> wrote:
>
>> You are applying windowing to 'billingDataPairs' in the example above.
>> A side input is paired with all the main input windows that exactly match or
>> completely fall within the side input window. A common use case is a side
>> input defined in the default global window, which matches all the main input
>> windows.
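A plain-Java model of the matching rule Raghu describes (no Beam dependency) may make it concrete. It assumes, to my understanding of Beam's default mapping for fixed windows, that a main-input window is matched to the side-input window that contains the main window's last timestamp. The class and method names are illustrative only, not Beam API, and all times are in milliseconds.

```java
/**
 * Illustrative model of side-input window matching for fixed windows.
 * Windows are half-open intervals [start, end) in milliseconds.
 * Not Beam API; names are hypothetical.
 */
final class WindowMapping {
    private WindowMapping() {}

    /** Start of the fixed window of the given size that contains timestamp ts. */
    static long fixedWindowStart(long ts, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return ts - Math.floorMod(ts, sizeMillis);
    }

    /**
     * Start of the side-input window matched to a main-input window ending at
     * mainEnd (exclusive), assuming the side input uses fixed windows of
     * sideSizeMillis. The main window's last instant, mainEnd - 1, decides.
     */
    static long sideInputWindowStart(long mainEnd, long sideSizeMillis) {
        long maxTimestamp = mainEnd - 1;
        return fixedWindowStart(maxTimestamp, sideSizeMillis);
    }
}
```

With 30 s windows on both sides, the main window [60000, 90000) maps to the side window starting at 60000, i.e. its twin; against a 5-minute side input, every 30 s main window inside [0, 300000) maps to the side window starting at 0. A side input left in the default global window is the degenerate case: every main-input window maps to the single global window, which is why it pairs with all of them.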
>>
>>
>>
>> On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <
>> [email protected]> wrote:
>>
>>> Got it.
>>>
>>> Since I am not applying any windowing strategy to the side input, does
>>> Beam automatically pick up the windowing strategy for the side input from
>>> the main input? By that I mean the scope of the side input would be
>>> per window, and it would be different for every window. Is that correct?
>>>
>>> Regards,
>>> Harsh
>>>
>>> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Using deduplicate + side inputs will allow you to have a consistent
>>>> view of the account information for the entire window, which can be nice
>>>> since it gives consistent processing semantics, but using a simple in-memory
>>>> cache to reduce the number of lookups will likely be much easier to debug
>>>> and simpler to implement and maintain.
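The in-memory cache Lukasz suggests could be sketched roughly as follows. The class name `ExpiringCache`, the time-to-live policy and the injected clock are assumptions for illustration, not anything specified in the thread. In a Beam pipeline such a cache would typically be a DoFn field created once per DoFn instance (for example in a `@Setup` method), so each worker keeps its own copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Minimal per-worker lookup cache with a time-to-live (hypothetical sketch). */
final class ExpiringCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long loadedAt;
        Entry(T value, long loadedAt) { this.value = value; this.loadedAt = loadedAt; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;  // e.g. a call to the external account store
    private final long ttlMillis;
    private final LongSupplier clock;     // injected so expiry can be tested
    private int loads = 0;

    ExpiringCache(Function<K, V> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it if missing or at least ttlMillis old. */
    synchronized V get(K key) {
        long now = clock.getAsLong();
        Entry<V> e = entries.get(key);
        if (e == null || now - e.loadedAt >= ttlMillis) {
            e = new Entry<>(loader.apply(key), now);
            entries.put(key, e);
            loads++;
        }
        return e.value;
    }

    /** Number of times the loader (the external lookup) was actually called. */
    synchronized int loadCount() { return loads; }
}
```

Choosing a TTL equal to the window length (30 s in the original example) gives roughly the per-window freshness discussed in this thread without any side input, at the cost of each worker doing its own lookups.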
>>>>
>>>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
>>>> [email protected]> wrote:
>>>>
>>>>> Thanks Raghu!
>>>>>
>>>>> Lukasz,
>>>>>
>>>>> Do you think lookups would be a better option than side inputs in my
>>>>> case?
>>>>>
>>>>>
>>>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> It should work. I think you need to apply Distinct before looking up
>>>>>> account info:
>>>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts",
>>>>>> ...).
>>>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>>>> the account data needs to be small enough to fit in memory.
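What Raghu's Keys + Distinct chain buys can be modelled in plain Java: within one window, each distinct account ID is looked up once no matter how many billing records carry it, and the results form the map that View.asMap() would expose. `Billing` and `lookupDistinct` are hypothetical stand-ins for `KV<Integer, BillingModel>` and the author's `AccountLookupClient`.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Plain-Java model of "Keys -> Distinct -> lookup" within one window (illustrative). */
final class DistinctLookup {
    private DistinctLookup() {}

    /** Hypothetical keyed billing record, standing in for KV<Integer, BillingModel>. */
    static final class Billing {
        final int accountId;
        final String payload;
        Billing(int accountId, String payload) { this.accountId = accountId; this.payload = payload; }
    }

    /** Looks up each distinct account ID in the window exactly once. */
    static <A> Map<Integer, A> lookupDistinct(List<Billing> window, Function<Integer, A> lookup) {
        Set<Integer> ids = new LinkedHashSet<>();   // plays the role of Keys + Distinct
        for (Billing b : window) {
            ids.add(b.accountId);
        }
        Map<Integer, A> accounts = new HashMap<>();
        for (Integer id : ids) {
            accounts.put(id, lookup.apply(id));     // one external call per distinct ID
        }
        return accounts;
    }
}
```

Deduplicating first also matters for correctness: View.asMap() expects at most one value per key in each window, so feeding it one lookup result per billing record would fail as soon as an account appears twice in a window.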
>>>>>>
>>>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Well, I actually simplified the example a little. In the
>>>>>>> actual use case I have multiple reference datasets. Say I have a tuple of
>>>>>>> Account and Product as the key. The reason we don’t do the lookup in the
>>>>>>> DoFn directly is that we don’t want to look up the data for the same account
>>>>>>> or the same product multiple times across workers in a window.
>>>>>>>
>>>>>>> What I was thinking was that it might be better to perform the
>>>>>>> lookup only once for each account and product in a window and then supply
>>>>>>> them as side inputs to the main input.
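For the multi-dataset case, the (Account, Product) tuple needs value semantics so that equal keys deduplicate. The sketch below uses a hypothetical `AccountProductKey` class with integer IDs (an assumption) and derives the distinct account IDs and distinct product IDs separately, so each reference dataset is queried once per ID per window rather than once per combination. As far as I know, Beam groups keys by their encoded bytes, so a real key class would also need a deterministic coder; that part is not shown.

```java
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical composite key for the (Account, Product) case. */
final class AccountProductKey {
    final int accountId;
    final int productId;

    AccountProductKey(int accountId, int productId) {
        this.accountId = accountId;
        this.productId = productId;
    }

    // Value equality: two keys with the same IDs are interchangeable in sets and maps.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof AccountProductKey)) return false;
        AccountProductKey k = (AccountProductKey) o;
        return accountId == k.accountId && productId == k.productId;
    }

    @Override
    public int hashCode() { return Objects.hash(accountId, productId); }

    /** Distinct account IDs in a window's keys: one account lookup each. */
    static Set<Integer> distinctAccounts(Collection<AccountProductKey> keys) {
        Set<Integer> ids = new TreeSet<>();
        for (AccountProductKey k : keys) ids.add(k.accountId);
        return ids;
    }

    /** Distinct product IDs in a window's keys: one product lookup each. */
    static Set<Integer> distinctProducts(Collection<AccountProductKey> keys) {
        Set<Integer> ids = new TreeSet<>();
        for (AccountProductKey k : keys) ids.add(k.productId);
        return ids;
    }
}
```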
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> Is there a reason you don't want to read the account information
>>>>>>>> within the DoFn directly from the datastore? It seems like that would be
>>>>>>>> your simplest approach.
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>>>
>>>>>>>>> The account information in the external store does change. Every
>>>>>>>>> time the account information changes, we have to recompute
>>>>>>>>> all the billing info. Our source systems make sure that they publish
>>>>>>>>> messages for those accounts again.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>>>> the account information be?
>>>>>>>>>> Does the account information in the external store change?
>>>>>>>>>>
>>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We have billing data that arrives from Kafka. The
>>>>>>>>>>> billing data is in JSON and contains an account ID. To
>>>>>>>>>>> generate the final report, we need some account data that is associated
>>>>>>>>>>> with the account ID and stored in an external database.
>>>>>>>>>>>
>>>>>>>>>>> It is possible that we get multiple billing info messages for
>>>>>>>>>>> the same account. We want to be able to look up the account information
>>>>>>>>>>> for the messages in a window and then supply that as a side input to the
>>>>>>>>>>> next PTransform.
>>>>>>>>>>>
>>>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>>>
>>>>>>>>>>> Here is my attempt:
>>>>>>>>>>>
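>>>>>>>>>>>     import java.util.Map;
>>>>>>>>>>>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.GroupByKey;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.View;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.Window;
>>>>>>>>>>>     import org.apache.beam.sdk.values.KV;
>>>>>>>>>>>     import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>>>     import org.apache.beam.sdk.values.PCollectionView;
>>>>>>>>>>>     import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>>>     import org.joda.time.Duration;
>>>>>>>>>>>
>>>>>>>>>>>     // p is the Pipeline; BillingModel, Account, KafkaProcessor,
>>>>>>>>>>>     // AccountLookupClient and the KAFKA_* constants are defined elsewhere.
>>>>>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>>>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>>>                 .withTopic(KAFKA_TOPIC)
>>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>>             .apply("Window",
>>>>>>>>>>>                 Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>>>             .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>>>
>>>>>>>>>>>     // Group all billing records for an account within each 30 s window.
>>>>>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>>>
>>>>>>>>>>>     // Derived from billingDataPairs, so (unless AccountLookupClient re-windows)
>>>>>>>>>>>     // the side input has the same 30 s fixed windows as the main input.
>>>>>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>>>         billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>>>             .apply(View.asMap());
>>>>>>>>>>>
>>>>>>>>>>>     // Consume the grouped data; the view must be registered via withSideInputs.
>>>>>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>>>           @ProcessElement
>>>>>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>>>>>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>>>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>>>             // ... build the report from billingModels and account
>>>>>>>>>>>           }
>>>>>>>>>>>         }).withSideInputs(accountData));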
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Harsh
>>>>>>>>>>>
>>>>>>>>>> --

*Regards,*
*Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
