Sure, Raja. You can start a design discussion thread on the dev mailing list
for the initial design, and we can evolve it from there as a community.

Here are the contribution guidelines.
https://apex.apache.org/contributing.html

Regards,
Ashwin.

On Sat, Jun 11, 2016 at 7:03 AM, Raja.Aravapalli <[email protected]> wrote:

>
> Thanks Ashwin for sharing your thoughts.
>
> Sure, I will work towards contributing. But I am very new to this, so it
> may take a good amount of time for me to come up with the right design for
> the operator.
>
> I still haven't been able to find the right lag time for my application,
> because a few messages still sometimes arrive later than expected (maybe
> the network is too slow)!!
>
> However, with the introduction of the lag, I was able to significantly
> reduce the number of missing records compared to earlier.
>
> Thanks team for sharing your thoughts :)
>
>
>
> -Regards,
> Raja.
>
>
> From: Ashwin Chandra Putta <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, June 10, 2016 at 5:24 PM
>
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> Raja,
>
> Great discussion, just a couple more thoughts.
>
> 1. Add this operator to your DAG just before the operator that expects
> the tuples to be in order. If you order the tuples right after the Kafka
> input and then have multiple partitions of a downstream operator, the
> order will be lost again. So it is better to order only at the point in the
> DAG just before where ordering is required.
> 2. It seems like you are building this operator for your use case, but it
> will be useful for a lot of other use cases too. So if possible, can you
> contribute the operator back to Malhar? That way we can make it available
> for others to use too and collectively enhance it as needed.
>
> Regards,
> Ashwin.
>
> On Thu, Jun 9, 2016 at 2:43 PM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> Great.
>>
>> I will explore. Thanks for your inputs.
>>
>>
>> Regards,
>> Raja.
>>
>> From: Munagala Ramanath <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, June 9, 2016 at 3:38 PM
>>
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> Any fields within an operator that are not declared "transient" are
>> considered part of the operator state and
>> are checkpointed; on failure, they are restored from the checkpoint
>> automatically by the platform.
>>
>> Ram
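A minimal plain-Java sketch of Ram's point, which also speaks to Raja's question about failures. Apex checkpoints operators with its own serializer (Kryo, as far as we understand), not Java serialization; this example uses `ObjectOutputStream` only as a stand-in that honors the same `transient` keyword. The class and field names are hypothetical.

```java
import java.io.*;
import java.util.*;

// Hypothetical operator-like class. 'buffer' is ordinary state and would be
// checkpointed; 'connection' is transient and would need re-creating on restore.
class ReorderOperatorState implements Serializable {
  private static final long serialVersionUID = 1L;

  TreeMap<Long, List<String>> buffer = new TreeMap<>();

  transient Object connection = new Object(); // e.g. an HBase connection
}

class CheckpointSketch {
  // Serialize then deserialize, standing in for a checkpoint followed by a restore.
  static ReorderOperatorState roundTrip(ReorderOperatorState op) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ReorderOperatorState) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }

  public static void main(String[] args) {
    ReorderOperatorState op = new ReorderOperatorState();
    op.buffer.computeIfAbsent(2L, k -> new ArrayList<>()).add("msg2");
    ReorderOperatorState restored = roundTrip(op);
    System.out.println(restored.buffer);     // {2=[msg2]}
    System.out.println(restored.connection); // null
  }
}
```

Buffered tuples held in a non-transient field come back after the round trip, while the transient handle does not and would typically be re-opened in a setup callback.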
>>
>> On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> Really a great thought!!
>>>
>>> I am wondering how the application will handle failures?
>>>
>>> Also, what does the phrase “hold them in the operator state” mean?
>>> Hold incoming messages in some data structure, like a map?
>>>
>>> -Regards,
>>> Raja.
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, June 9, 2016 at 2:57 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> You can order the messages by event time as they arrive, hold them in
>>> the operator state and then only emit those in the endWindow callback that
>>> are older than <threshold> (note that you are not simply emitting all, only
>>> those that are old enough).
>>>
>>> Thomas
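Thomas's suggestion can be sketched in plain Java so that it runs without the Apex jars. In a real operator, `process()` would be the input port's callback and `endWindow()` the operator's; `Event`, `ReorderBuffer` and `lagMillis` are hypothetical names, and measuring the threshold against the largest event time seen so far is an assumption (wall-clock time would be another option). The demo replays the window boundary from Raja's diagram.

```java
import java.util.*;

// Hypothetical tuple type: a payload plus its event timestamp in milliseconds.
class Event {
  final String msg;
  final long ts;

  Event(String msg, long ts) {
    this.msg = msg;
    this.ts = ts;
  }

  @Override
  public String toString() {
    return msg;
  }
}

// Sketch of the reorder logic; 'pending' is the state held between windows.
class ReorderBuffer {
  private final long lagMillis; // assumed maximum lateness of a tuple
  private final TreeMap<Long, List<Event>> pending = new TreeMap<>();
  private long maxSeenTs = Long.MIN_VALUE;

  ReorderBuffer(long lagMillis) {
    this.lagMillis = lagMillis;
  }

  // Would run in the input port's process(): buffer by event time, emit nothing.
  void process(Event e) {
    pending.computeIfAbsent(e.ts, k -> new ArrayList<>()).add(e);
    maxSeenTs = Math.max(maxSeenTs, e.ts);
  }

  // Would run in endWindow(): emit, in timestamp order, only tuples whose
  // timestamp is at most maxSeenTs - lagMillis; newer ones stay buffered.
  List<Event> endWindow() {
    List<Event> out = new ArrayList<>();
    if (maxSeenTs == Long.MIN_VALUE) {
      return out; // nothing received yet
    }
    long threshold = maxSeenTs - lagMillis;
    // headMap(threshold, true) is a live view of keys <= threshold, ascending.
    NavigableMap<Long, List<Event>> ready = pending.headMap(threshold, true);
    for (List<Event> bucket : ready.values()) {
      out.addAll(bucket);
    }
    ready.clear(); // clearing the view also removes these entries from 'pending'
    return out;
  }
}

class ReorderDemo {
  public static void main(String[] args) {
    ReorderBuffer op = new ReorderBuffer(2000);
    op.process(new Event("msg2", 2000)); // window 1
    System.out.println(op.endWindow()); // []
    for (Event e : Arrays.asList(new Event("msg1", 1000), new Event("msg3", 3000),
        new Event("msg4", 4000), new Event("msg5", 5000),
        new Event("msg7", 7000), new Event("msg6", 6000))) {
      op.process(e); // window 2
    }
    System.out.println(op.endWindow()); // [msg1, msg2, msg3, msg4, msg5]
  }
}
```

Because msg2 is held back at the end of window 1 instead of being emitted, msg1 can still overtake it in window 2. One trade-off of using event time as the clock: if the stream goes quiet, the last few tuples stay buffered until newer ones arrive.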
>>>
>>> On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> So, you want me to add all the incoming tuples to a Map and complete
>>>> the processing in endWindow()??
>>>>
>>>> How would this solve my windowing problem, as described below?
>>>>
>>>>
>>>> msg2 ts2
>>>> —————— window ends here
>>>> msg1 ts1
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7
>>>> msg6 ts6
>>>> —————— window ends here
>>>>
>>>> Thanks a lot for your inputs. Your thoughts are valuable!!
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Thursday, June 9, 2016 at 1:49 PM
>>>>
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> There is no need for an extra thread. In fact, tuples should be emitted
>>>> in the operator thread only. This can be done in endWindow().
>>>>
>>>> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> How about something like this:
>>>>>
>>>>> Store the incoming tuples in the following format:
>>>>>    TreeMap<TimeStamp, List<Tuples>>
>>>>>
>>>>> Create a flusher thread, which periodically flushes the *firstKey*,
>>>>> after considering the lag.
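One detail of Sandesh's layout is worth making concrete: the value is a `List` because several tuples can share a timestamp, and a map holding one value per key would silently overwrite them. The sketch below uses hypothetical names, with `Long` timestamps standing in for `TimeStamp`; `flushFirst` would be called from the operator thread (for example in `endWindow()`), per Thomas's reply, rather than from a separate flusher thread.

```java
import java.util.*;

// Hypothetical illustration of TreeMap<timestamp, List<tuple>> versus a single-value map.
class TimestampBuckets {
  // One value per key: a later tuple with the same timestamp replaces the earlier one.
  static TreeMap<Long, String> singleValue(long[] ts, String[] msgs) {
    TreeMap<Long, String> m = new TreeMap<>();
    for (int i = 0; i < ts.length; i++) {
      m.put(ts[i], msgs[i]);
    }
    return m;
  }

  // A list per key: tuples with equal timestamps are all kept, in arrival order.
  static TreeMap<Long, List<String>> bucketed(long[] ts, String[] msgs) {
    TreeMap<Long, List<String>> m = new TreeMap<>();
    for (int i = 0; i < ts.length; i++) {
      m.computeIfAbsent(ts[i], k -> new ArrayList<>()).add(msgs[i]);
    }
    return m;
  }

  // Remove and return the oldest bucket (the "firstKey" flush); empty if none.
  static List<String> flushFirst(TreeMap<Long, List<String>> m) {
    Map.Entry<Long, List<String>> oldest = m.pollFirstEntry();
    return oldest == null ? Collections.<String>emptyList() : oldest.getValue();
  }

  public static void main(String[] args) {
    long[] ts = {2, 1, 1};
    String[] msgs = {"msg2", "msg1a", "msg1b"};
    System.out.println(singleValue(ts, msgs)); // {1=msg1b, 2=msg2}
    TreeMap<Long, List<String>> buckets = bucketed(ts, msgs);
    System.out.println(flushFirst(buckets));   // [msg1a, msg1b]
    System.out.println(buckets);               // {2=[msg2]}
  }
}
```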
>>>>>
>>>>>
>>>>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> You'll need to have some limit on how much lag is possible for
>>>>>> out-of-order messages.
>>>>>> If that limit is, say, 30s, then you'll need to buffer tuples for
>>>>>> double the lag: 60s.
>>>>>>
>>>>>> You can configure the Application Window size suitably to do this.
>>>>>>
>>>>>> Ram
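Ram's arithmetic can be turned into an attribute value. To our understanding, Apex expresses the application window as a count of streaming windows (the `APPLICATION_WINDOW_COUNT` operator attribute), and the streaming window defaults to 500 ms. The helper below is hypothetical, and the commented `setAttribute` line, with `reorder` as a placeholder operator, only indicates where the value might go in `populateDAG`.

```java
// Hypothetical helper: streaming windows needed to buffer tuples for 2 x maxLag.
class AppWindowSizing {
  static int applicationWindowCount(long maxLagMillis, long streamingWindowMillis) {
    long bufferMillis = 2 * maxLagMillis; // Ram's rule of thumb: double the lag
    // Round up so the application window is never shorter than the buffer.
    return (int) ((bufferMillis + streamingWindowMillis - 1) / streamingWindowMillis);
  }

  public static void main(String[] args) {
    int count = applicationWindowCount(30_000, 500); // 30 s lag, 500 ms windows
    System.out.println(count); // 120
    // In populateDAG (assumed Apex API, placeholder operator 'reorder'):
    // dag.setAttribute(reorder, Context.OperatorContext.APPLICATION_WINDOW_COUNT, count);
  }
}
```

A longer application window makes it less frequent for a late tuple to land on the other side of a boundary, but on its own it does not rule that out.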
>>>>>>
>>>>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> No aggregation, but I need the messages to be played in sequence!!
>>>>>>>
>>>>>>>
>>>>>>> Ex:
>>>>>>>
>>>>>>> Below is the order in which the messages should actually come from
>>>>>>> my Kafka topic:
>>>>>>>
>>>>>>> msg1 ts1
>>>>>>> msg2 ts2
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg6 ts6
>>>>>>> msg7 ts7
>>>>>>>
>>>>>>>
>>>>>>> But due to some network issues, I am seeing the messages in the Kafka
>>>>>>> topic in an order like this:
>>>>>>>
>>>>>>> msg2 ts2 ==> msg2 should actually come after msg1, but unfortunately
>>>>>>> msg2 is reaching Kafka before msg1, losing the sequence!!
>>>>>>> msg1 ts1 ==> delayed by a few milliseconds to seconds!!
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg7 ts7 ==> msg7 arrived in the topic before msg6
>>>>>>> msg6 ts6 ==> delayed!!
>>>>>>>
>>>>>>>
>>>>>>> I am losing the order of the messages, and the business logic gives
>>>>>>> correct results only when the messages are played in sequence!!
>>>>>>>
>>>>>>> Now, suppose I define windowing or some buffering, then order by
>>>>>>> timestamp and play the messages…
>>>>>>>
>>>>>>> What if the window boundary falls like this:
>>>>>>>
>>>>>>> msg2 ts2
>>>>>>> —————— window ends here
>>>>>>> msg1 ts1
>>>>>>> msg3 ts3
>>>>>>> msg4 ts4
>>>>>>> msg5 ts5
>>>>>>> msg7 ts7
>>>>>>> msg6 ts6
>>>>>>> —————— window ends here
>>>>>>>
>>>>>>> As you can see, even though I buffer the messages and then order them
>>>>>>> by timestamp, I still face the problem of msg2 being processed before
>>>>>>> msg1!! Which I don’t want.
>>>>>>>
>>>>>>> Did I understand windowing correctly? Please correct me if I am
>>>>>>> wrong!! Thanks for your thoughts!!
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
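Raja's concern can be reproduced in a few lines: if each window is sorted and then emitted in full, a tuple that arrives one window late still comes out after its successor. This is a plain-Java sketch with hypothetical names, where each integer n stands for msgN with timestamp tsN and the two windows mirror the diagram above.

```java
import java.util.*;

class NaiveWindowSort {
  // Each inner list holds the timestamps that arrived in one window. Sorting
  // within a window and emitting all of it at window end cannot repair a
  // tuple that arrives one window late.
  static List<Integer> sortEachWindow(List<List<Integer>> windows) {
    List<Integer> emitted = new ArrayList<>();
    for (List<Integer> window : windows) {
      List<Integer> sorted = new ArrayList<>(window);
      Collections.sort(sorted);
      emitted.addAll(sorted);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<List<Integer>> windows = Arrays.asList(
        Arrays.asList(2),                 // window 1: only msg2
        Arrays.asList(1, 3, 4, 5, 7, 6)); // window 2: msg1 arrives late
    System.out.println(sortEachWindow(windows)); // [2, 1, 3, 4, 5, 6, 7]
  }
}
```

Holding back the newest tuples across window boundaries, as in Thomas's endWindow suggestion, is what avoids this.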
>>>>>>>
>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Apex can do stateful processing; you can define a window in which
>>>>>>> you reorder the messages. It will have the same effect on latency as
>>>>>>> "micro-batching".
>>>>>>>
>>>>>>> Why is the ordering important? What operations do you perform on the
>>>>>>> data? Aggregation?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> My bad… we observed that our source data in the Kafka topics is not
>>>>>>>> really in order; we are seeing some messages arrive a few milliseconds
>>>>>>>> late!!
>>>>>>>>
>>>>>>>> The source can't guarantee ordering because of the network!!
>>>>>>>>
>>>>>>>> Is there a right way for me to ensure ordering from the consumer
>>>>>>>> standpoint?? Will micro-batching work for me here? Or does Apex
>>>>>>>> support micro-batching and ordering the messages?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja
>>>>>>>>
>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>>>>
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Raja,
>>>>>>>>
>>>>>>>> Please also confirm how you are using partitioning. If, for
>>>>>>>> example, in your DAG you shuffle the data received from Kafka in a
>>>>>>>> way that is different from the original partitioning, then it would
>>>>>>>> be possible that multiple downstream partitions process data that
>>>>>>>> came from a single Kafka partition concurrently and therefore in a
>>>>>>>> different order.
>>>>>>>>
>>>>>>>> Thomas
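Thomas's point can be illustrated with a routing function. If each tuple is sent downstream based on the Kafka partition it came from, all tuples of one Kafka partition reach the same downstream instance; an assignment that ignores the source (round-robin here) spreads them across instances, which may then process them concurrently. In Apex this routing is, to our understanding, governed by the stream codec's `getPartition`; the plain-Java sketch below uses hypothetical names and only models the mapping.

```java
import java.util.*;

// Hypothetical model of routing tuples from Kafka partitions to N downstream instances.
class PartitionRouting {
  // Route by source Kafka partition: one Kafka partition always maps to one instance.
  static int route(int kafkaPartition, int downstream) {
    return Math.floorMod(kafkaPartition, downstream);
  }

  // For each Kafka partition, the set of downstream instances its tuples reached.
  static Map<Integer, Set<Integer>> targetsBySource(int[] arrivals, int downstream) {
    Map<Integer, Set<Integer>> targets = new TreeMap<>();
    for (int p : arrivals) {
      targets.computeIfAbsent(p, k -> new TreeSet<>()).add(route(p, downstream));
    }
    return targets;
  }

  // Contrast: assigning by arrival index ignores the source partition entirely.
  static Map<Integer, Set<Integer>> roundRobinTargets(int[] arrivals, int downstream) {
    Map<Integer, Set<Integer>> targets = new TreeMap<>();
    for (int i = 0; i < arrivals.length; i++) {
      targets.computeIfAbsent(arrivals[i], k -> new TreeSet<>()).add(i % downstream);
    }
    return targets;
  }

  public static void main(String[] args) {
    int[] arrivals = {0, 3, 1, 0, 4, 2, 3, 1}; // source partition of each tuple, in arrival order
    System.out.println(targetsBySource(arrivals, 2));   // {0=[0], 1=[1], 2=[0], 3=[1], 4=[0]}
    System.out.println(roundRobinTargets(arrivals, 2)); // {0=[0, 1], 1=[0, 1], 2=[1], 3=[0, 1], 4=[0]}
  }
}
```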
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, Devendra.
>>>>>>>>>
>>>>>>>>> p1.10 is read before p1.1!!
>>>>>>>>>
>>>>>>>>> Sure, I shall check that. Thanks a lot for your response.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>>>>
>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>
>>>>>>>>> Hi Raja,
>>>>>>>>>
>>>>>>>>> Just to be clear, are you suggesting that p1.10 is being read
>>>>>>>>> before p1.1?
>>>>>>>>>
>>>>>>>>> If that's the case, can you use the console consumer that comes
>>>>>>>>> packaged with Kafka and verify the ordering based on timestamps?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dev
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot, Devendra Tagare, for the response.
>>>>>>>>>>
>>>>>>>>>> What you said is very clear and understandable. But I am wondering
>>>>>>>>>> why I am NOT getting that partition-level order!! My operator is
>>>>>>>>>> processing the records in a jumbled order rather than in sequence!
>>>>>>>>>> I am saying this because I am generating timestamps upon tuple
>>>>>>>>>> receipt and emitting those timestamps to my destination, which
>>>>>>>>>> clearly shows the records are reaching the operator in a shuffled
>>>>>>>>>> order.
>>>>>>>>>>
>>>>>>>>>> I get records at millisecond-level differences!! Will that be a
>>>>>>>>>> problem?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raja.
>>>>>>>>>>
>>>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>>>>
>>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>>
>>>>>>>>>> Hi Raja,
>>>>>>>>>>
>>>>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of
>>>>>>>>>> the operator consumes from many partitions of a Kafka topic.
>>>>>>>>>>
>>>>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>>>>> given partition will be ordered, but there are no ordering
>>>>>>>>>> guarantees across partitions since Kafka does not guarantee that.
>>>>>>>>>>
>>>>>>>>>> E.g., if 3 partitions of a topic, p1, p2, p3, having 10 messages
>>>>>>>>>> each are connected to one physical partition of the
>>>>>>>>>> KafkaInputOperator, then the ordering guarantee of p1.1 to p1.10 is
>>>>>>>>>> honored, i.e. message 10 of p1 will be consumed only after messages
>>>>>>>>>> 1 through 9 are consumed. But the operator could consume messages
>>>>>>>>>> in an order like p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, ..., which
>>>>>>>>>> still follows the per-partition guarantees.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dev
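Devendra's example can be checked mechanically: given a consumed sequence of labels in the thread's pN.M notation, verify that message numbers increase within each partition even though the global sequence interleaves partitions. A plain-Java sketch with a hypothetical helper name.

```java
import java.util.*;

class PerPartitionOrder {
  // True if, for every partition, message numbers appear in strictly increasing order.
  static boolean orderedWithinPartitions(List<String> consumed) {
    Map<String, Integer> lastSeen = new HashMap<>();
    for (String label : consumed) {
      int dot = label.indexOf('.');
      String partition = label.substring(0, dot);              // e.g. "p1"
      int seq = Integer.parseInt(label.substring(dot + 1));    // e.g. 10
      Integer prev = lastSeen.put(partition, seq);
      if (prev != null && seq <= prev) {
        return false; // a later message of this partition was seen first
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Interleaving from Devendra's example: valid per partition.
    List<String> ok = Arrays.asList("p1.1", "p2.1", "p1.2", "p1.3", "p3.1", "p2.2");
    // Raja's observation (p1.10 read before p1.1) would violate the guarantee.
    List<String> bad = Arrays.asList("p1.10", "p1.1");
    System.out.println(orderedWithinPartitions(ok));  // true
    System.out.println(orderedWithinPartitions(bad)); // false
  }
}
```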
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response, Thomas.
>>>>>>>>>>>
>>>>>>>>>>> My quick question is:
>>>>>>>>>>>
>>>>>>>>>>> I have around 30 partitions in my Kafka topic, and all of them have
>>>>>>>>>>> messages ordered at the partition level.
>>>>>>>>>>>
>>>>>>>>>>> So, when I consume those messages using a single consumer (with
>>>>>>>>>>> the ONE_TO_MANY strategy set), does the ordering still not work?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> My messages in the topic are guaranteed to be ordered at the
>>>>>>>>>>> partition level.
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Raja.
>>>>>>>>>>>
>>>>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled
>>>>>>>>>>> order
>>>>>>>>>>>
>>>>>>>>>>> Raja,
>>>>>>>>>>>
>>>>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>>>>
>>>>>>>>>>> All messages from a given Kafka partition are received by the
>>>>>>>>>>> same consumer and thus will be ordered. However, when messages
>>>>>>>>>>> come from multiple partitions there is no such guarantee.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> I have built a DAG that reads from Kafka and, in the downstream
>>>>>>>>>>>> operators, looks up an HBase table and updates the HBase table
>>>>>>>>>>>> based on some business logic.
>>>>>>>>>>>>
>>>>>>>>>>>> Sometimes my custom-written operator, which does both the HBase
>>>>>>>>>>>> lookup and the update, processes the records it receives from
>>>>>>>>>>>> Kafka in a jumbled order, which causes many records to be
>>>>>>>>>>>> ignored during processing!!
>>>>>>>>>>>>
>>>>>>>>>>>> I am not using any parallel partitions/instances, and with the
>>>>>>>>>>>> KafkaInputOperator I am using only the ONE_TO_MANY partition
>>>>>>>>>>>> strategy.
>>>>>>>>>>>>
>>>>>>>>>>>> I am very new to Apex. I expected Apex to guarantee the
>>>>>>>>>>>> ordering.
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone please share their knowledge on this issue…?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Raja.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>
>>
>
>
>



