Thanks Raghu! Lukasz,
Do you think lookups would be a better option than side inputs in my case?

On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:

> It should work. I think you need to apply Distinct before looking up the
> account info:
> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
> Note that all of the accounts are stored in a single in-memory map. It
> should be small enough for that.
>
> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <[email protected]> wrote:
>
>> Well, I actually simplified the example a little. In the actual case I
>> have multiple reference datasets. Say I have a tuple of Account and
>> Product as the key. The reason we don’t do the lookup in the DoFn
>> directly is that we don’t want to look up the data for the same account or
>> the same product multiple times across workers in a window.
>>
>> What I was thinking was that it might be better to perform the lookup
>> only once for each account and product in a window and then supply them as
>> side inputs to the main input.
>>
>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>>
>>> Is there a reason you don't want to read the accounting information
>>> within the DoFn directly from the datastore? It seems like that would be
>>> your simplest approach.
>>>
>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> No, we don’t receive any such information from Kafka.
>>>>
>>>> The account information in the external store does change. Every time
>>>> we have a change in the account information we will have to recompute all
>>>> the billing info. Our source systems will make sure that they publish
>>>> messages for those accounts again.
>>>>
>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> For each BillingModel you receive over Kafka, how "fresh" should the
>>>>> account information be?
>>>>> Does the account information in the external store change?
>>>>>
>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>> data is in JSON and contains an account ID. To generate the final
>>>>>> report we need some account data that is associated with the account
>>>>>> ID and is stored in an external database.
>>>>>>
>>>>>> It is possible that we get multiple billing info messages for the
>>>>>> same account. We want to be able to look up the account information
>>>>>> for the messages in a window and then supply that as a side input to
>>>>>> the next PTransform.
>>>>>>
>>>>>> Is it possible to achieve that in Beam?
>>>>>>
>>>>>> Here is my attempt:
>>>>>>
>>>>>> // Read billing messages from Kafka, window them, and key them by account ID.
>>>>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class)
>>>>>>         )
>>>>>>         .apply("Window",
>>>>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>
>>>>>> // Group all billing messages for the same account within a window.
>>>>>> PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>
>>>>>> // Look up the account data and expose it as a map-valued side input.
>>>>>> PCollectionView<Map<Integer, Account>> accountData =
>>>>>>     billingDataPairs
>>>>>>         .apply("LookupAccounts", new AccountLookupClient())
>>>>>>         .apply(View.asMap());
>>>>>>
>>>>>> // Join each account's grouped billing data with its account record.
>>>>>> // Void is a placeholder output type; nothing is emitted yet.
>>>>>> billingData.apply(ParDo.of(
>>>>>>     new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>       @ProcessElement
>>>>>>       public void processElement(ProcessContext ctx) {
>>>>>>         Integer accountId = ctx.element().getKey();
>>>>>>         Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>         Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>       }
>>>>>>     }).withSideInputs(accountData));
>>>>>>
>>>>>> Regards,
>>>>>> Harsh
>>>>>> --
>>>>>>
>>>>>> *Regards, Harshvardhan Agrawal*
>>>>>> *267.991.6618 | LinkedIn
>>>>>> <https://www.linkedin.com/in/harshvardhanagr/>*

--
*Regards, Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
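
For illustration, the idea discussed in this thread (look up each account once per window instead of once per message, then hand the result to the main processing step as a map) can be sketched in plain Java. This is only a sketch and does not use Beam: `buildSideInput` mimics what `Keys` + `Distinct` + a lookup + `View.asMap()` would produce for a single window. The names `WindowLookupSketch`, `fetchAccount`, `buildSideInput`, and the `"account-N"` strings are hypothetical stand-ins for the external store, not anything from the actual pipeline.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WindowLookupSketch {
    /** Counts how many times the (hypothetical) external store was queried. */
    static int lookupCalls = 0;

    /** Hypothetical stand-in for a call to the external account database. */
    static String fetchAccount(int accountId) {
        lookupCalls++;
        return "account-" + accountId;
    }

    /** Builds the per-window map with exactly one lookup per distinct account ID. */
    static Map<Integer, String> buildSideInput(List<Integer> accountIdsInWindow) {
        // Deduplicate first (the role Distinct plays in the pipeline).
        Set<Integer> distinctIds = new LinkedHashSet<>(accountIdsInWindow);
        Map<Integer, String> accounts = new HashMap<>();
        for (int id : distinctIds) {
            accounts.put(id, fetchAccount(id));
        }
        return accounts;
    }

    public static void main(String[] args) {
        // Five billing messages in one window, but only two distinct accounts.
        List<Integer> window = Arrays.asList(7, 7, 42, 7, 42);
        Map<Integer, String> sideInput = buildSideInput(window);

        // Main processing: every message reads from the shared map.
        for (int id : window) {
            System.out.println(id + " -> " + sideInput.get(id));
        }
        System.out.println("lookups: " + lookupCalls); // prints "lookups: 2"
    }
}
```

Without the deduplication step the store would be queried five times for this window. As noted in the thread, the whole map is held in memory, so this only makes sense when the set of accounts per window is small.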
