Thanks Raghu!

Lukasz,

Do you think lookups would be a better option than side inputs in my case?
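
A direct lookup and the concern about fetching the same account repeatedly are not mutually exclusive. A common pattern is to keep a small cache as a field of the DoFn (created in a @Setup method and, if freshness matters because account data changes, cleared in @StartBundle). That way each DoFn instance fetches a given account at most once per cache lifetime. Unlike the side-input approach, this does not deduplicate across workers: every worker may still fetch the same account once. The plain-Java sketch below models only the cache. CachingLookup, lookupsForBundle and the "account-" + id loader are hypothetical stand-ins for the real datastore client, and no Beam classes are used.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CachingLookupSketch {

  /**
   * Models a cache kept as a field of one DoFn instance: each key is loaded
   * from the store at most once until clear() is called.
   */
  static final class CachingLookup<K, V> {
    private final Function<K, V> loader; // hypothetical stand-in for the datastore call
    private final Map<K, V> cache = new HashMap<>();
    private int loads = 0;

    CachingLookup(Function<K, V> loader) {
      this.loader = loader;
    }

    /** Returns the cached value, calling the loader only on a cache miss. */
    V get(K key) {
      V value = cache.get(key);
      if (value == null) {
        value = loader.apply(key);
        loads++;
        cache.put(key, value);
      }
      return value;
    }

    /** Drops all cached entries, e.g. at the start of each bundle. */
    void clear() {
      cache.clear();
    }

    int loadCount() {
      return loads;
    }
  }

  /** Looks up every account ID of one bundle and returns the number of store calls. */
  static int lookupsForBundle(List<Integer> accountIds) {
    // Hypothetical loader; a real DoFn would query the external database here.
    CachingLookup<Integer, String> lookup = new CachingLookup<>(id -> "account-" + id);
    for (Integer id : accountIds) {
      lookup.get(id);
    }
    return lookup.loadCount();
  }

  public static void main(String[] args) {
    // Five billing records for two distinct accounts.
    System.out.println(lookupsForBundle(Arrays.asList(1, 2, 1, 1, 2))); // 2
  }
}
```
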


On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:

> It should work. I think you need to apply Distinct before looking up the
> account info:
> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts",
> ...).
> Note that all of the accounts are stored in a single in-memory map, so the
> account data needs to be small enough to fit.
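
Per window, the suggested chain does three things: Keys.create() drops the billing values, Distinct.create() keeps each account ID once, and the lookup results go through View.asMap(), which expects each key to map to a single value per window (another reason to deduplicate before the lookup). The stdlib-only Java model below reproduces that logic for one window so the number of store calls is easy to check. BillingPair, enrichWindow and the "account-" + id store are hypothetical, and windowing, distribution and the Beam API itself are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class DistinctThenLookupSketch {

  /** One (accountId, billing payload) element of a single window. */
  static final class BillingPair {
    final int accountId;
    final String billing;

    BillingPair(int accountId, String billing) {
      this.accountId = accountId;
      this.billing = billing;
    }
  }

  /** Enriches every billing record of one window with its account info. */
  static List<String> enrichWindow(List<BillingPair> window,
                                   Function<Integer, String> fetchAccount) {
    // Keys + Distinct: keep each account ID once.
    Set<Integer> distinctIds = new LinkedHashSet<>();
    for (BillingPair pair : window) {
      distinctIds.add(pair.accountId);
    }
    // Lookup + View.asMap(): one fetch per distinct ID, all held in one map.
    Map<Integer, String> accounts = new HashMap<>();
    for (Integer id : distinctIds) {
      accounts.put(id, fetchAccount.apply(id));
    }
    // Main input: every record reads its account from the map (the side input).
    List<String> enriched = new ArrayList<>();
    for (BillingPair pair : window) {
      enriched.add(pair.billing + "@" + accounts.get(pair.accountId));
    }
    return enriched;
  }

  public static void main(String[] args) {
    List<BillingPair> window = Arrays.asList(
        new BillingPair(1, "b1"), new BillingPair(2, "b2"), new BillingPair(1, "b3"));
    AtomicInteger storeCalls = new AtomicInteger();
    // Hypothetical store: counts calls and returns a placeholder account.
    List<String> out = enrichWindow(window, id -> {
      storeCalls.incrementAndGet();
      return "account-" + id;
    });
    System.out.println(out);        // [b1@account-1, b2@account-2, b3@account-1]
    System.out.println(storeCalls); // 2
  }
}
```

Three billing records for two accounts cost two store calls; without the Distinct step the lookup would run once per record.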
>
> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
> [email protected]> wrote:
>
>> Well, I actually simplified the example a little. In the actual use case
>> I have multiple reference datasets; say, a tuple of Account and Product as
>> the key. The reason we don’t do the lookup in the DoFn directly is that we
>> don’t want to look up the data for the same account or the same product
>> multiple times across workers in a window.
>>
>> What I was thinking was that it might be better to perform the lookup
>> only once for each account and product in a window and then supply the
>> results as side inputs to the main input.
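
With several reference datasets, one option (an assumption, since the real datasets are not shown here) is to deduplicate each dataset on its own ID rather than on the (Account, Product) tuple, and to build one Map side input per dataset. The plain-Java model below, using the hypothetical names BillingKey, lookupsPerDataset and lookupsPerTuple, only counts the lookups needed for one window under each choice.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PerDatasetDistinctSketch {

  /** One billing element keyed by the (accountId, productId) tuple. */
  static final class BillingKey {
    final int accountId;
    final int productId;

    BillingKey(int accountId, int productId) {
      this.accountId = accountId;
      this.productId = productId;
    }
  }

  /** Lookups needed if each reference dataset is deduplicated on its own ID. */
  static int lookupsPerDataset(List<BillingKey> window) {
    Set<Integer> accounts = new LinkedHashSet<>();
    Set<Integer> products = new LinkedHashSet<>();
    for (BillingKey k : window) {
      accounts.add(k.accountId);
      products.add(k.productId);
    }
    // One lookup per distinct account plus one per distinct product;
    // each result set would feed its own Map side input.
    return accounts.size() + products.size();
  }

  /** Lookups needed if the (account, product) tuple is deduplicated instead. */
  static int lookupsPerTuple(List<BillingKey> window) {
    Set<List<Integer>> tuples = new LinkedHashSet<>();
    for (BillingKey k : window) {
      tuples.add(Arrays.asList(k.accountId, k.productId));
    }
    // Each distinct tuple still needs one account and one product lookup.
    return 2 * tuples.size();
  }

  public static void main(String[] args) {
    List<BillingKey> window = Arrays.asList(
        new BillingKey(1, 10), new BillingKey(1, 20),
        new BillingKey(2, 10), new BillingKey(2, 20));
    System.out.println(lookupsPerDataset(window)); // 4 (2 accounts + 2 products)
    System.out.println(lookupsPerTuple(window));   // 8 (4 tuples x 2 lookups)
  }
}
```

Per-dataset deduplication never needs more lookups than per-tuple deduplication, because each dataset has at most as many distinct IDs as there are distinct tuples.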
>>
>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>>
>>> Is there a reason you don't want to read the account information
>>> directly from the datastore within the DoFn? That seems like it would be
>>> your simplest approach.
>>>
>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> No, we don’t receive any such information from Kafka.
>>>>
>>>> The account information in the external store does change. Every time
>>>> the account information changes, we have to recompute all the billing
>>>> info. Our source systems will make sure that they publish messages for
>>>> those accounts again.
>>>>
>>>>
>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> For each BillingModel you receive over Kafka, how "fresh" should the
>>>>> account information be?
>>>>> Does the account information in the external store change?
>>>>>
>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>> data is in JSON and contains an account ID. To generate the final
>>>>>> report, we need account data that is associated with the account ID and
>>>>>> stored in an external database.
>>>>>>
>>>>>> We may receive multiple billing messages for the same account. We want
>>>>>> to look up the account information for the messages in a window and then
>>>>>> supply it as a side input to the next PTransform.
>>>>>>
>>>>>> Is it possible to achieve that in Beam?
>>>>>>
>>>>>> Here is my attempt:
>>>>>>
>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs = p
>>>>>>         .apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>         // Assign the Kafka records to 30-second fixed windows.
>>>>>>         .apply("Window",
>>>>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>         // KafkaProcessor turns the JSON payloads into (accountId, BillingModel) pairs.
>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>
>>>>>>     // Collect all billing records for the same account within a window.
>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>
>>>>>>     // Look up the account info and expose it as a per-window Map side input.
>>>>>>     PCollectionView<Map<Integer, Account>> accountData = billingDataPairs
>>>>>>         .apply("LookupAccounts", new AccountLookupClient())
>>>>>>         .apply(View.asMap());
>>>>>>
>>>>>>     // The DoFn consumes the grouped records, so it is applied to billingData,
>>>>>>     // and the side input must be registered with withSideInputs().
>>>>>>     billingData.apply(ParDo.of(
>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>           @ProcessElement
>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>           }
>>>>>>         }).withSideInputs(accountData));
>>>>>>
>>>>>> Regards,
>>>>>> Harsh
>>>>>> --
>>>>>>
>>>>>> *Regards,*
>>>>>> *Harshvardhan Agrawal*
>>>>>> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
>>>>>>