Any fields within an operator that are not declared "transient" are considered part of the operator state and are checkpointed; on failure, they are restored from the checkpoint automatically by the platform.
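To illustrate what "not declared transient" means in practice, here is a minimal Java sketch. It is a stand-in, not Apex code. It uses plain `java.io` serialization in place of the platform's own checkpoint serializer, and the `BufferingOperator` class and the `checkpoint`/`restore` helpers are hypothetical names. It shows that a non-transient field, such as a buffer of pending messages, survives a save/restore round trip. A transient field comes back as `null` and has to be rebuilt. In an Apex operator that rebuilding would typically happen in `setup()`.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class TransientStateDemo {

    // Hypothetical operator-like class; not an Apex API type.
    static class BufferingOperator implements Serializable {
        private static final long serialVersionUID = 1L;

        // Non-transient: part of the saved state, restored after a failure.
        List<String> pending = new ArrayList<>();

        // Transient: NOT saved; it is null after restore and must be re-created.
        transient StringBuilder scratch = new StringBuilder();
    }

    // Stand-in for taking a checkpoint: serialize the object to bytes.
    static byte[] checkpoint(BufferingOperator op) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(op);
        }
        return bytes.toByteArray();
    }

    // Stand-in for recovery: deserialize the object from the saved bytes.
    static BufferingOperator restore(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (BufferingOperator) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        BufferingOperator op = new BufferingOperator();
        op.pending.add("msg2");
        op.pending.add("msg1");
        op.scratch.append("temp");

        BufferingOperator recovered = restore(checkpoint(op));
        System.out.println("pending after restore: " + recovered.pending);
        System.out.println("scratch after restore: " + recovered.scratch);
    }
}
```

Running `main` prints `pending after restore: [msg2, msg1]` and `scratch after restore: null`. A buffer used for reordering should therefore be a plain field, while connections or other resources that cannot be serialized belong in transient fields.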
Ram

On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli wrote:
>
> Really a great thought!!
>
> Wondering how the application will handle failures?
>
> Also, what does the phrase "hold them in the operator state" mean? Hold
> incoming messages in some data structure, a map, etc.?
>
> -Regards,
> Raja.
>
> From: Thomas Weise
> Date: Thursday, June 9, 2016 at 2:57 PM
> Subject: Re: kafka input is processing records in a jumbled order
>
> You can order the messages by event time as they arrive, hold them in the
> operator state and then only emit those in the endWindow callback that are
> older than <threshold> (note that you are not simply emitting all, only
> those that are old enough).
>
> Thomas
>
> On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli wrote:
>>
>> So, you want me to add all the incoming tuples into a Map, and complete
>> the processing in endWindow()?
>>
>> How would this solve my problem as described below with windowing?
>>
>> msg2 ts2
>> —————— window ends here
>> msg1 ts1
>> msg3 ts3
>> msg4 ts4
>> msg5 ts5
>> msg7 ts7
>> msg6 ts6
>> —————— window ends here
>>
>> Thanks a lot for your inputs. Your thoughts are valuable!!
>>
>> Regards,
>> Raja.
>>
>> From: Thomas Weise
>> Date: Thursday, June 9, 2016 at 1:49 PM
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> There is no need for an extra thread. In fact, tuples should be emitted
>> in the operator thread only.
>> This can be done in endWindow().
>>
>> --
>> sent from mobile
>> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" wrote:
>>
>>>
>>> How about something like this:
>>>
>>> Store the incoming tuples in the following format:
>>> TreeMap<TimeStamp, List<Tuples>>
>>>
>>> Create a Flusher thread, which periodically flushes the *firstKey*,
>>> after considering the lag.
>>>
>>>
>>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath wrote:
>>>
>>>> You'll need to have some limit on how much lag is possible for
>>>> out-of-order messages.
>>>> If that limit is, say, 30s, then you'll need to buffer tuples for double
>>>> the lag -- 60s.
>>>>
>>>> You can configure the Application Window size suitably to do this.
>>>>
>>>> Ram
>>>>
>>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli wrote:
>>>>
>>>>>
>>>>> No aggregation, but I need messages to be played in sequence!!
>>>>>
>>>>>
>>>>> Ex:
>>>>>
>>>>> Below is the order in which the msgs should actually come from my kafka topic:
>>>>>
>>>>> msg1 ts1
>>>>> msg2 ts2
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg6 ts6
>>>>> msg7 ts7
>>>>>
>>>>>
>>>>> But, due to some network issues, I am seeing the messages in the kafka
>>>>> topic like below:
>>>>>
>>>>> msg2 ts2 ==> msg2 should actually come after msg1, but
>>>>> unfortunately msg2 is reaching kafka before msg1, losing the sequence!!
>>>>> msg1 ts1 ==> delayed by a few millisecs to seconds!!
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg7 ts7 ==> msg7 came into the topic early, before msg6
>>>>> msg6 ts6 ==> delayed!!
>>>>>
>>>>>
>>>>> I am losing the order of messages, and the business logic gives correct
>>>>> results only when msgs are played in sequence!!
>>>>>
>>>>> Now if I define windowing/some buffering and then order on timestamp
>>>>> and play msgs…
>>>>>
>>>>> What if the window boundary falls like this:
>>>>>
>>>>> msg2 ts2
>>>>> —————— window ends here
>>>>> msg1 ts1
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg7 ts7
>>>>> msg6 ts6
>>>>> —————— window ends here
>>>>>
>>>>> Now, as you can see, even though I am buffering and then ordering
>>>>> the msgs based on a timestamp, I still face the problem of msg2 being
>>>>> processed before msg1!! Which I don't want.
>>>>>
>>>>> Did I really understand windowing correctly? Pls correct me if I am
>>>>> wrong!! Thanks for your thoughts!!
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Thomas Weise
>>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Apex can do stateful processing; you can define a window in which you
>>>>> can reorder the messages. It will have the same effect on latency as
>>>>> "micro-batching".
>>>>>
>>>>> Why is the ordering important? What operations do you perform on the
>>>>> data? Aggregation?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli wrote:
>>>>>
>>>>>>
>>>>>> My bad… we observe that our source data in the kafka topics is not
>>>>>> really in an ordered fashion; we are seeing messages with a few
>>>>>> millisecs of delay!!
>>>>>>
>>>>>> The source couldn't ensure the ordering guarantee due to the network!!
>>>>>>
>>>>>> Is there a right way, from the consumer standpoint, to ensure
>>>>>> ordering?? Will micro-batching work for me here? Or does Apex support
>>>>>> micro-batching and ordering the messages?
>>>>>>
>>>>>> Regards,
>>>>>> Raja
>>>>>>
>>>>>> From: Thomas Weise
>>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Raja,
>>>>>>
>>>>>> Please also confirm how you are using partitioning. If, for example,
>>>>>> in your DAG you shuffle the data received from Kafka in a way that is
>>>>>> different from the original partitioning, then it would be possible that
>>>>>> multiple downstream partitions process data that came from a single Kafka
>>>>>> partition concurrently and therefore in a different order.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli wrote:
>>>>>>
>>>>>>>
>>>>>>> Yes Devendra.
>>>>>>>
>>>>>>> p1.10 is read before p1.1!!
>>>>>>>
>>>>>>> Sure, I shall check that. Thanks a lot for your response.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Devendra Tagare
>>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Hi Raja,
>>>>>>>
>>>>>>> Just to be clear, are you saying that p1.10 is being read before
>>>>>>> p1.1?
>>>>>>>
>>>>>>> If that's the case, can you use the console consumer that comes packaged
>>>>>>> with kafka and verify the ordering based on timestamps?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dev
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>>>>
>>>>>>>> What you said is very clear and understandable.
>>>>>>>> But I am wondering: I am NOT getting that partition-level order!! My
>>>>>>>> operator is processing the records in a jumbled order rather than in
>>>>>>>> sequence!
>>>>>>>> And I am saying this because I am generating timestamps upon
>>>>>>>> tuple receipt and emitting that timestamp to my destination, which
>>>>>>>> clearly shows the records are reaching the operator in a shuffled
>>>>>>>> order.
>>>>>>>>
>>>>>>>> I get records at millisecond-level differences!! Will that be a
>>>>>>>> problem?
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>> From: Devendra Tagare
>>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Hi Raja,
>>>>>>>>
>>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of
>>>>>>>> the operator consumes from many partitions of a kafka topic.
>>>>>>>>
>>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>>> given partition will be ordered, but there are no ordering guarantees
>>>>>>>> across partitions, since kafka does not guarantee that.
>>>>>>>>
>>>>>>>> e.g.: If 3 partitions of a topic, p1, p2, p3, with 10 messages each
>>>>>>>> are connected to one physical partition of the KafkaInputOperator,
>>>>>>>> then the ordering of p1.1 to p1.10 is honored, i.e. message 10 of p1
>>>>>>>> will be consumed only after messages 1 through 9 are consumed, but the
>>>>>>>> operator could consume messages in an order like
>>>>>>>> p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, ..., which still follows the
>>>>>>>> per-partition guarantees.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dev
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for the response Thomas.
>>>>>>>>>
>>>>>>>>> My quick doubt is..
>>>>>>>>>
>>>>>>>>> I have around 30 partitions of the kafka topic, and all of them have
>>>>>>>>> messages ordered at the partition level.
>>>>>>>>>
>>>>>>>>> So, when I consume those messages using a single consumer [with the
>>>>>>>>> ONE_TO_MANY strategy set], does the ordering still not work?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My messages in the topic are guaranteed to be ordered at the
>>>>>>>>> partition level.
>>>>>>>>>
>>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>> From: Thomas Weise
>>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>
>>>>>>>>> Raja,
>>>>>>>>>
>>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>>
>>>>>>>>> All messages from a given Kafka partition are received by the same
>>>>>>>>> consumer and thus will be ordered. However, when messages come from
>>>>>>>>> multiple partitions there is no such guarantee.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> I have built a DAG that reads from kafka and, in the next
>>>>>>>>>> operators, does a lookup to an hbase table and updates the hbase
>>>>>>>>>> table based on some business logic.
>>>>>>>>>>
>>>>>>>>>> Sometimes my operator, which does the hbase lookup and update in the
>>>>>>>>>> same operator (custom written), processes the records it receives
>>>>>>>>>> from kafka in a jumbled order, which causes many records to be
>>>>>>>>>> ignored from processing!!
>>>>>>>>>>
>>>>>>>>>> I am not using any parallel partitions/instances, and with the
>>>>>>>>>> KafkaInputOperator I am using only the ONE_TO_MANY partition strategy.
>>>>>>>>>>
>>>>>>>>>> I am very new to Apex. I expected Apex to guarantee the
>>>>>>>>>> ordering.
>>>>>>>>>>
>>>>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raja.
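The reordering approach suggested in the replies can be sketched in plain Java. It combines Thomas's advice to order by event time, hold messages in operator state and emit only sufficiently old ones in endWindow, Sandesh's `TreeMap<TimeStamp, List<Tuples>>` layout, and Ram's bound on lag. This is a hedged sketch, not an Apex operator. `EventTimeReorderBuffer`, `process`, `endWindow` and `lateTuples` are hypothetical names. The threshold rule used here, the largest event time seen so far minus an assumed maximum lag, is only one possible reading of "older than <threshold>". In a real operator the buffer fields would be left non-transient so they are checkpointed. `process` would be called from the input port, and the returned list would be emitted from the operator's own `endWindow()` callback, so no extra flusher thread is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical event-time reordering buffer; not an Apex class. */
public class EventTimeReorderBuffer {
    private final long maxLagMillis;                                     // assumed bound on out-of-orderness
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // event time -> messages
    private long maxEventTimeSeen = Long.MIN_VALUE;
    private long lastEmittedTime = Long.MIN_VALUE;                       // event time of the newest emitted tuple
    private final List<String> late = new ArrayList<>();                 // arrivals that broke the lag bound

    public EventTimeReorderBuffer(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    /** Called once per incoming tuple; only buffers, never emits. */
    public void process(long eventTime, String msg) {
        if (eventTime < lastEmittedTime) {
            late.add(msg);  // a newer tuple was already emitted, so this one cannot be placed in order
            return;
        }
        pending.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(msg);
        maxEventTimeSeen = Math.max(maxEventTimeSeen, eventTime);
    }

    /** Called at the end of each window: returns, in event-time order, only tuples at or below the threshold. */
    public List<String> endWindow() {
        List<String> out = new ArrayList<>();
        if (pending.isEmpty()) {
            return out;
        }
        long threshold = maxEventTimeSeen - maxLagMillis;
        // Live view of all keys <= threshold, in ascending key order.
        NavigableMap<Long, List<String>> ready = pending.headMap(threshold, true);
        if (ready.isEmpty()) {
            return out;
        }
        for (List<String> msgs : ready.values()) {
            out.addAll(msgs);
        }
        lastEmittedTime = ready.lastKey();
        ready.clear();  // clearing the view removes the emitted entries from pending
        return out;
    }

    public List<String> lateTuples() {
        return late;
    }

    public static void main(String[] args) {
        // Raja's example: window boundary right after msg2, with an assumed 1.5 s lag bound.
        EventTimeReorderBuffer buf = new EventTimeReorderBuffer(1500);
        buf.process(2000, "msg2");
        System.out.println("window 1 emits " + buf.endWindow());

        long[] ts = {1000, 3000, 4000, 5000, 7000, 6000};
        String[] msgs = {"msg1", "msg3", "msg4", "msg5", "msg7", "msg6"};
        for (int i = 0; i < ts.length; i++) {
            buf.process(ts[i], msgs[i]);
        }
        System.out.println("window 2 emits " + buf.endWindow());
    }
}
```

With the window boundary falling right after msg2, the first window emits `[]` because msg2 is not yet older than the threshold. The second window emits `[msg1, msg2, msg3, msg4, msg5]`, and msg6 and msg7 stay buffered until newer events push the threshold past them. Tuples that arrive later than the assumed lag allows cannot be put back in order, so the sketch sets them aside in `lateTuples()` rather than emitting them out of sequence. Because the threshold only advances when newer events arrive, a stream that goes quiet would leave its tail buffered. A production version might also need a wall-clock fallback (an assumption).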

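Devendra's description of ONE_TO_MANY consumption can also be checked mechanically. The sketch below is illustrative only. The `Rec` type, the two checker methods and the event times are made up, and the interleaving is the example order p1.1, p2.1, p1.2, p1.3, p3.1, p2.2 from his reply. It shows that a consumed stream can satisfy the per-partition guarantee, with offsets increasing within each partition, while still not being ordered by event time across partitions. A stream in which p1.10 precedes p1.1 would fail the first check. That would point to a cause other than cross-partition interleaving, such as the downstream reshuffling Thomas mentioned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionOrderCheck {

    // Hypothetical consumed record: source partition, offset within it, and event time.
    static final class Rec {
        final String partition;
        final long offset;
        final long eventTime;

        Rec(String partition, long offset, long eventTime) {
            this.partition = partition;
            this.offset = offset;
            this.eventTime = eventTime;
        }
    }

    /** True if, for every partition, offsets appear in strictly increasing order. */
    static boolean orderedWithinEachPartition(List<Rec> consumed) {
        Map<String, Long> lastOffset = new HashMap<>();
        for (Rec r : consumed) {
            Long prev = lastOffset.get(r.partition);
            if (prev != null && r.offset <= prev) {
                return false;
            }
            lastOffset.put(r.partition, r.offset);
        }
        return true;
    }

    /** True if event times never decrease across the whole consumed stream. */
    static boolean orderedByEventTime(List<Rec> consumed) {
        for (int i = 1; i < consumed.size(); i++) {
            if (consumed.get(i).eventTime < consumed.get(i - 1).eventTime) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Interleaving p1.1, p2.1, p1.2, p1.3, p3.1, p2.2 with made-up event times.
        List<Rec> consumed = new ArrayList<>();
        consumed.add(new Rec("p1", 1, 100));
        consumed.add(new Rec("p2", 1, 250));
        consumed.add(new Rec("p1", 2, 200));
        consumed.add(new Rec("p1", 3, 300));
        consumed.add(new Rec("p3", 1, 150));
        consumed.add(new Rec("p2", 2, 400));
        System.out.println("ordered within each partition: " + orderedWithinEachPartition(consumed));
        System.out.println("ordered by event time overall: " + orderedByEventTime(consumed));
    }
}
```

For this interleaving the program prints `true` for the per-partition check and `false` for the global event-time check. This is why a single ONE_TO_MANY consumer can still see shuffled timestamps without any Kafka guarantee being violated.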