It should work. I think you need to apply Distinct to the keys before looking
up account info, so that each account is looked up only once per window:

    billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...)
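
To make the effect of the Distinct step concrete, here is a minimal plain-Java sketch (no Beam dependency) of the same shape: take the account IDs seen in one window, deduplicate them, look each one up once, and keep the results in a map that later processing reads from. The class name, the `lookupAccount` helper and the `"account-<id>"` values are hypothetical stand-ins for the external store, not anything from the actual pipeline. In Beam, the corresponding steps would be Keys, Distinct, the lookup transform and View.asMap(); as far as I know, View.asMap() also expects at most one value per key in each window, which is another reason to deduplicate first.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class DistinctLookupSketch {

    // Counts how often the (hypothetical) external store is queried.
    static final AtomicInteger lookupCalls = new AtomicInteger();

    // Hypothetical stand-in for a query against the external account database.
    static String lookupAccount(int accountId) {
        lookupCalls.incrementAndGet();
        return "account-" + accountId;
    }

    // Deduplicate the account IDs of one window, then look each one up once.
    // The returned map plays the role of the map-valued side input.
    static Map<Integer, String> buildAccountMap(List<Integer> accountIdsInWindow) {
        Set<Integer> distinctIds = new LinkedHashSet<>(accountIdsInWindow);
        Map<Integer, String> accounts = new LinkedHashMap<>();
        for (int id : distinctIds) {
            accounts.put(id, lookupAccount(id));
        }
        return accounts;
    }

    public static void main(String[] args) {
        // Five billing messages in one window, but only two distinct accounts.
        List<Integer> idsInWindow = Arrays.asList(7, 7, 42, 7, 42);
        Map<Integer, String> accounts = buildAccountMap(idsInWindow);

        // Each message reads its account from the in-memory map.
        for (int id : idsInWindow) {
            System.out.println(id + " -> " + accounts.get(id));
        }
        System.out.println("lookups performed: " + lookupCalls.get());
    }
}
```

Without the deduplication step the store would be queried once per message rather than once per account, and the map would see the same key several times.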
Note that all of the accounts are stored in a single in-memory map. It should
be small enough for that.

On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <[email protected]> wrote:

> Well ideally, I actually made the example a little easy. In the actual
> example I have multiple reference datasets. Say, I have a tuple of Account
> and Product as the key. The reason we don’t do the lookup in the DoFn
> directly is that we don’t want to lookup the data for the same account or
> same product multiple times across workers in a window.
>
> What I was thinking was that it might be better to perform the lookup only
> once for each account and product in a window and then supply them as side
> inputs to the main input.
>
> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
>
>> Is there a reason you don't want to read the accounting information
>> within the DoFn directly from the datastore, it seems like that would be
>> your simplest approach.
>>
>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> No we don’t receive any such information from Kafka.
>>>
>>> The account information in the external store does change. Every time we
>>> have a change in the account information we will have to recompute all the
>>> billing info. Our source systems will make sure that they publish messages
>>> for those accounts again.
>>>
>>>
>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> wrote:
>>>
>>>> For each BillingModel you receive over Kafka, how "fresh" should the
>>>> account information be?
>>>> Does the account information in the external store change?
>>>>
>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have certain billing data that arrives to us from Kafka. The
>>>>> billing data is in json and it contains an account ID. In order for us to
>>>>> generate the final report we need to use some account data associated with
>>>>> the account id and is stored in an external database.
>>>>>
>>>>> It is possible that we get multiple billing info messages for the same
>>>>> account. We want to be able to lookup the account information for the
>>>>> messages in a window and then supply that as a side input to the next
>>>>> PTransform.
>>>>>
>>>>> Is it possible to achieve that in Beam?
>>>>>
>>>>> Here is my attempt:
>>>>>
>>>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>         .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>         .withTopic(KAFKA_TOPIC)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>     )
>>>>>     .apply("Window",
>>>>>         Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>     .apply("ProcessKafkaMessages",new KafkaProcessor());
>>>>>
>>>>> PCollection<KV<Integer, Iterable<BillingModel>> billingData =
>>>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>
>>>>> PCollectionView<Map<Integer, Account>> accountData =
>>>>>     billingDataPairs.apply("LookupAccounts",new
>>>>>     AccountLookupClient()).apply(View.asMap());
>>>>>
>>>>> billingDataPairs.apply(ParDo.of(new DoFn<KV<Integer,
>>>>> BillingModel>>(){
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext ctx) {
>>>>>         Integer accountId = ctx.element().getKey();
>>>>>         Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>         Account account = ctx.sideinput(accountData).get(accountId);
>>>>>     }
>>>>> }));
>>>>>
>>>>> Regards,
>>>>> Harsh
>>>>> --
>>>>>
>>>>> *Regards,Harshvardhan Agrawal*
>>>>> *267.991.6618 | LinkedIn
>>>>> <https://www.linkedin.com/in/harshvardhanagr/>*
