Yes, that is correct.
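To make the mechanics concrete: a side input keeps the windowing of the
PCollection it was built from. Since accountData is derived from
billingDataPairs after the 30-second Window.into, one Map<Integer, Account>
is materialized per window, and the runner hands your DoFn the map matching
the window of the element being processed. A minimal sketch of the consuming
side, reusing the names from your first message (note the side input must be
declared with withSideInputs):

    billingData.apply("EnrichWithAccounts", ParDo.of(
        new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) {
            // Returns the map built from this element's window only.
            Map<Integer, Account> accounts = ctx.sideInput(accountData);
            Account account = accounts.get(ctx.element().getKey());
          }
        }).withSideInputs(accountData));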

On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <
[email protected]> wrote:

> Got it.
>
> Since I am not applying any windowing strategy to the side input, does
> Beam automatically pick up the windowing strategy for the side input from
> the main input? By that I mean the scope of the side input would be a
> per-window one and it would be different for every window. Is that correct?
>
> Regards,
> Harsh
>
> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <[email protected]> wrote:
>
>> Using Distinct + side inputs will give you a consistent view of the
>> account information for the entire window, which can be nice since it
>> gives consistent processing semantics. However, using a simple in-memory
>> cache to reduce the number of lookups will likely be much easier to
>> debug and simpler to implement and maintain.
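>> A rough sketch of the cache approach, in case it helps (this is just an
>> illustration: it assumes Guava's CacheBuilder/LoadingCache and a
>> hypothetical static AccountClient.lookup(Integer) against your datastore):
>>
>>     private static class EnrichFn extends DoFn<KV<Integer, BillingModel>, Account> {
>>       // Static, so every instance of this DoFn on a worker shares one cache.
>>       private static final LoadingCache<Integer, Account> ACCOUNTS =
>>           CacheBuilder.newBuilder()
>>               .maximumSize(10_000)
>>               .expireAfterWrite(10, TimeUnit.MINUTES)  // bounds staleness
>>               .build(CacheLoader.from(AccountClient::lookup));
>>
>>       @ProcessElement
>>       public void processElement(ProcessContext ctx) throws Exception {
>>         // Hits the datastore at most once per account per expiry period.
>>         ctx.output(ACCOUNTS.get(ctx.element().getKey()));
>>       }
>>     }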
>>
>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
>> [email protected]> wrote:
>>
>>> Thanks Raghu!
>>>
>>> Lukasz,
>>>
>>> Do you think lookups would be a better option than side inputs in my
>>> case?
>>>
>>>
>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:
>>>
>>>> It should work. I think you need to apply Distinct before looking up
>>>> account info:
>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts",
>>>> ...).
>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>> the account set should be small enough for that.
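>>>> Spelled out a little (a sketch; it assumes a variant of your
>>>> AccountLookupClient that takes the distinct account IDs and emits
>>>> KV<Integer, Account>):
>>>>
>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>         billingDataPairs
>>>>             .apply(Keys.<Integer>create())      // account IDs in the window
>>>>             .apply(Distinct.<Integer>create())  // one lookup per account
>>>>             .apply("LookupAccounts", new AccountLookupClient())
>>>>             .apply(View.<Integer, Account>asMap());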
>>>>
>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>>>> [email protected]> wrote:
>>>>
>>>>> Well, I actually simplified the example a little. In the actual
>>>>> pipeline I have multiple reference datasets. Say I have a tuple of
>>>>> Account and Product as the key. The reason we don’t do the lookup in
>>>>> the DoFn directly is that we don’t want to look up the data for the
>>>>> same account or same product multiple times across workers in a window.
>>>>>
>>>>> What I was thinking was that it might be better to perform the lookup
>>>>> only once for each account and product in a window and then supply the
>>>>> results as side inputs to the main input.
>>>>>
>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Is there a reason you don't want to read the account information
>>>>>> within the DoFn directly from the datastore? It seems like that would
>>>>>> be your simplest approach.
>>>>>>
>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>
>>>>>>> The account information in the external store does change. Every
>>>>>>> time we have a change in the account information we will have to
>>>>>>> recompute all the billing info. Our source systems will make sure
>>>>>>> that they publish messages for those accounts again.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>> the account information be?
>>>>>>>> Does the account information in the external store change?
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>>>>> data is in JSON and contains an account ID. In order to generate
>>>>>>>>> the final report we need some account data that is associated with
>>>>>>>>> the account ID and stored in an external database.
>>>>>>>>>
>>>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>>>> same account. We want to be able to look up the account information
>>>>>>>>> for the messages in a window and then supply that as a side input
>>>>>>>>> to the next PTransform.
>>>>>>>>>
>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>
>>>>>>>>> Here is my attempt:
>>>>>>>>>
>>>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>         .apply("Window",
>>>>>>>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>
>>>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>
>>>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>         billingDataPairs
>>>>>>>>>             .apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>             .apply(View.<Integer, Account>asMap());
>>>>>>>>>
>>>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>           @ProcessElement
>>>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>>>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>           }
>>>>>>>>>         }).withSideInputs(accountData));
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Harsh
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Harshvardhan Agrawal
>>>>>>>>> 267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>
>>>>>>>>>
