How about something like this: store the incoming tuples, keyed by timestamp, in the following format: TreeMap<TimeStamp, List<Tuples>>
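To make the idea concrete, here is a rough sketch in plain Java of such a buffer, together with a flush of the oldest keys once they are older than the allowed lag. It is only an illustration: ReorderBuffer, add, flush and maxLagMillis are names I made up, not Apex or Malhar APIs, and it assumes every tuple carries its event timestamp in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper, not part of Apex: buffers tuples sorted by timestamp
// and releases them only after the allowed lag has passed.
class ReorderBuffer<T> {
    // TreeMap keeps keys sorted, so firstKey() is always the oldest timestamp.
    private final TreeMap<Long, List<T>> buffer = new TreeMap<>();
    private final long maxLagMillis; // assumed upper bound on out-of-order delay

    ReorderBuffer(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    // Store a tuple under its event timestamp (assumed to be in milliseconds).
    synchronized void add(long timestamp, T tuple) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(tuple);
    }

    // Remove and return, in timestamp order, every tuple whose timestamp is
    // at least maxLagMillis older than 'now'. Newer tuples stay buffered.
    synchronized List<T> flush(long now) {
        List<T> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() <= now - maxLagMillis) {
            ready.addAll(buffer.pollFirstEntry().getValue());
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buf = new ReorderBuffer<>(30);
        buf.add(2, "msg2"); // arrives first, but is the later message
        buf.add(1, "msg1");
        buf.add(3, "msg3");
        System.out.println(buf.flush(32)); // prints [msg1, msg2]
        System.out.println(buf.flush(40)); // prints [msg3]
    }
}
```

Both methods are synchronized so that flush can be called from a separate timer thread, for example one scheduled with ScheduledExecutorService.scheduleAtFixedRate. A tuple that arrives later than the assumed lag would still come out of order, so the lag limit has to be chosen conservatively.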
Create a Flusher thread which periodically flushes the *firstKey*, after allowing for the lag.

On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]> wrote:

> You'll need to have some limit on how much lag is possible for
> out-of-order messages.
> If that limit is say 30s, then you'll need to buffer tuples for double the
> lag -- 60s.
>
> You can configure the Application Window size suitably to do this.
>
> Ram
>
> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> No aggregation, but I need messages to be played in sequence!!
>>
>> Ex:
>>
>> Below is the way msgs should actually come from my kafka topic
>>
>> msg1 ts1
>> msg2 ts2
>> msg3 ts3
>> msg4 ts4
>> msg5 ts5
>> msg6 ts6
>> msg7 ts7
>>
>> But, due to some network issues I am seeing the messages in kafka topic
>> something like below:
>>
>> msg2 ts2 ==> msg2 which actually should come after msg1, but
>> unfortunately msg2 is coming to kafka before msg1, losing the sequence!!
>> msg1 ts1 ==> delayed by few milli secs to seconds to reach on time!!
>> msg3 ts3
>> msg4 ts4
>> msg5 ts5
>> msg7 ts7 ==> msg7 had come early into topics before msg6
>> msg6 ts6 ==> delayed !!
>>
>> I am losing the order of messages and business logic gives correct
>> results only when msgs are played in sequence!!
>>
>> Now if I define windowing/some buffering and then order on timestamp and
>> play msgs…
>>
>> What if window boundary takes
>>
>> msg2 ts2
>> ------ window ends here
>> msg1 ts1
>> msg3 ts3
>> msg4 ts4
>> msg5 ts5
>> msg7 ts7
>> msg6 ts6
>> ------ window ends here
>>
>> Now, if you see, even though I am trying to do buffering and then
>> ordering the msgs based on some timestamp, I still face the problem of msg2
>> already processed before msg1 !! Which I don’t want.
>>
>> Did I really understand windowing correctly…. Pls correct me if I am
>> wrong!! Thanks for your thoughts!!
>>
>> Regards,
>> Raja.
>>
>> From: Thomas Weise <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, June 9, 2016 at 10:51 AM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> Apex can do stateful processing, you can define a window in which you can
>> reorder the messages. It will have the same effect on latency as
>> "micro-batching".
>>
>> Why is the ordering important? What operations do you perform on the
>> data? Aggregation?
>>
>> Thanks,
>> Thomas
>>
>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> My bad… we observed our source data in kafka topics is not really in an
>>> ordered fashion, where we are seeing the messages with few milli secs
>>> delay.!!
>>>
>>> Source couldn’t ensure the ordering guarantee due to the network!!
>>>
>>> Is there a right way for me from consumer standpoint, I can ensure
>>> ordering ?? Will micro batching work for me here ? Or Does apex support
>>> micro batching and order the messages ?
>>>
>>> Regards,
>>> Raja
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> Raja,
>>>
>>> Please also confirm how you are using partitioning. If, for example, in
>>> your DAG you shuffle the data received from Kafka in a way that is
>>> different from the original partitioning, then it would be possible that
>>> multiple downstream partitions process data that came from a single Kafka
>>> partition concurrently and therefore in a different order.
>>>
>>> Thomas
>>>
>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> Yes Devendra.
>>>>
>>>> p1.10 is read before p1.1 !!
>>>>
>>>> Sure I shall check that. Thanks a lot for your response.
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Devendra Tagare <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> Hi Raja,
>>>>
>>>> Just to be clear are you suggesting that p1.10 is being read before
>>>> p1.1 ?
>>>>
>>>> If that's the case can you use a console consumer that comes packed with
>>>> kafka and verify the ordering based on timestamps ?
>>>>
>>>> Thanks,
>>>> Dev
>>>>
>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>
>>>>> What you said is very clear and understandable. But, wondering, I am
>>>>> NOT getting that partition level order!! My operator is processing the
>>>>> records in jumbled order rather than in sequence!
>>>>> And, I am saying this because, I am generating timestamps upon tuple
>>>>> receipt and emitting that timestamp to my destination, which is clearly
>>>>> showing the records are arriving at the operator in a shuffled order.
>>>>>
>>>>> I get records at milli second level differences!! Will that be a
>>>>> problem ?
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Devendra Tagare <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Hi Raja,
>>>>>
>>>>> When you apply ONE_TO_MANY partitioning scheme, one instance of the
>>>>> operator consumes from many partitions of a kafka topic.
>>>>>
>>>>> When you look at the consumed data, all the events coming from a given
>>>>> partition would be ordered but there are no ordering guarantees across
>>>>> partitions since kafka does not guarantee that.
>>>>>
>>>>> eg : If 3 partitions of a topic p1,p2,p3 having 10 messages each are
>>>>> connected to one physical partition of the KafkaInputOperator, then the
>>>>> ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1 will
>>>>> be consumed only after messages 1 through 9 are consumed, but the operator
>>>>> could consume messages in an order like p1.1,p2.1,p1.2,p1.3,p3.1,p2.2.....
>>>>> which still follows the guarantees per partition.
>>>>>
>>>>> Thanks,
>>>>> Dev
>>>>>
>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Thanks for the response Thomas.
>>>>>>
>>>>>> My quick doubt is..
>>>>>>
>>>>>> I have around 30 partitions of kafka topic, And all of them have
>>>>>> messages ordered at partition level.
>>>>>>
>>>>>> So, when I consume those messages using single consumer[with
>>>>>> ONE_TO_MANY strategy set], still the ordering doesn’t work ?
>>>>>>
>>>>>> My messages in topic are guaranteed to be ordered at partition level.
>>>>>>
>>>>>> Thanks a lot in advance for your response.
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Thomas Weise <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Raja,
>>>>>>
>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>
>>>>>> All messages from a given Kafka partition are received by the same
>>>>>> consumer and thus will be ordered. However, when messages come from
>>>>>> multiple partitions there is no such guarantee.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> I have built a DAG, that reads from kafka and in the next operators,
>>>>>>> does lookup to a hbase table and update hbase table based on some
>>>>>>> business logic.
>>>>>>>
>>>>>>> Some times my operator which does hbase lookup and update in the
>>>>>>> same operator(Custom written), is processing the records it receives
>>>>>>> from kafka in a jumbled order, which is causing, many records being
>>>>>>> ignored from processing!!
>>>>>>>
>>>>>>> I am not using any parallel partitions/instance, and with
>>>>>>> KafkaInputOperator I am using only partition strategy ONE_TO_MANY.
>>>>>>>
>>>>>>> I am very new to Apex. I expected, Apex will guarantee the ordering.
>>>>>>>
>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>
>>>>>>> Thanks a lot in advance…
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
