Sure Raja, you can start a design discussion thread on the dev mailing list for initial design and we can evolve it from there as a community.
Here are the contribution guidelines: https://apex.apache.org/contributing.html

Regards,
Ashwin.

On Sat, Jun 11, 2016 at 7:03 AM, Raja.Aravapalli <[email protected]> wrote:

>
> Thanks Ashwin for sharing your thoughts.
>
> Sure, I will work towards contributing. But I am very new to this. It may
> take a good amount of time for me to come up with the right design for the
> operator.
>
> I still haven't been able to find the right lag time for my application,
> because sometimes a few messages still arrive later than expected (maybe
> the network is too slow)!!
>
> However, with the introduction of lag, I was able to significantly decrease
> the number of records that were going missing.
>
> Thanks team for sharing your thoughts :)
>
> -Regards,
> Raja.
>
>
> From: Ashwin Chandra Putta <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, June 10, 2016 at 5:24 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> Raja,
>
> Great discussion, just a couple more thoughts.
>
> 1. Add this operator to your DAG just before the operator which expects
> the tuples to be in order. If you order the tuples right after the Kafka
> input and then have multiple partitions of a downstream operator, the
> order will be lost again. So it is better to order only at the point in the
> DAG just before where it is required.
> 2. It seems like you are building this operator for your use case, but it
> will be useful for a lot of other use cases too. So if possible, can you
> contribute the operator back to Malhar? That way we can make it available
> for others to use and collectively enhance it as needed.
>
> Regards,
> Ashwin.
>
> On Thu, Jun 9, 2016 at 2:43 PM, Raja.Aravapalli <[email protected]> wrote:
>
>>
>> Great.
>>
>> I will explore. Thanks for your inputs.
>>
>> Regards,
>> Raja.
>>
>> From: Munagala Ramanath <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, June 9, 2016 at 3:38 PM
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> Any fields within an operator that are not declared "transient" are
>> considered part of the operator state and are checkpointed; on failure,
>> they are restored from the checkpoint automatically by the platform.
>>
>> Ram
>>
>> On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli <[email protected]> wrote:
>>
>>>
>>> Really a great thought!!
>>>
>>> Wondering how the application will handle failures?
>>>
>>> Also, what does the phrase “hold them in the operator state” mean?
>>> Hold incoming messages in some data structure, a map, etc.?
>>>
>>> -Regards,
>>> Raja.
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, June 9, 2016 at 2:57 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> You can order the messages by event time as they arrive, hold them in
>>> the operator state and then only emit those in the endWindow callback that
>>> are older than <threshold> (note that you are not simply emitting all, only
>>> those that are old enough).
>>>
>>> Thomas
>>>
>>> On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <[email protected]> wrote:
>>>
>>>>
>>>> So, you want me to add all the incoming tuples into a Map, and
>>>> complete the processing in endWindow()?
>>>>
>>>> How would this solve my problem with windowing, as described below?
>>>>
>>>> msg2 ts2
>>>> ------ window ends here
>>>> msg1 ts1
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7
>>>> msg6 ts6
>>>> ------ window ends here
>>>>
>>>> Thanks a lot for your inputs. Your thoughts are valuable!!
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Thursday, June 9, 2016 at 1:49 PM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> There is no need for an extra thread. In fact, tuples should be emitted
>>>> in the operator thread only. This can be done in endWindow().
>>>>
>>>> --
>>>> sent from mobile
>>>> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>>>>
>>>>>
>>>>> How about something like this,
>>>>>
>>>>> Store the incoming tuples in the following format:
>>>>> TreeMap<TimeStamp, List<Tuples>>
>>>>>
>>>>> Create a Flusher thread, which periodically flushes the *firstKey*,
>>>>> after considering the lag.
>>>>>
>>>>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]> wrote:
>>>>>
>>>>>> You'll need to have some limit on how much lag is possible for
>>>>>> out-of-order messages.
>>>>>> If that limit is say 30s, then you'll need to buffer tuples for
>>>>>> double the lag -- 60s.
>>>>>>
>>>>>> You can configure the Application Window size suitably to do this.
>>>>>>
>>>>>> Ram
>>>>>>
>>>>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> No aggregation, but I need messages to be played in sequence!!
>>>>>>>
>>>>>>> Ex:
>>>>>>>
>>>>>>> Below is the order in which msgs should actually come from my kafka topic:
>>>>>>>
>>>>>>> msg1 ts1
>>>>>>> msg2 ts2
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg6 ts6
>>>>>>> msg7 ts7
>>>>>>>
>>>>>>> But, due to some network issues, I am seeing the messages in the kafka
>>>>>>> topic something like below:
>>>>>>>
>>>>>>> msg2 ts2 ==> msg2 should actually come after msg1, but
>>>>>>> unfortunately msg2 is coming to kafka before msg1, losing the sequence!!
>>>>>>> msg1 ts1 ==> delayed by a few milliseconds to seconds, so it did not arrive on time!!
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg7 ts7 ==> msg7 came into the topic early, before msg6
>>>>>>> msg6 ts6 ==> delayed !!
>>>>>>>
>>>>>>> I am losing the order of messages, and the business logic gives correct
>>>>>>> results only when msgs are played in sequence!!
>>>>>>>
>>>>>>> Now if I define windowing/some buffering and then order on timestamp
>>>>>>> and play msgs…
>>>>>>>
>>>>>>> What if the window boundary falls like this:
>>>>>>>
>>>>>>> msg2 ts2
>>>>>>> ------ window ends here
>>>>>>> msg1 ts1
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg7 ts7
>>>>>>> msg6 ts6
>>>>>>> ------ window ends here
>>>>>>>
>>>>>>> Now, if you see, even though I am buffering and then ordering the
>>>>>>> msgs based on some timestamp, I still face the problem of msg2
>>>>>>> already being processed before msg1 !! Which I don’t want.
>>>>>>>
>>>>>>> Did I really understand windowing correctly…. Pls correct me if I am
>>>>>>> wrong!! Thanks for your thoughts!!
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Apex can do stateful processing; you can define a window in which
>>>>>>> you can reorder the messages. It will have the same effect on latency as
>>>>>>> "micro-batching".
>>>>>>>
>>>>>>> Why is the ordering important? What operations do you perform on the
>>>>>>> data? Aggregation?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> My bad… we observed that our source data in the kafka topics is not
>>>>>>>> really ordered; we are seeing messages delayed by a few
>>>>>>>> milliseconds!!
>>>>>>>>
>>>>>>>> The source couldn’t ensure the ordering guarantee due to the network!!
>>>>>>>>
>>>>>>>> Is there a right way, from the consumer standpoint, for me to ensure
>>>>>>>> ordering? Will micro batching work for me here? Or does Apex support
>>>>>>>> micro batching and ordering the messages?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja
>>>>>>>>
>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Raja,
>>>>>>>>
>>>>>>>> Please also confirm how you are using partitioning. If, for
>>>>>>>> example, in your DAG you shuffle the data received from Kafka in a way
>>>>>>>> that is different from the original partitioning, then it would be
>>>>>>>> possible that multiple downstream partitions process data that came
>>>>>>>> from a single Kafka partition concurrently and therefore in a
>>>>>>>> different order.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes Devendra.
>>>>>>>>>
>>>>>>>>> p1.10 is read before p1.1 !!
>>>>>>>>>
>>>>>>>>> Sure I shall check that. Thanks a lot for your response.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>
>>>>>>>>> Hi Raja,
>>>>>>>>>
>>>>>>>>> Just to be clear, are you suggesting that p1.10 is being read
>>>>>>>>> before p1.1?
>>>>>>>>>
>>>>>>>>> If that's the case, can you use the console consumer that comes
>>>>>>>>> packaged with Kafka and verify the ordering based on timestamps?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dev
>>>>>>>>>
>>>>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>>>>>>
>>>>>>>>>> What you said is very clear and understandable. But I am NOT
>>>>>>>>>> getting that partition-level order!! My operator is processing the
>>>>>>>>>> records in jumbled order rather than in sequence!
>>>>>>>>>> I am saying this because I am generating timestamps upon
>>>>>>>>>> tuple receipt and emitting that timestamp to my destination, which
>>>>>>>>>> clearly shows the records are arriving at the operator in a shuffled
>>>>>>>>>> order.
>>>>>>>>>>
>>>>>>>>>> I get records at millisecond-level differences!! Will that be a
>>>>>>>>>> problem?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raja.
>>>>>>>>>>
>>>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>>
>>>>>>>>>> Hi Raja,
>>>>>>>>>>
>>>>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of
>>>>>>>>>> the operator consumes from many partitions of a Kafka topic.
>>>>>>>>>>
>>>>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>>>>> given partition would be ordered, but there are no ordering guarantees
>>>>>>>>>> across partitions since Kafka does not guarantee that.
>>>>>>>>>>
>>>>>>>>>> E.g.: if 3 partitions of a topic (p1, p2, p3) having 10 messages each
>>>>>>>>>> are connected to one physical partition of the KafkaInputOperator,
>>>>>>>>>> then the ordering guarantee of p1.1 to p1.10 is honored, i.e. message
>>>>>>>>>> 10 of p1 will be consumed only after messages 1 through 9 are consumed.
>>>>>>>>>> But the operator could consume messages in an order like
>>>>>>>>>> p1.1,p2.1,p1.2,p1.3,p3.1,p2.2..... which still follows the
>>>>>>>>>> per-partition guarantees.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dev
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response Thomas.
>>>>>>>>>>>
>>>>>>>>>>> My quick doubt is..
>>>>>>>>>>>
>>>>>>>>>>> I have around 30 partitions in the kafka topic, and all of them have
>>>>>>>>>>> messages ordered at the partition level.
>>>>>>>>>>>
>>>>>>>>>>> So, when I consume those messages using a single consumer [with
>>>>>>>>>>> the ONE_TO_MANY strategy set], does the ordering still not work?
>>>>>>>>>>>
>>>>>>>>>>> My messages in the topic are guaranteed to be ordered at the
>>>>>>>>>>> partition level.
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Raja.
>>>>>>>>>>>
>>>>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>>>
>>>>>>>>>>> Raja,
>>>>>>>>>>>
>>>>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>>>>
>>>>>>>>>>> All messages from a given Kafka partition are received by the
>>>>>>>>>>> same consumer and thus will be ordered. However, when messages come
>>>>>>>>>>> from multiple partitions there is no such guarantee.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> I have built a DAG that reads from kafka and, in the next
>>>>>>>>>>>> operators, does a lookup on an hbase table and updates the hbase
>>>>>>>>>>>> table based on some business logic.
>>>>>>>>>>>>
>>>>>>>>>>>> Sometimes my custom-written operator, which does the hbase lookup
>>>>>>>>>>>> and update, processes the records it receives from kafka in a
>>>>>>>>>>>> jumbled order, which causes many records to be ignored
>>>>>>>>>>>> from processing!!
>>>>>>>>>>>>
>>>>>>>>>>>> I am not using any parallel partitions/instances, and with
>>>>>>>>>>>> KafkaInputOperator I am using only the partition strategy ONE_TO_MANY.
>>>>>>>>>>>>
>>>>>>>>>>>> I am very new to Apex. I expected that Apex would guarantee the
>>>>>>>>>>>> ordering.
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Raja.
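
For readers following the thread, the buffering approach suggested above (order tuples by event time, hold them in operator state, and in endWindow emit only those that are old enough) could be sketched roughly as below. This is a plain-Java illustration, not an actual Malhar operator: the class name ReorderBuffer, the process() method and the maxLagMillis parameter are hypothetical, and "old enough" is assumed here to mean "at least maxLagMillis behind the highest event time seen so far", which is only one possible reading of <threshold>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Plain-Java sketch of the reorder buffer discussed in this thread.
// In an Apex operator, process() would be driven by the input port and the
// list returned by endWindow() would be emitted on an output port from the
// operator's endWindow() callback (that wiring is not shown here).
class ReorderBuffer<T> {
    // Assumed upper bound on how late an out-of-order tuple can arrive.
    private final long maxLagMillis;
    // Tuples grouped by event time, kept sorted by the TreeMap. The field is
    // not transient; per the thread, such fields count as operator state.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    private long maxSeenTimestamp = Long.MIN_VALUE;

    ReorderBuffer(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    // Buffer an incoming tuple under its event time; nothing is emitted here.
    void process(long eventTime, T tuple) {
        pending.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(tuple);
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTime);
    }

    // Return, in event-time order, only the tuples that are old enough,
    // and remove them from the buffer. Newer tuples stay buffered.
    List<T> endWindow() {
        List<T> ready = new ArrayList<>();
        if (pending.isEmpty()) {
            return ready;
        }
        long cutoff = maxSeenTimestamp - maxLagMillis;
        NavigableMap<Long, List<T>> oldEnough = pending.headMap(cutoff, true);
        for (List<T> group : oldEnough.values()) {
            ready.addAll(group);
        }
        oldEnough.clear(); // clearing the view removes the entries from pending
        return ready;
    }

    public static void main(String[] args) {
        // Replays the arrival order from the thread with a lag of 2 time units.
        ReorderBuffer<String> buffer = new ReorderBuffer<>(2);
        buffer.process(2, "msg2");
        System.out.println(buffer.endWindow()); // [] (msg2 is held back)
        long[] arrivals = {1, 3, 4, 5, 7, 6};
        for (long ts : arrivals) {
            buffer.process(ts, "msg" + ts);
        }
        System.out.println(buffer.endWindow()); // [msg1, msg2, msg3, msg4, msg5]
    }
}
```

In this sketch msg2 is not emitted at the first window boundary, so msg1 can still be placed ahead of it, which addresses the window-boundary concern raised earlier. msg6 and msg7 stay buffered until later tuples advance the highest seen event time. A tuple that arrives more than maxLagMillis late would still come out after newer tuples, so the lag has to be chosen for the worst delay expected, and a real operator would need a policy for such late tuples.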
