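You can order the messages by event time as they arrive, hold them in the operator state, and then in the endWindow callback emit only those that are older than <threshold>. Note that you do not emit everything that is buffered, only the messages that are old enough; the rest stay in state for a later window, so a late message such as msg1 can still be placed ahead of msg2.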
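
A minimal sketch of that buffering idea in plain Java, deliberately not tied to the Apex operator API. The class name ReorderBuffer, its methods, and the choice to measure "old enough" against the newest event time seen so far (rather than wall-clock time) are assumptions made for illustration. In an operator, add() would be called for each incoming tuple and drainReady() from endWindow().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Apex): keeps messages sorted by event time
// and releases only those that are at least maxLagMillis behind the newest one.
class ReorderBuffer {
    // event time -> messages carrying that timestamp; TreeMap keeps keys sorted
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final long maxLagMillis;
    private long maxSeen = Long.MIN_VALUE;

    ReorderBuffer(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    // Buffer one incoming message under its event time.
    void add(long eventTime, String msg) {
        pending.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(msg);
        maxSeen = Math.max(maxSeen, eventTime);
    }

    // Return, in event-time order, the messages whose timestamp is at or below
    // (newest seen - maxLagMillis). Everything newer stays buffered.
    List<String> drainReady() {
        List<String> ready = new ArrayList<>();
        if (pending.isEmpty()) {
            return ready;
        }
        long cutoff = maxSeen - maxLagMillis;
        Map<Long, List<String>> old = pending.headMap(cutoff, true);
        for (List<String> msgs : old.values()) {
            ready.addAll(msgs);
        }
        old.clear(); // clearing the view removes the entries from pending
        return ready;
    }
}
```

With a lag of 2 and the sequence from your example, the first window (only msg2) emits nothing, and the second window emits msg1 through msg5 in order while msg6 and msg7 stay buffered. A message that arrives later than the configured lag would still come out late, so the lag has to cover the worst delay you expect.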
Thomas

On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <[email protected]> wrote:

>
> So, you want me to add all the incoming tuples into a Map, and complete
> the processing in endwindow() ??
>
> How would this solve my problem as described below with windowing.
>
> msg2 ts2
> ------ window ends here
> msg1 ts1
> msg3 ts3
> msg4 ts4
> msg5 ts5
> msg7 ts7
> msg6 ts6
> ------ window ends here
>
> Thanks a lot for your inputs. Your thoughts are valuable!!
>
> Regards,
> Raja.
>
> From: Thomas Weise <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, June 9, 2016 at 1:49 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> There is no need for an extra thread. In fact, tuples should be emitted in
> the operator thread only. This can be done in endWindow()
>
> --
> sent from mobile
> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>
>> How about something like this,
>>
>> Store the incoming tuples in the following format:
>> TreeMap<TimeStamp, List<Tuples>>
>>
>> Create a Flusher thread, which periodically flushes the *firstKey*, after
>> considering the lag.
>>
>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]>
>> wrote:
>>
>>> You'll need to have some limit on how much lag is possible for
>>> out-of-order messages.
>>> If that limit is say 30s, then you'll need to buffer tuples for double
>>> the lag -- 60s.
>>>
>>> You can configure the Application Window size suitably to do this.
>>>
>>> Ram
>>>
>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> No aggregation, but I need messages to be played in sequence!!
>>>>
>>>> Ex:
>>>>
>>>> Below is the way actually msgs should come from my kafka topic
>>>>
>>>> msg1 ts1
>>>> msg2 ts2
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg6 ts6
>>>> msg7 ts7
>>>>
>>>> But, due to some network issues I am seeing the messages in kafka topic
>>>> something like below:
>>>>
>>>> msg2 ts2 ==> msg2 which actually should come after msg1, but
>>>> unfortunately msg2 is coming to kafka before msg1, losing the sequence!!
>>>> msg1 ts1 ==> delayed by few milli secs to seconds to reach on time!!
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7 ==> msg7 had come early into topics before msg6
>>>> msg6 ts6 ==> delayed !!
>>>>
>>>> I am losing the order of messages and business logic gives correct
>>>> results only when msgs are played in sequence!!
>>>>
>>>> Now if I define windowing/some buffering and then order on timestamp
>>>> and play msgs…
>>>>
>>>> What if window boundary takes
>>>>
>>>> msg2 ts2
>>>> ------ window ends here
>>>> msg1 ts1
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7
>>>> msg6 ts6
>>>> ------ window ends here
>>>>
>>>> Now, if you see, even though I am trying to do buffering and then
>>>> ordering the msgs based on some timestamp, I still face the problem of msg2
>>>> already processed before msg1 !! Which I don’t want.
>>>>
>>>> Did I really understand windowing correctly…. Pls correct me if I am
>>>> wrong!! Thanks for your thoughts!!
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> Apex can do stateful processing, you can define a window in which you
>>>> can reorder the messages. It will have the same effect on latency as
>>>> "micro-batching".
>>>>
>>>> Why is the ordering important?
>>>> What operations do you perform on the
>>>> data? Aggregation?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> My bad… we observed our source data in kafka topics is not really in an
>>>>> ordered fashion, where we are seeing the messages with few milli secs
>>>>> delay!!
>>>>>
>>>>> Source couldn’t ensure the ordering guarantee due to the network!!
>>>>>
>>>>> Is there a right way for me, from consumer standpoint, to ensure
>>>>> ordering ?? Will micro batching work for me here ? Or does apex support
>>>>> micro batching and ordering the messages ?
>>>>>
>>>>> Regards,
>>>>> Raja
>>>>>
>>>>> From: Thomas Weise <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Raja,
>>>>>
>>>>> Please also confirm how you are using partitioning. If, for example,
>>>>> in your DAG you shuffle the data received from Kafka in a way that is
>>>>> different from the original partitioning, then it would be possible that
>>>>> multiple downstream partitions process data that came from a single Kafka
>>>>> partition concurrently and therefore in a different order.
>>>>>
>>>>> Thomas
>>>>>
>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Yes Devendra.
>>>>>>
>>>>>> p1.10 is read before p1.1 !!
>>>>>>
>>>>>> Sure I shall check that. Thanks a lot for your response.
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Devendra Tagare <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Hi Raja,
>>>>>>
>>>>>> Just to be clear are you suggesting that p1.10 is being read before
>>>>>> p1.1 ?
>>>>>>
>>>>>> If that's the case can you use a console consumer that comes packed
>>>>>> with kafka and verify the ordering based on timestamps ?
>>>>>>
>>>>>> Thanks,
>>>>>> Dev
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>>>
>>>>>>> What you said is very clear and understandable. But, wondering, I am
>>>>>>> NOT getting that partition level order!! My operator is processing the
>>>>>>> records in jumbled order rather than in sequence!
>>>>>>> And, I am saying this because, I am generating timestamps upon tuple
>>>>>>> receipt and emitting that timestamp to my destination, which is clearly
>>>>>>> showing the records are being received by the operator in a shuffled order.
>>>>>>>
>>>>>>> I get records at milli second level differences!! Will that be a
>>>>>>> problem ?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Hi Raja,
>>>>>>>
>>>>>>> When you apply ONE_TO_MANY partitioning scheme, one instance of the
>>>>>>> operator consumes from many partitions of a kafka topic.
>>>>>>>
>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>> given partition would be ordered but there are no ordering guarantees
>>>>>>> across partitions since kafka does not guarantee that.
>>>>>>>
>>>>>>> eg : If 3 partitions of a topic p1,p2,p3 having 10 messages each are
>>>>>>> connected to one physical partition of the KafkaInputOperator, then the
>>>>>>> ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1
>>>>>>> will be consumed only after messages 1 through 9 are consumed, but the
>>>>>>> operator could consume messages in an order like
>>>>>>> p1.1,p2.1,p1.2,p1.3,p3.1,p2.2.....
>>>>>>> which still follows the guarantees per partition.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dev
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks for the response Thomas.
>>>>>>>>
>>>>>>>> My quick doubt is..
>>>>>>>>
>>>>>>>> I have around 30 partitions of kafka topic, and all of them have
>>>>>>>> messages ordered at partition level.
>>>>>>>>
>>>>>>>> So, when I consume those messages using single consumer [with
>>>>>>>> ONE_TO_MANY strategy set], still the ordering doesn’t work ?
>>>>>>>>
>>>>>>>> My messages in topic are guaranteed to be ordered at partition
>>>>>>>> level.
>>>>>>>>
>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Raja,
>>>>>>>>
>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>
>>>>>>>> All messages from a given Kafka partition are received by the same
>>>>>>>> consumer and thus will be ordered.
>>>>>>>> However, when messages come from
>>>>>>>> multiple partitions there is no such guarantee.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> I have built a DAG, that reads from kafka and in the next
>>>>>>>>> operators, does lookup to a hbase table and update hbase table based
>>>>>>>>> on some business logic.
>>>>>>>>>
>>>>>>>>> Some times my operator which does hbase lookup and update in the
>>>>>>>>> same operator(Custom written), is processing the records it receives
>>>>>>>>> from kafka in a jumbled order, which is causing, many records being
>>>>>>>>> ignored from processing!!
>>>>>>>>>
>>>>>>>>> I am not using any parallel partitions/instance, and with
>>>>>>>>> KafkaInputOperator I am using only partition strategy ONE_TO_MANY.
>>>>>>>>>
>>>>>>>>> I am very new to Apex. I expected, Apex will guarantee the
>>>>>>>>> ordering.
>>>>>>>>>
>>>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>>>
>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
