Makes sense.

On Tue, May 15, 2018 at 7:22 PM Harshvardhan Agrawal <
[email protected]> wrote:

> It’s more like I have a window and I create a side input for that window.
> Once I am done processing the window, I want to discard that side input
> and create new ones for subsequent windows.
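Here is a plain-Java sketch of the per-window lifecycle described above. It simulates the semantics and does not use the Beam API. It assumes a mutable map stands in for the external account store, and the class and method names are hypothetical. Each window builds its own map from only the accounts it sees, uses it, and then drops it, so a change in the store shows up in the next window's map.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical simulation of a side input that is built per window and then discarded. */
final class PerWindowSnapshot {
    private PerWindowSnapshot() {}

    /**
     * Builds a fresh account map for one window. Each distinct account ID is read
     * from externalStore once; the returned map is meant for that window only.
     */
    static Map<Integer, String> snapshotForWindow(List<Integer> accountIdsInWindow,
                                                  Map<Integer, String> externalStore) {
        Set<Integer> distinctIds = new LinkedHashSet<>(accountIdsInWindow);
        Map<Integer, String> snapshot = new HashMap<>();
        for (Integer id : distinctIds) {
            // externalStore is an assumed stand-in for the real database
            snapshot.put(id, externalStore.get(id));
        }
        return snapshot;
    }
}
```

Every element in a window sees the same snapshot. This is the "consistent view per window" trade-off discussed in this thread.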
>
> On Tue, May 15, 2018 at 21:54 Harshvardhan Agrawal <
> [email protected]> wrote:
>
>> In my case, since I am looking up reference data that can change
>> periodically, I can’t really use a global window. I want to
>> update/re-look up the data per window.
>>
>> On Tue, May 15, 2018 at 21:06 Raghu Angadi <[email protected]> wrote:
>>
>>> You are applying windowing to 'billingDataPairs' in the example above.
>>> A side input is paired with every main-input window that exactly matches,
>>> or falls completely within, the side input's window. A common use case is
>>> a side input defined in the default global window, which matches all
>>> main-input windows.
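To make the matching rule concrete, here is a plain-Java sketch of how I understand Beam's default side-input window mapping for fixed windows. A main-input window reads the side-input fixed window that contains the main window's last timestamp (its end minus 1 ms). A side input in the global window maps every main-input window to the single global window. The sketch assumes zero window offset and millisecond timestamps. The class and method names are hypothetical and are not Beam API.

```java
/**
 * Hypothetical helper mirroring (as I understand it) Beam's default side-input
 * window mapping for fixed windows with zero offset; all times are milliseconds.
 */
final class SideInputWindowMapping {
    private SideInputWindowMapping() {}

    /** Start (inclusive) of the fixed window of the given size that contains tsMillis. */
    static long fixedWindowStart(long tsMillis, long sizeMillis) {
        return Math.floorDiv(tsMillis, sizeMillis) * sizeMillis;
    }

    /** Start of the side-input window read by a main-input window ending at mainEndMillis. */
    static long sideWindowStartFor(long mainEndMillis, long sideSizeMillis) {
        long maxTimestamp = mainEndMillis - 1; // last instant inside the main window
        return fixedWindowStart(maxTimestamp, sideSizeMillis);
    }
}
```

For example, the 30-second main window [30s, 60s) reads a 30-second side window that also starts at 30s (an exact match). With 60-second side windows, it reads the one starting at 0s, because it falls completely within that window.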
>>>
>>>
>>>
>>> On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <
>>> [email protected]> wrote:
>>>
>>>> Got it.
>>>>
>>>> Since I am not applying any windowing strategy to the side input, does
>>>> Beam automatically pick up the side input's windowing strategy from the
>>>> main input? In other words, would the side input be scoped per window,
>>>> with a different one for every window?
>>>>
>>>> Regards,
>>>> Harsh
>>>>
>>>> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Using deduplication plus side inputs gives you a consistent view of the
>>>>> account information for the entire window, which can be nice since it
>>>>> gives consistent processing semantics. However, a simple in-memory cache
>>>>> that reduces the number of lookups will likely be much easier to debug,
>>>>> and simpler to implement and maintain.
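Here is a minimal sketch of the in-memory cache alternative. It assumes a per-worker cache, for example a field created in the DoFn's @Setup method. The cache calls a hypothetical `loader` function that stands in for the external database, and it uses a time-to-live so that changed account data is eventually re-read. The class name and the TTL mechanism are illustrative assumptions, not a Beam facility.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical per-worker lookup cache with a time-to-live; not thread-safe. */
final class TtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long loadedAtMillis;

        Entry(T value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader; // assumed stand-in for the external-database lookup
    private final long ttlMillis;
    private final LongSupplier clock;    // injectable clock so expiry can be tested

    TtlCache(Function<K, V> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it if it is missing or older than the TTL. */
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAtMillis >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }
}
```

Each worker keeps its own cache, so the same account may still be looked up once per worker. The TTL bounds how stale an entry can become after the account changes in the external store.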
>>>>>
>>>>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thanks Raghu!
>>>>>>
>>>>>> Lukasz,
>>>>>>
>>>>>> Do you think lookups would be a better option than side inputs in my
>>>>>> case?
>>>>>>
>>>>>>
>>>>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> It should work. I think you need to apply Distinct before looking up
>>>>>>> account info:
>>>>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
>>>>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>>>>> the account data should be small enough to fit.
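Here is a plain-Java sketch of why the `Keys` + `Distinct` step matters. When a window holds several billing messages per account, looking up only the distinct keys issues one external call per account instead of one per message. `lookupAccount` is a hypothetical stand-in for the `LookupAccounts` transform, whose implementation is elided in the message above.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical simulation of Keys -> Distinct -> lookup for the pairs of one window. */
final class DistinctThenLookup {
    private DistinctThenLookup() {}

    static <K, V, A> Map<K, A> lookupDistinctKeys(List<Map.Entry<K, V>> pairs,
                                                  Function<K, A> lookupAccount) {
        // Equivalent of Keys.create() followed by Distinct.create()
        Set<K> keys = new LinkedHashSet<>();
        for (Map.Entry<K, V> pair : pairs) {
            keys.add(pair.getKey());
        }
        // One lookup, and one map entry, per distinct key
        Map<K, A> result = new LinkedHashMap<>();
        for (K key : keys) {
            result.put(key, lookupAccount.apply(key));
        }
        return result;
    }
}
```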
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Well, I actually made the example a little simpler than it is. In the
>>>>>>>> real use case I have multiple reference datasets; say the key is a
>>>>>>>> tuple of Account and Product. The reason we don’t do the lookup
>>>>>>>> directly in the DoFn is that we don’t want to look up the data for the
>>>>>>>> same account or the same product multiple times across workers in a
>>>>>>>> window.
>>>>>>>>
>>>>>>>> What I was thinking was that it might be better to perform the lookup
>>>>>>>> only once for each account and product in a window and then supply
>>>>>>>> the results as side inputs to the main input.
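Here is a plain-Java sketch of that idea. It assumes each message carries an account ID and a product ID. The `AccountProductKey` class and both lookup functions are hypothetical. The sketch splits the composite key and de-duplicates each half separately, so each account and each product is looked up once per window even when it appears in several different pairs.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical composite key carried by each billing message. */
final class AccountProductKey {
    final int accountId;
    final String productId;

    AccountProductKey(int accountId, String productId) {
        this.accountId = accountId;
        this.productId = productId;
    }
}

/** Reference data for one window, each dataset looked up once per distinct ID. */
final class ReferenceLookups {
    final Map<Integer, String> accounts = new HashMap<>();
    final Map<String, String> products = new HashMap<>();

    static ReferenceLookups forWindow(List<AccountProductKey> keys,
                                      Function<Integer, String> lookupAccount,
                                      Function<String, String> lookupProduct) {
        // De-duplicate each half of the composite key independently
        Set<Integer> accountIds = new LinkedHashSet<>();
        Set<String> productIds = new LinkedHashSet<>();
        for (AccountProductKey key : keys) {
            accountIds.add(key.accountId);
            productIds.add(key.productId);
        }
        ReferenceLookups result = new ReferenceLookups();
        for (Integer id : accountIds) {
            result.accounts.put(id, lookupAccount.apply(id));
        }
        for (String id : productIds) {
            result.products.put(id, lookupProduct.apply(id));
        }
        return result;
    }
}
```

With the pairs (1, p1), (1, p2) and (2, p1), the sketch makes two account lookups and two product lookups, rather than three of each.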
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Is there a reason you don't want to read the account information
>>>>>>>>> within the DoFn directly from the datastore? It seems like that
>>>>>>>>> would be your simplest approach.
>>>>>>>>>
>>>>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>>>>
>>>>>>>>>> The account information in the external store does change. Every
>>>>>>>>>> time the account information changes, we have to recompute all the
>>>>>>>>>> billing info. Our source systems make sure that they publish
>>>>>>>>>> messages for those accounts again.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>>>>> the account information be?
>>>>>>>>>>> Does the account information in the external store change?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>>>>>>>> data is in JSON and contains an account ID. To generate the final
>>>>>>>>>>>> report, we need account data that is associated with the account
>>>>>>>>>>>> ID and stored in an external database.
>>>>>>>>>>>>
>>>>>>>>>>>> We may get multiple billing info messages for the same account. We
>>>>>>>>>>>> want to look up the account information for the messages in a
>>>>>>>>>>>> window and then supply it as a side input to the next PTransform.
>>>>>>>>>>>>
>>>>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>>>>
>>>>>>>>>>>> Here is my attempt:
>>>>>>>>>>>>
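>>>>>>>>>>>>     import java.util.Map;
>>>>>>>>>>>>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.GroupByKey;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.ParDo;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.View;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>>>>>>>>>     import org.apache.beam.sdk.transforms.windowing.Window;
>>>>>>>>>>>>     import org.apache.beam.sdk.values.KV;
>>>>>>>>>>>>     import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>>>>     import org.apache.beam.sdk.values.PCollectionView;
>>>>>>>>>>>>     import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>>>>     import org.joda.time.Duration;
>>>>>>>>>>>>
>>>>>>>>>>>>     // BillingModel, Account, KafkaProcessor and AccountLookupClient are
>>>>>>>>>>>>     // our own classes; `p` is the Pipeline.
>>>>>>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>>>>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>>>>                 .withTopic(KAFKA_TOPIC)
>>>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>>>          // 30-second fixed windows over the Kafka records
>>>>>>>>>>>>          .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>>>>          // turn each message into an (accountId, BillingModel) pair
>>>>>>>>>>>>          .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>>>>
>>>>>>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>>>>
>>>>>>>>>>>>     // Per-window map of accountId -> Account, used as a side input
>>>>>>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>>>>         billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>>>>                         .apply(View.asMap());
>>>>>>>>>>>>
>>>>>>>>>>>>     // The DoFn reads the grouped billing data, so apply it to billingData
>>>>>>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>>>>           @ProcessElement
>>>>>>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>>>>>>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>>>>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>>>>           }
>>>>>>>>>>>>         }).withSideInputs(accountData)); // the side input must be registered here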
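One detail worth checking in the attempt above: as far as I know, `View.asMap()` expects each key to be associated with a single value per window, and `View.asMultimap()` is the variant meant for keys that may repeat. Feeding un-deduplicated account IDs into the map view can therefore fail. The plain-Java sketch below only mimics that constraint to show the failure mode. It is not Beam's implementation, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical mimic of a map-shaped side input that rejects duplicate keys. */
final class UniqueKeyMap {
    private UniqueKeyMap() {}

    static <K, V> Map<K, V> build(List<Map.Entry<K, V>> entries) {
        Map<K, V> map = new HashMap<>();
        for (Map.Entry<K, V> entry : entries) {
            // A second value for the same key is treated as an error, not an overwrite
            if (map.containsKey(entry.getKey())) {
                throw new IllegalArgumentException("duplicate key: " + entry.getKey());
            }
            map.put(entry.getKey(), entry.getValue());
        }
        return map;
    }
}
```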
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Harsh
>>>>>>>>>>>>
>>>>>>>>>>> --
>
> *Regards,*
> *Harshvardhan Agrawal*
> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
>
