It’s more like I have a window and I create a side input for that window. Once I am done processing the window, I want to discard that side input and create new ones for subsequent windows.
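The per-window behavior described above can be modeled in plain Java, without a Beam dependency, to show what each window's side input would hold. This is a sketch of the semantics under stated assumptions, not Beam code. It assumes 30-second fixed windows aligned to the epoch (what `FixedWindows.of(Duration.standardSeconds(30))` produces with no offset), integer account IDs, and a hypothetical `lookup` function standing in for `AccountLookupClient`. It mirrors the Keys + Distinct + lookup + `View.asMap()` chain Raghu suggests in the quoted thread: each distinct account is looked up once per window, and each window gets its own map.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// Plain-Java model (no Beam) of per-window, deduplicated lookups feeding a
// per-window map, as a side input built in the same fixed windows would be.
final class PerWindowLookup {
  // Hypothetical billing event: an account ID and an event time in millis.
  static final class Billing {
    final int accountId;
    final long timestampMillis;

    Billing(int accountId, long timestampMillis) {
      this.accountId = accountId;
      this.timestampMillis = timestampMillis;
    }
  }

  // Start of the fixed window containing ts, for windows of sizeMillis aligned to 0.
  static long windowStart(long ts, long sizeMillis) {
    return ts - Math.floorMod(ts, sizeMillis);
  }

  // Returns window start -> (account ID -> looked-up value), calling lookup
  // once per distinct account ID per window.
  static <A> Map<Long, Map<Integer, A>> buildPerWindowMaps(
      List<Billing> events, long sizeMillis, Function<Integer, A> lookup) {
    // Step 1: distinct account IDs per window (models Keys + Distinct).
    Map<Long, Set<Integer>> idsPerWindow = new TreeMap<>();
    for (Billing b : events) {
      idsPerWindow
          .computeIfAbsent(windowStart(b.timestampMillis, sizeMillis), w -> new TreeSet<>())
          .add(b.accountId);
    }
    // Step 2: one lookup per distinct ID, collected into that window's map
    // (models the lookup step followed by View.asMap()).
    Map<Long, Map<Integer, A>> maps = new TreeMap<>();
    for (Map.Entry<Long, Set<Integer>> e : idsPerWindow.entrySet()) {
      Map<Integer, A> accounts = new TreeMap<>();
      for (Integer id : e.getValue()) {
        accounts.put(id, lookup.apply(id));
      }
      maps.put(e.getKey(), accounts);
    }
    return maps;
  }

  public static void main(String[] args) {
    List<Billing> events = List.of(
        new Billing(1, 1_000L), new Billing(1, 2_000L), new Billing(2, 5_000L), // [0s, 30s)
        new Billing(1, 31_000L));                                               // [30s, 60s)
    int[] calls = {0};
    Map<Long, Map<Integer, String>> maps = buildPerWindowMaps(
        events, 30_000L, id -> { calls[0]++; return "account-" + id; });
    System.out.println(maps);      // prints {0={1=account-1, 2=account-2}, 30000={1=account-1}}
    System.out.println(calls[0]);  // prints 3
  }
}
```

In an actual pipeline, deriving the side input from the windowed `billingDataPairs` puts it in the same fixed windows as the main input, so each main-input window pairs with its own map; the runner, not user code, manages when a window's side-input data is no longer needed.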
On Tue, May 15, 2018 at 21:54 Harshvardhan Agrawal wrote:

> In my case, since I am performing a lookup for reference data that can
> change periodically, I can’t really have a global window. I would want to
> update/redo the lookup per window.
>
> On Tue, May 15, 2018 at 21:06 Raghu Angadi wrote:
>
>> You are applying windowing to 'billingDataPairs' in the example above.
>> A side input pairs with all the main input windows that exactly match or
>> completely fall within the side input window. A common use case is a side
>> input defined in the default global window, which matches all the main
>> input windows.
>>
>> On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal wrote:
>>
>>> Got it.
>>>
>>> Since I am not applying any windowing strategy to the side input, does
>>> Beam automatically pick up the windowing strategy for the side inputs from
>>> the main input? By that I mean the scope of the side input would be per
>>> window, and it would be different for every window. Is that correct?
>>>
>>> Regards,
>>> Harsh
>>>
>>> On Tue, May 15, 2018 at 17:54 Lukasz Cwik wrote:
>>>
>>>> Using deduplicate + side inputs will give you a consistent view of the
>>>> account information for the entire window, which can be nice since it
>>>> gives consistent processing semantics. However, a simple in-memory cache
>>>> to reduce the number of lookups will likely be much easier to debug and
>>>> simpler to implement and maintain.
>>>>
>>>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal wrote:
>>>>
>>>>> Thanks Raghu!
>>>>>
>>>>> Lukasz,
>>>>>
>>>>> Do you think lookups would be a better option than side inputs in my
>>>>> case?
>>>>>
>>>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi wrote:
>>>>>
>>>>>> It should work.
>>>>>> I think you need to apply Distinct before looking up the account info:
>>>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
>>>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>>>> the account data should be small enough for that.
>>>>>>
>>>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal wrote:
>>>>>>
>>>>>>> Well, I actually made the example a little simpler. In the actual
>>>>>>> pipeline I have multiple reference datasets. Say I have a tuple of
>>>>>>> Account and Product as the key. The reason we don’t do the lookup in
>>>>>>> the DoFn directly is that we don’t want to look up the data for the
>>>>>>> same account or the same product multiple times across workers in a
>>>>>>> window.
>>>>>>>
>>>>>>> What I was thinking was that it might be better to perform the lookup
>>>>>>> only once for each account and product in a window and then supply
>>>>>>> them as side inputs to the main input.
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik wrote:
>>>>>>>
>>>>>>>> Is there a reason you don't want to read the accounting information
>>>>>>>> within the DoFn directly from the datastore? It seems like that would
>>>>>>>> be your simplest approach.
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>>>
>>>>>>>>> The account information in the external store does change. Every
>>>>>>>>> time the account information changes, we will have to recompute all
>>>>>>>>> the billing info. Our source systems will make sure that they publish
>>>>>>>>> messages for those accounts again.
>>>>>>>>>
>>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik wrote:
>>>>>>>>>
>>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>>>> the account information be?
>>>>>>>>>> Does the account information in the external store change?
>>>>>>>>>>
>>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>>>>>>> data is in JSON and contains an account ID. In order to generate
>>>>>>>>>>> the final report, we need some account data that is associated
>>>>>>>>>>> with the account ID and stored in an external database.
>>>>>>>>>>>
>>>>>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>>>>>> same account. We want to be able to look up the account information
>>>>>>>>>>> for the messages in a window and then supply that as a side input
>>>>>>>>>>> to the next PTransform.
>>>>>>>>>>>
>>>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>>>
>>>>>>>>>>> Here is my attempt:
>>>>>>>>>>>
>>>>>>>>>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>>         .apply("Window",
>>>>>>>>>>>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>>>
>>>>>>>>>>> PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>>>
>>>>>>>>>>> PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>>>     billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>>>         .apply(View.asMap());
>>>>>>>>>>>
>>>>>>>>>>> // The DoFn reads grouped values, so it is applied to billingData.
>>>>>>>>>>> // The view must be registered with withSideInputs() before
>>>>>>>>>>> // ctx.sideInput() can read it; Void is a placeholder output type.
>>>>>>>>>>> billingData.apply(ParDo.of(
>>>>>>>>>>>     new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>       public void processElement(ProcessContext ctx) {
>>>>>>>>>>>         Integer accountId = ctx.element().getKey();
>>>>>>>>>>>         Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>>>>>>         Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>>>       }
>>>>>>>>>>>     }).withSideInputs(accountData));
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Harsh
>>>>>>>>>>>
>>>>>>>>>>
--
*Regards,
Harshvardhan Agrawal*
*267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
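Lukasz's simpler alternative, looking up account data inside the DoFn and keeping an in-memory cache to cut down repeated lookups, can be sketched in plain Java. The class name `ExpiringLookupCache`, the time-to-live policy, and the injectable clock are assumptions for illustration, not a Beam API. A TTL bounds how stale cached account data can get, which matters here because the account information in the external store does change. In a Beam DoFn, such a cache would typically be created in a `@Setup` method so that each DoFn instance reuses it across bundles.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical helper: caches lookup results for ttlMillis so repeated keys
// within that period reuse the cached value instead of calling the store.
final class ExpiringLookupCache<K, V> {
  // A cached value together with the time (in millis) at which it goes stale.
  private static final class Entry<T> {
    final T value;
    final long expiresAtMillis;

    Entry(T value, long expiresAtMillis) {
      this.value = value;
      this.expiresAtMillis = expiresAtMillis;
    }
  }

  private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
  private final Function<K, V> lookup;  // the expensive external lookup
  private final long ttlMillis;         // how long a cached value stays fresh
  private final LongSupplier clock;     // current time in millis (injectable for tests)

  ExpiringLookupCache(Function<K, V> lookup, long ttlMillis, LongSupplier clock) {
    this.lookup = lookup;
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  // Returns the cached value if still fresh; otherwise looks it up and caches it.
  V get(K key) {
    long now = clock.getAsLong();
    Entry<V> cached = entries.get(key);
    if (cached != null && now < cached.expiresAtMillis) {
      return cached.value;
    }
    V fresh = lookup.apply(key);
    entries.put(key, new Entry<>(fresh, now + ttlMillis));
    return fresh;
  }

  public static void main(String[] args) {
    long[] now = {0L};
    int[] calls = {0};
    ExpiringLookupCache<Integer, String> cache = new ExpiringLookupCache<>(
        id -> { calls[0]++; return "account-" + id; }, 60_000L, () -> now[0]);
    cache.get(1);
    cache.get(1);      // served from the cache
    cache.get(2);
    now[0] = 60_000L;  // TTL has elapsed for account 1
    cache.get(1);      // looked up again
    System.out.println(calls[0]);  // prints 3
  }
}
```

Under contention, two threads can miss on the same key at the same time and both call the lookup; this sketch accepts an occasional duplicate lookup rather than holding a lock during a slow external call.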
