Yes, that is correct.

On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <[email protected]> wrote:

Got it.

Since I am not applying any windowing strategy to the side input, does Beam automatically pick up the windowing strategy for the side input from the main input? By that I mean that the scope of the side input would be per window, and it would be different for every window. Is that correct?

Regards,
Harsh

On Tue, May 15, 2018 at 17:54 Lukasz Cwik <[email protected]> wrote:

Using deduplicate + side inputs will allow you to have a consistent view of the account information for the entire window, which can be nice since it gives consistent processing semantics, but using a simple in-memory cache to reduce the number of lookups will likely be much easier to debug and simpler to implement and maintain.
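A minimal sketch of the in-memory cache alternative described above, assuming Guava is on the classpath; CachedAccountLookupFn and AccountStore.lookupAccount are hypothetical names, and BillingModel/Account are the types from the pipeline shown further down the thread:

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.TimeUnit;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Pairs each billing record with its account, caching lookups per worker.
    class CachedAccountLookupFn extends DoFn<KV<Integer, BillingModel>, KV<BillingModel, Account>> {

      // One cache per DoFn instance; transient because DoFns must be
      // serializable, so the cache is rebuilt on each worker in @Setup.
      private transient LoadingCache<Integer, Account> cache;

      @Setup
      public void setup() {
        cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)                    // bound worker memory use
            .expireAfterWrite(5, TimeUnit.MINUTES)  // pick up account changes eventually
            .build(new CacheLoader<Integer, Account>() {
              @Override
              public Account load(Integer accountId) {
                // Hypothetical client call to the external database.
                return AccountStore.lookupAccount(accountId);
              }
            });
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) {
        Account account = cache.getUnchecked(ctx.element().getKey());
        ctx.output(KV.of(ctx.element().getValue(), account));
      }
    }

Note the trade-off Lukasz describes: the cache is per worker, so two workers may see different versions of the same account within one window, whereas the deduplicate + side input approach gives every element in a window the same view.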
On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <[email protected]> wrote:

Thanks Raghu!

Lukasz, do you think lookups would be a better option than side inputs in my case?

On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:

It should work. I think you need to apply Distinct before looking up the account info:

    billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...)

Note that all of the accounts are stored in a single in-memory map. It should be small enough for that.
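A minimal sketch of Raghu's suggestion wired into the pipeline from the original message below; AccountLookupFn is a hypothetical DoFn<Integer, KV<Integer, Account>> that performs one external lookup per account ID:

    // Deduplicate account ids per window, look each one up once,
    // then expose the results as a per-window map side input.
    PCollection<Integer> distinctAccountIds =
        billingDataPairs
            .apply(Keys.<Integer>create())       // keep only the account ids
            .apply(Distinct.<Integer>create());  // one element per id per window

    PCollectionView<Map<Integer, Account>> accountData =
        distinctAccountIds
            .apply("LookupAccounts", ParDo.of(new AccountLookupFn()))
            .apply(View.<Integer, Account>asMap());

Because distinctAccountIds inherits the 30-second fixed windowing of billingDataPairs, the resulting map side input is scoped per window, which is the behavior confirmed at the top of the thread.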
On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <[email protected]> wrote:

Well, I actually made the example a little simpler than the real one. In the actual pipeline I have multiple reference datasets. Say I have a tuple of Account and Product as the key. The reason we don't do the lookup in the DoFn directly is that we don't want to look up the data for the same account or the same product multiple times across workers in a window.

What I was thinking was that it might be better to perform the lookup only once for each account and product in a window and then supply them as side inputs to the main input.

On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:

Is there a reason you don't want to read the account information within the DoFn directly from the datastore? That seems like it would be your simplest approach.

On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <[email protected]> wrote:

Hi,

No, we don't receive any such information from Kafka.

The account information in the external store does change. Every time we have a change in the account information we will have to recompute all the billing info. Our source systems will make sure that they publish messages for those accounts again.

On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> wrote:

For each BillingModel you receive over Kafka, how "fresh" should the account information be? Does the account information in the external store change?

On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <[email protected]> wrote:

Hi,

We have billing data that comes to us from Kafka. The billing data is in JSON and contains an account ID. To generate the final report, we need some account data that is associated with the account ID and stored in an external database.

It is possible that we get multiple billing info messages for the same account. We want to be able to look up the account information for the messages in a window and then supply that as a side input to the next PTransform.

Is it possible to achieve that in Beam?

Here is my attempt:

    PCollection<KV<Integer, BillingModel>> billingDataPairs = p
        .apply("ReadBillingInfo", KafkaIO.<String, String>read()
            .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
            .withTopic(KAFKA_TOPIC)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class))
        .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
        .apply("ProcessKafkaMessages", new KafkaProcessor());

    PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
        billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());

    PCollectionView<Map<Integer, Account>> accountData =
        billingDataPairs
            .apply("LookupAccounts", new AccountLookupClient())
            .apply(View.<Integer, Account>asMap());

    // The grouped collection carries Iterable<BillingModel> values, so the
    // DoFn is typed accordingly, and the view must be registered with
    // withSideInputs before ctx.sideInput can read it.
    billingData.apply(ParDo.of(new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        Integer accountId = ctx.element().getKey();
        Iterable<BillingModel> billingModels = ctx.element().getValue();
        Account account = ctx.sideInput(accountData).get(accountId);
        // ... build the report from account and billingModels ...
      }
    }).withSideInputs(accountData));

Regards,
Harshvardhan Agrawal
267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>
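The AccountLookupClient transform is referenced above but not shown; here is a hedged sketch of what it could look like, with AccountStore.lookupAccount standing in for the real external-database call:

    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Maps each keyed billing record to an (accountId, Account) pair by
    // querying the external store; one lookup per input element.
    class AccountLookupClient
        extends PTransform<PCollection<KV<Integer, BillingModel>>, PCollection<KV<Integer, Account>>> {

      @Override
      public PCollection<KV<Integer, Account>> expand(PCollection<KV<Integer, BillingModel>> input) {
        return input.apply(ParDo.of(new DoFn<KV<Integer, BillingModel>, KV<Integer, Account>>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) {
            Integer accountId = ctx.element().getKey();
            // Placeholder for the real lookup against the external store.
            Account account = AccountStore.lookupAccount(accountId);
            ctx.output(KV.of(accountId, account));
          }
        }));
      }
    }

One caveat: View.asMap() expects a unique value per key within each window, so building the view straight from billingDataPairs will fail at runtime when the same account ID appears twice in a window; deduplicating the IDs first, as Raghu suggests earlier in the thread, avoids that.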
