Raja,

Great discussion, just a couple more thoughts.

1. Add this operator to your DAG just before the operator which expects the tuples to be in order. If you order the tuples right after the Kafka input and then have multiple partitions of a downstream operator, the order will be lost again. So it is better to order only at the point in the DAG just before where it is required.

2. It seems you are building this operator for your own use case, but it will be useful for a lot of other use cases too. So if possible, can you contribute the operator back to Malhar? That way we can make it available for others to use and collectively enhance it as needed.

Regards,
Ashwin.

On Thu, Jun 9, 2016 at 2:43 PM, Raja.Aravapalli <[email protected]> wrote:

> Great.
>
> I will explore. Thanks for your inputs.
>
> Regards,
> Raja.
>
> From: Munagala Ramanath <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, June 9, 2016 at 3:38 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> Any fields within an operator that are not declared "transient" are considered part of the operator state and are checkpointed; on failure, they are restored from the checkpoint automatically by the platform.
>
> Ram
>
> On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli <[email protected]> wrote:
>
>> Really a great thought!!
>>
>> Wondering how the application will handle failures?
>>
>> Also, what does this phrase mean: “hold them in the operator state”? Hold incoming messages in some data structure, map etc.?
>>
>> -Regards,
>> Raja.
>>
>> From: Thomas Weise <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, June 9, 2016 at 2:57 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> You can order the messages by event time as they arrive, hold them in the operator state and then only emit those in the endWindow callback that are older than <threshold> (note that you are not simply emitting all, only those that are old enough).
>>
>> Thomas
>>
>> On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <[email protected]> wrote:
>>
>>> So, you want me to add all the incoming tuples into a Map, and complete the processing in endWindow()??
>>>
>>> How would this solve my problem as described below with windowing?
>>>
>>> msg2 ts2
>>> ------ window ends here
>>> msg1 ts1
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg7 ts7
>>> msg6 ts6
>>> ------ window ends here
>>>
>>> Thanks a lot for your inputs. Your thoughts are valuable!!
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, June 9, 2016 at 1:49 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> There is no need for an extra thread. In fact, tuples should be emitted in the operator thread only. This can be done in endWindow().
>>>
>>> --
>>> sent from mobile
>>> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>>>
>>>> How about something like this:
>>>>
>>>> Store the incoming tuples in the following format:
>>>> TreeMap<TimeStamp, List<Tuples>>
>>>>
>>>> Create a Flusher thread, which periodically flushes the *firstKey*, after considering the lag.
>>>>
>>>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]> wrote:
>>>>
>>>>> You'll need to have some limit on how much lag is possible for out-of-order messages.
>>>>> If that limit is, say, 30s, then you'll need to buffer tuples for double the lag -- 60s.
>>>>>
>>>>> You can configure the Application Window size suitably to do this.
>>>>>
>>>>> Ram
>>>>>
>>>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <[email protected]> wrote:
>>>>>
>>>>>> No aggregation, but I need messages to be played in sequence!!
>>>>>>
>>>>>> Ex:
>>>>>>
>>>>>> Below is the way the msgs should actually come from my kafka topic:
>>>>>>
>>>>>> msg1 ts1
>>>>>> msg2 ts2
>>>>>> msg3 ts3
>>>>>> msg4 ts4
>>>>>> msg5 ts5
>>>>>> msg6 ts6
>>>>>> msg7 ts7
>>>>>>
>>>>>> But, due to some network issues, I am seeing the messages in the kafka topic something like below:
>>>>>>
>>>>>> msg2 ts2 ==> msg2 which actually should come after msg1, but unfortunately msg2 is coming to kafka before msg1, losing the sequence!!
>>>>>> msg1 ts1 ==> delayed by a few milliseconds to seconds!!
>>>>>> msg3 ts3
>>>>>> msg4 ts4
>>>>>> msg5 ts5
>>>>>> msg7 ts7 ==> msg7 had come early into the topic before msg6
>>>>>> msg6 ts6 ==> delayed!!
>>>>>>
>>>>>> I am losing the order of messages, and the business logic gives correct results only when msgs are played in sequence!!
>>>>>>
>>>>>> Now if I define windowing/some buffering and then order on timestamp and play msgs…
>>>>>>
>>>>>> What if the window boundary falls like this:
>>>>>>
>>>>>> msg2 ts2
>>>>>> ------ window ends here
>>>>>> msg1 ts1
>>>>>> msg3 ts3
>>>>>> msg4 ts4
>>>>>> msg5 ts5
>>>>>> msg7 ts7
>>>>>> msg6 ts6
>>>>>> ------ window ends here
>>>>>>
>>>>>> Now, if you see, even though I am trying to do buffering and then ordering the msgs based on some timestamp, I still face the problem of msg2 already being processed before msg1!! Which I don’t want.
>>>>>>
>>>>>> Did I really understand windowing correctly…? Pls correct me if I am wrong!! Thanks for your thoughts!!
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Thomas Weise <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Apex can do stateful processing; you can define a window in which you can reorder the messages. It will have the same effect on latency as "micro-batching".
>>>>>>
>>>>>> Why is the ordering important? What operations do you perform on the data? Aggregation?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>
>>>>>>> My bad… we observed that our source data in the kafka topics is not really ordered; we are seeing messages arrive with a delay of a few milliseconds!!
>>>>>>>
>>>>>>> The source couldn’t ensure the ordering guarantee due to the network!!
>>>>>>>
>>>>>>> Is there a right way, from the consumer standpoint, for me to ensure ordering?? Will micro batching work for me here? Or does Apex support micro batching and ordering the messages?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja
>>>>>>>
>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Raja,
>>>>>>>
>>>>>>> Please also confirm how you are using partitioning.
>>>>>>> If, for example, in your DAG you shuffle the data received from Kafka in a way that is different from the original partitioning, then it would be possible that multiple downstream partitions process data that came from a single Kafka partition concurrently and therefore in a different order.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>
>>>>>>>> Yes Devendra.
>>>>>>>>
>>>>>>>> p1.10 is read before p1.1!!
>>>>>>>>
>>>>>>>> Sure, I shall check that. Thanks a lot for your response.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Hi Raja,
>>>>>>>>
>>>>>>>> Just to be clear, are you suggesting that p1.10 is being read before p1.1?
>>>>>>>>
>>>>>>>> If that's the case, can you use the console consumer that comes packaged with kafka and verify the ordering based on timestamps?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dev
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>>>>>
>>>>>>>>> What you said is very clear and understandable. But I am NOT getting that partition-level order!! My operator is processing the records in jumbled order rather than in sequence!
>>>>>>>>> I am saying this because I am generating timestamps upon tuple receipt and emitting that timestamp to my destination, which clearly shows the records are being received by the operator in a shuffled order.
>>>>>>>>>
>>>>>>>>> I get records at millisecond-level differences!! Will that be a problem?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>
>>>>>>>>> Hi Raja,
>>>>>>>>>
>>>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of the operator consumes from many partitions of a kafka topic.
>>>>>>>>>
>>>>>>>>> When you look at the consumed data, all the events coming from a given partition would be ordered, but there are no ordering guarantees across partitions since kafka does not guarantee that.
>>>>>>>>>
>>>>>>>>> E.g.: if 3 partitions of a topic, p1, p2, p3, having 10 messages each, are connected to one physical partition of the KafkaInputOperator, then the ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1 will be consumed only after messages 1 through 9 are consumed. But the operator could consume messages in an order like p1.1,p2.1,p1.2,p1.3,p3.1,p2.2..... which still follows the per-partition guarantees.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dev
>>>>>>>>>
>>>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the response Thomas.
>>>>>>>>>>
>>>>>>>>>> My quick doubt is..
>>>>>>>>>>
>>>>>>>>>> I have around 30 partitions of the kafka topic, and all of them have messages ordered at partition level.
>>>>>>>>>>
>>>>>>>>>> So, when I consume those messages using a single consumer [with the ONE_TO_MANY strategy set], the ordering still doesn’t work?
>>>>>>>>>>
>>>>>>>>>> My messages in the topic are guaranteed to be ordered at partition level.
>>>>>>>>>>
>>>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raja.
>>>>>>>>>>
>>>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>>
>>>>>>>>>> Raja,
>>>>>>>>>>
>>>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>>>
>>>>>>>>>> All messages from a given Kafka partition are received by the same consumer and thus will be ordered. However, when messages come from multiple partitions there is no such guarantee.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi
>>>>>>>>>>>
>>>>>>>>>>> I have built a DAG that reads from kafka and, in the next operators, does a lookup to an hbase table and updates the hbase table based on some business logic.
>>>>>>>>>>>
>>>>>>>>>>> Sometimes my operator, which does the hbase lookup and update in the same operator (custom written), is processing the records it receives from kafka in a jumbled order, which is causing many records to be ignored from processing!!
>>>>>>>>>>>
>>>>>>>>>>> I am not using any parallel partitions/instances, and with KafkaInputOperator I am using only the partition strategy ONE_TO_MANY.
>>>>>>>>>>>
>>>>>>>>>>> I am very new to Apex. I expected Apex to guarantee the ordering.
>>>>>>>>>>>
>>>>>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Raja.

--
Regards,
Ashwin.
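
For reference, the buffering approach suggested in this thread (keep tuples sorted by event time, and at each window end emit only those older than a threshold) could look roughly like the sketch below. It is plain Java with no Apex dependency; the class and method names (ReorderBuffer, add, drainOlderThan, size) are made up for illustration and are not part of Apex or Malhar. In a real operator one would presumably call add() from the input port's process() and drainOlderThan() from endWindow(), and keep the map in a non-transient field so that it is checkpointed as described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical reorder buffer: holds tuples sorted by event time and
 * releases only those whose timestamp is older than a caller-supplied cutoff.
 */
class ReorderBuffer<T> {

    // Sorted by event time. Inside an operator this would be a non-transient
    // field, so it would be treated as operator state (assumption based on
    // the checkpointing note in this thread).
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    /** Buffer a tuple under its event time; equal timestamps keep arrival order. */
    public void add(long eventTimeMillis, T tuple) {
        pending.computeIfAbsent(eventTimeMillis, k -> new ArrayList<>()).add(tuple);
    }

    /**
     * Remove and return, in timestamp order, every tuple whose event time is
     * strictly less than cutoffMillis. Newer tuples stay buffered.
     */
    public List<T> drainOlderThan(long cutoffMillis) {
        List<T> ready = new ArrayList<>();
        // headMap returns a live view of the keys below the cutoff;
        // clearing the view removes those entries from the backing map.
        Map<Long, List<T>> old = pending.headMap(cutoffMillis, false);
        for (List<T> bucket : old.values()) {
            ready.addAll(bucket);
        }
        old.clear();
        return ready;
    }

    /** Number of tuples still buffered. */
    public int size() {
        int n = 0;
        for (List<T> bucket : pending.values()) {
            n += bucket.size();
        }
        return n;
    }
}
```

Taking the window example from the thread with timestamps 1 to 7 and an allowed lag of 2: after the first window only msg2 (ts 2) has arrived, so the cutoff is 2 - 2 = 0 and nothing is emitted. After the second window the newest timestamp seen is 7, so the cutoff is 5 and the buffer releases msg1, msg2, msg3, msg4 in that order, while msg5, msg6 and msg7 wait for a later window. How the cutoff is derived (newest event time seen, wall clock, and so on) is left to the caller in this sketch.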
