Makes sense.

On Tue, May 15, 2018 at 7:22 PM Harshvardhan Agrawal <[email protected]> wrote:
It’s more like I have a window and I create a side input for that window. Once I am done processing the window, I want to discard that side input and create new ones for subsequent windows.

On Tue, May 15, 2018 at 21:54 Harshvardhan Agrawal <[email protected]> wrote:
In my case, since I am performing a lookup for reference data that can change periodically, I can’t really use a global window. I would want to update the data, or redo the lookup, per window.

On Tue, May 15, 2018 at 21:06 Raghu Angadi <[email protected]> wrote:
You are applying windowing to 'billingDataPairs' in the example above. A side input pairs with all the main input windows that exactly match or completely fall within the side input window. A common use case is a side input defined in the default global window, which matches all the main input windows.

On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <[email protected]> wrote:
Got it.

Since I am not applying any windowing strategy to the side input, does Beam automatically pick up the windowing strategy for the side input from the main input? By that I mean the scope of the side input would be per window, and it would be different for every window. Is that correct?

Regards,
Harsh

On Tue, May 15, 2018 at 17:54 Lukasz Cwik <[email protected]> wrote:
Using deduplicate + side inputs will give you a consistent view of the account information for the entire window, which can be nice since it gives consistent processing semantics. However, using a simple in-memory cache to reduce the number of lookups will likely be much easier to debug and simpler to implement and maintain.

On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <[email protected]> wrote:
Thanks, Raghu!
Lukasz,

Do you think lookups would be a better option than side inputs in my case?

On Tue, May 15, 2018 at 16:33 Raghu Angadi <[email protected]> wrote:
It should work. I think you need to apply Distinct before looking up the account info:

    billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...)

Note that all of the accounts are stored in a single in-memory map, so the account data should be small enough to fit in it.

On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <[email protected]> wrote:
Well, I actually simplified the example a little. In the actual pipeline I have multiple reference datasets; say I have a tuple of Account and Product as the key. The reason we don’t do the lookup in the DoFn directly is that we don’t want to look up the data for the same account or the same product multiple times across workers in a window.

What I was thinking was that it might be better to perform the lookup only once for each account and product in a window and then supply the results as side inputs to the main input.

On Tue, May 15, 2018 at 16:03 Lukasz Cwik <[email protected]> wrote:
Is there a reason you don't want to read the account information within the DoFn directly from the datastore? It seems like that would be your simplest approach.

On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <[email protected]> wrote:
Hi,

No, we don’t receive any such information from Kafka.

The account information in the external store does change.
Every >>>>>>>>>> time we have a change in the account information we will have to >>>>>>>>>> recompute >>>>>>>>>> all the billing info. Our source systems will make sure that they >>>>>>>>>> publish >>>>>>>>>> messages for those accounts again. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should >>>>>>>>>>> the account information be? >>>>>>>>>>> Does the account information in the external store change? >>>>>>>>>>> >>>>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal < >>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> We have certain billing data that arrives to us from Kafka. The >>>>>>>>>>>> billing data is in json and it contains an account ID. In order >>>>>>>>>>>> for us to >>>>>>>>>>>> generate the final report we need to use some account data >>>>>>>>>>>> associated with >>>>>>>>>>>> the account id and is stored in an external database. >>>>>>>>>>>> >>>>>>>>>>>> It is possible that we get multiple billing info messages for >>>>>>>>>>>> the same account. We want to be able to lookup the account >>>>>>>>>>>> information for >>>>>>>>>>>> the messages in a window and then supply that as a side input to >>>>>>>>>>>> the next >>>>>>>>>>>> PTransform. >>>>>>>>>>>> >>>>>>>>>>>> Is it possible to achieve that in Beam? 
Here is my attempt:

    import java.util.Map;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Duration;

    // p, KAFKA_BOOTSTRAP_SERVER, KAFKA_TOPIC, BillingModel, Account, KafkaProcessor
    // and AccountLookupClient are defined elsewhere in the pipeline code.
    PCollection<KV<Integer, BillingModel>> billingDataPairs =
        p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
                .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
                .withTopic(KAFKA_TOPIC)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
            .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply("ProcessKafkaMessages", new KafkaProcessor());

    // Group the billing records per account within each 30-second window.
    PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
        billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());

    // One Map<accountId, Account> per window, consumed below as a side input.
    PCollectionView<Map<Integer, Account>> accountData =
        billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
            .apply(View.asMap());

    // The DoFn reads grouped records, so it is applied to billingData,
    // and the view must be registered with withSideInputs().
    billingData.apply(ParDo.of(new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext ctx) {
            Integer accountId = ctx.element().getKey();
            Iterable<BillingModel> billingModels = ctx.element().getValue();
            Account account = ctx.sideInput(accountData).get(accountId);
            // ... build the report from account and billingModels
        }
    }).withSideInputs(accountData));

Regards,
Harsh

--
Regards,
Harshvardhan Agrawal
267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>
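Raghu's point about how side-input windows pair with main-input windows can be modeled without Beam. In the attempt above, `accountData` is derived from `billingDataPairs`, which is already windowed into 30-second fixed windows. Assuming `AccountLookupClient` does not re-window its input, the side input carries the same windows, so each main-input window reads only the map built for its own window. That is the per-window behaviour Harsh describes.

The sketch below is a JDK-only model, not Beam code, and its class and method names are hypothetical. It assumes the side input uses fixed windows with zero offset. It reproduces the rule Beam applies to fixed windows: choose the side-input window that contains the main-input window's last millisecond.

```java
// JDK-only model (hypothetical names) of how a main-input window is matched to a
// fixed side-input window. Times are epoch milliseconds; windows are [start, end).
final class SideInputWindowMapping {

    private SideInputWindowMapping() {}

    /** Start of the fixed window of length sizeMillis (zero offset) containing timestampMillis. */
    static long fixedWindowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /**
     * Start of the side-input window read by the main-input window ending at mainEndMillis,
     * assuming the side input is windowed into fixed windows of sideSizeMillis.
     */
    static long sideWindowStartFor(long mainEndMillis, long sideSizeMillis) {
        long mainMaxTimestamp = mainEndMillis - 1; // last millisecond inside the main window
        return fixedWindowStart(mainMaxTimestamp, sideSizeMillis);
    }
}
```

Two cases show the effect of the side-input window size:

- **Same 30-second windows.** `sideWindowStartFor(60_000L, 30_000L)` returns `30000`: the main window [30 s, 60 s) reads the side window [30 s, 60 s).
- **5-minute side windows.** `sideWindowStartFor(60_000L, 300_000L)` returns `0`: every 30-second main window in the first five minutes shares one side-input map.

A side input in the global window, the common case Raghu mentions, corresponds to a single window that every main-input window reads.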
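Raghu suggests applying `Keys.create()` and then `Distinct.create()` before the lookup. This means each account is looked up once per window rather than once per billing message. Beam's `Distinct` deduplicates within each window. The `View.asMap()` documentation also expects a single value per key in each window, which is another reason to deduplicate first.

The following JDK-only sketch models that step for the keys of one window. `PerWindowLookup`, `lookupDistinct` and the `lookup` function are hypothetical stand-ins for the `LookupAccounts` transform and the external database call, not Beam APIs.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// JDK-only model of Keys -> Distinct -> lookup -> View.asMap() for ONE window.
// lookupDistinct and the lookup function are hypothetical, not Beam APIs.
final class PerWindowLookup {

    private PerWindowLookup() {}

    /**
     * @param windowKeys keys seen in one window, duplicates included
     *                   (for example account IDs, or (account, product) pairs)
     * @param lookup     stand-in for the call to the external store
     * @return one looked-up value per distinct key, like the per-window side-input map
     */
    static <K, V> Map<K, V> lookupDistinct(List<K> windowKeys, Function<K, V> lookup) {
        // The Distinct step: keep each key once, in first-seen order.
        Set<K> distinctKeys = new LinkedHashSet<>(windowKeys);
        Map<K, V> result = new LinkedHashMap<>();
        for (K key : distinctKeys) {
            result.put(key, lookup.apply(key)); // exactly one lookup per distinct key
        }
        return result;
    }
}
```

For a composite key such as Harsh's (Account, Product) tuple, `K` would need proper `equals`/`hashCode`. In Beam, the key's coder must also be deterministic, because `Distinct` groups by the element.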
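Lukasz's alternative is a simple in-memory cache consulted from inside the DoFn. It also connects to his freshness question, because an expiry time bounds how stale the account data can get.

Below is a hypothetical JDK-only cache. The class name, the TTL policy and the injectable clock are assumptions, not something proposed in the thread. It is not thread-safe, so it assumes one cache per DoFn instance, since Beam accesses each DoFn instance from a single thread at a time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical per-DoFn-instance lookup cache with a time-to-live.
// Not thread-safe: intended to be owned by a single DoFn instance.
final class TtlLookupCache<K, V> {

    private static final class Entry<T> {
        final T value;
        final long loadedAtMillis;

        Entry(T value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader;  // stand-in for the external database lookup
    private final long ttlMillis;         // upper bound on how stale a cached value can be
    private final LongSupplier clock;     // e.g. System::currentTimeMillis; injectable for tests

    TtlLookupCache(Function<K, V> loader, long ttlMillis, LongSupplier clock) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the cached value, reloading it if missing or older than ttlMillis. */
    V get(K key) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.get(key);
        if (entry == null || now - entry.loadedAtMillis >= ttlMillis) {
            entry = new Entry<>(loader.apply(key), now);
            entries.put(key, entry);
        }
        return entry.value;
    }
}
```

In a DoFn, the cache would typically live in a `transient` field initialised in a `@Setup` method, for example `new TtlLookupCache<>(this::fetchAccount, 60_000L, System::currentTimeMillis)`, where `fetchAccount` is a hypothetical method that queries the account store. Unlike the per-window side input, this gives no consistent view across a window. It also grows without bound as written, so a real cache would also cap its size.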
