There is no need for an extra thread. In fact, tuples should be emitted only from the operator thread. This can be done in endWindow().
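Here is a minimal sketch of the approach discussed in this thread. It is plain Java with no Apex dependency. Tuples are buffered in a `TreeMap` keyed by timestamp, as Sandesh suggests. Each flush would run in `endWindow()` on the operator thread, not in a separate flusher thread. A flush releases only tuples older than a watermark: the newest timestamp seen minus an assumed maximum lag, which is Ram's bound. The class `ReorderBuffer`, its methods `add`, `flush` and `drain`, and the parameter `maxLagMillis` are hypothetical names, not part of Apex or Malhar.

The sketch rests on three assumptions. Each tuple carries its own event timestamp. No tuple arrives after another tuple whose timestamp exceeds its own by more than `maxLagMillis`. In a real operator, the buffer would also have to be part of the checkpointed operator state. A tuple that violates the lag bound would still be emitted, just out of order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical reorder buffer (not an Apex or Malhar class). Tuples are keyed by
 * event timestamp and released in timestamp order once they fall behind the
 * watermark: newest timestamp seen so far minus the assumed maximum lag.
 */
public class ReorderBuffer<T> {
    private final long maxLagMillis;
    // Event timestamp -> tuples carrying that timestamp, kept in arrival order.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    private long maxSeen = Long.MIN_VALUE;

    public ReorderBuffer(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    /** Buffer one tuple; in an operator this would run in the input port's process(). */
    public void add(long timestamp, T tuple) {
        pending.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(tuple);
        maxSeen = Math.max(maxSeen, timestamp);
    }

    /**
     * Release, in timestamp order, every tuple strictly older than the watermark.
     * In an operator this would run in endWindow(), on the operator thread.
     * Newer tuples stay buffered because an earlier tuple may still arrive.
     */
    public List<T> flush() {
        List<T> ready = new ArrayList<>();
        if (pending.isEmpty()) {
            return ready;
        }
        long watermark = maxSeen - maxLagMillis;
        NavigableMap<Long, List<T>> released = pending.headMap(watermark, false);
        for (List<T> tuples : released.values()) {
            ready.addAll(tuples);
        }
        released.clear(); // clearing the view removes these entries from pending
        return ready;
    }

    /** Release everything still buffered, in timestamp order (e.g. on shutdown). */
    public List<T> drain() {
        List<T> rest = new ArrayList<>();
        for (List<T> tuples : pending.values()) {
            rest.addAll(tuples);
        }
        pending.clear();
        return rest;
    }

    public static void main(String[] args) {
        // Raja's example with ts1..ts7 = 1..7 ms and an assumed lag bound of 2 ms.
        ReorderBuffer<String> buffer = new ReorderBuffer<>(2);

        // Window 1: only msg2 arrives before the boundary; it is held back.
        buffer.add(2, "msg2");
        System.out.println(buffer.flush()); // []

        // Window 2: msg1 arrives late, and msg7 arrives before msg6.
        buffer.add(1, "msg1");
        buffer.add(3, "msg3");
        buffer.add(4, "msg4");
        buffer.add(5, "msg5");
        buffer.add(7, "msg7");
        buffer.add(6, "msg6");
        System.out.println(buffer.flush()); // [msg1, msg2, msg3, msg4]

        System.out.println(buffer.drain()); // [msg5, msg6, msg7]
    }
}
```

The sketch does not emit the whole buffer at every window end. It releases only what lies behind the watermark, and that is what stops msg2 from escaping before msg1 when a window boundary falls between them. The trade-off is extra latency of roughly the lag bound, which matches Thomas's remark that reordering has the same effect on latency as micro-batching.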
On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>
> How about something like this,
>
> Store the incoming tuples in the following format:
> TreeMap<TimeStamp, List<Tuples>>
>
> Create a Flusher thread, which periodically flushes the *firstKey*, after considering the lag.
>
>
> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]> wrote:
>
>> You'll need to have some limit on how much lag is possible for out-of-order messages.
>> If that limit is, say, 30s, then you'll need to buffer tuples for double the lag, i.e. 60s.
>>
>> You can configure the Application Window size suitably to do this.
>>
>> Ram
>>
>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <[email protected]> wrote:
>>
>>>
>>> No aggregation, but I need the messages to be played in sequence!!
>>>
>>>
>>> Ex:
>>>
>>> Below is the order in which the msgs should actually come from my Kafka topic:
>>>
>>> msg1 ts1
>>> msg2 ts2
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg6 ts6
>>> msg7 ts7
>>>
>>>
>>> But, due to some network issues, I am seeing the messages in the Kafka topic in an order like the one below:
>>>
>>> msg2 ts2 ==> msg2 should come after msg1, but unfortunately msg2 reaches Kafka before msg1, losing the sequence!!
>>> msg1 ts1 ==> arrives a few milliseconds to seconds late!!
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg7 ts7 ==> msg7 came into the topic before msg6
>>> msg6 ts6 ==> delayed!!
>>>
>>>
>>> I am losing the order of the messages, and my business logic gives correct results only when the msgs are played in sequence!!
>>>
>>> Now, if I define windowing/some buffering, then order on timestamp and play the msgs…
>>>
>>> What if the window boundary falls like this:
>>>
>>> msg2 ts2
>>> ------ window ends here
>>> msg1 ts1
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg7 ts7
>>> msg6 ts6
>>> ------ window ends here
>>>
>>> Now, as you can see, even though I am buffering and then ordering the msgs by timestamp, I still face the problem of msg2 being processed before msg1!! Which I don’t want.
>>>
>>> Did I really understand windowing correctly? Pls correct me if I am wrong!! Thanks for your thoughts!!
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> Apex can do stateful processing: you can define a window in which you can reorder the messages. It will have the same effect on latency as "micro-batching".
>>>
>>> Why is the ordering important? What operations do you perform on the data? Aggregation?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <[email protected]> wrote:
>>>
>>>>
>>>> My bad… we observe that our source data in the Kafka topics is not really in an ordered fashion; we are seeing messages delayed by a few milliseconds!!
>>>>
>>>> The source couldn’t ensure the ordering guarantee due to the network!!
>>>>
>>>> Is there a right way for me, from a consumer standpoint, to ensure ordering?? Will micro-batching work for me here? Or does Apex support micro-batching and ordering the messages?
>>>>
>>>> Regards,
>>>> Raja
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> Raja,
>>>>
>>>> Please also confirm how you are using partitioning. If, for example, in your DAG you shuffle the data received from Kafka in a way that is different from the original partitioning, then it would be possible that multiple downstream partitions process data that came from a single Kafka partition concurrently and therefore in a different order.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>
>>>>>
>>>>> Yes Devendra.
>>>>>
>>>>> p1.10 is read before p1.1!!
>>>>>
>>>>> Sure, I shall check that. Thanks a lot for your response.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Devendra Tagare <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Hi Raja,
>>>>>
>>>>> Just to be clear, are you suggesting that p1.10 is being read before p1.1?
>>>>>
>>>>> If that's the case, can you use the console consumer that comes packaged with Kafka and verify the ordering based on timestamps?
>>>>>
>>>>> Thanks,
>>>>> Dev
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Thanks a lot, Devendra Tagare, for the response.
>>>>>>
>>>>>> What you said is very clear and understandable. But I am wondering: I am NOT getting that partition-level order!! My operator is processing the records in a jumbled order rather than in sequence!
>>>>>> And I am saying this because I am generating timestamps upon tuple receipt and emitting those timestamps to my destination, which clearly shows the records are reaching the operator in a shuffled order.
>>>>>>
>>>>>> I get records with millisecond-level differences!! Will that be a problem?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Devendra Tagare <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Hi Raja,
>>>>>>
>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of the operator consumes from many partitions of a Kafka topic.
>>>>>>
>>>>>> When you look at the consumed data, all the events coming from a given partition will be ordered, but there are no ordering guarantees across partitions, since Kafka does not guarantee that.
>>>>>>
>>>>>> E.g.: if 3 partitions of a topic, p1, p2 and p3, having 10 messages each, are connected to one physical partition of the KafkaInputOperator, then the ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1 will be consumed only after messages 1 through 9 are consumed. But the operator could consume messages in an order like
>>>>>> p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, ...
>>>>>> which still follows the guarantees per partition.
>>>>>>
>>>>>> Thanks,
>>>>>> Dev
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Thanks for the response, Thomas.
>>>>>>>
>>>>>>> My quick doubt is...
>>>>>>>
>>>>>>> I have around 30 partitions in the Kafka topic, and all of them have messages ordered at the partition level.
>>>>>>>
>>>>>>> So, when I consume those messages using a single consumer [with the ONE_TO_MANY strategy set], does the ordering still not work?
>>>>>>>
>>>>>>>
>>>>>>> My messages in the topic are guaranteed to be ordered at the partition level.
>>>>>>>
>>>>>>> Thanks a lot in advance for your response.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Raja,
>>>>>>>
>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>
>>>>>>> All messages from a given Kafka partition are received by the same consumer and thus will be ordered. However, when messages come from multiple partitions, there is no such guarantee.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have built a DAG that reads from Kafka and, in the next operators, looks up an HBase table and updates it based on some business logic.
>>>>>>>>
>>>>>>>> Sometimes my custom-written operator, which does both the HBase lookup and the update, processes the records it receives from Kafka in a jumbled order, which causes many records to be ignored during processing!!
>>>>>>>>
>>>>>>>> I am not using any parallel partitions/instances, and with KafkaInputOperator I am using only the ONE_TO_MANY partition strategy.
>>>>>>>>
>>>>>>>> I am very new to Apex. I expected Apex to guarantee the ordering.
>>>>>>>>
>>>>>>>> Can someone pls share their knowledge on the issue?
>>>>>>>>
>>>>>>>> Thanks a lot in advance…
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
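Devendra's point about ONE_TO_MANY can be checked mechanically: each Kafka partition stays ordered, but the partitions are interleaved. The sketch below is plain Java, and the names `PartitionOrderCheck`, `Msg` and `isOrderedPerPartition` are hypothetical. It reads the thread's `p1.1` notation as partition plus position within that partition. It verifies ordering only within each partition, because that is all Kafka guarantees. A `true` result says nothing about ordering across partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionOrderCheck {
    /** A consumed message: its source Kafka partition and its position within that partition. */
    static final class Msg {
        final String partition;
        final long offset;

        Msg(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** True if, within every partition, positions appear in strictly increasing order. */
    static boolean isOrderedPerPartition(List<Msg> consumed) {
        Map<String, Long> lastSeen = new HashMap<>();
        for (Msg m : consumed) {
            Long prev = lastSeen.get(m.partition);
            if (prev != null && m.offset <= prev) {
                return false; // this partition went backwards
            }
            lastSeen.put(m.partition, m.offset);
        }
        return true;
    }

    public static void main(String[] args) {
        // The interleaving from the thread: p1.1, p2.1, p1.2, p1.3, p3.1, p2.2
        List<Msg> interleaved = Arrays.asList(
                new Msg("p1", 1), new Msg("p2", 1), new Msg("p1", 2),
                new Msg("p1", 3), new Msg("p3", 1), new Msg("p2", 2));
        System.out.println(isOrderedPerPartition(interleaved)); // true

        // Reading p1.10 before p1.1, as reported in the thread, breaks the per-partition guarantee.
        List<Msg> reversed = Arrays.asList(new Msg("p1", 10), new Msg("p1", 1));
        System.out.println(isOrderedPerPartition(reversed)); // false
    }
}
```

Suppose the positions are Kafka offsets recorded inside the operator. Then a `false` would suggest reordering after consumption, such as the repartitioning Thomas describes. In-order offsets with out-of-order event timestamps would instead point at the producer side, which is what Raja eventually found.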
