So,  you want me to add all the incoming tuples into a Map, and complete the 
processing in endwindow() ??

How would this solve my problem as described below with windowing.


msg2 ts2
—————— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—————— window ends here

Thanks a lot for your inputs. Your thoughts are valuable!!



Regards,
Raja.

From: Thomas Weise <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Thursday, June 9, 2016 at 1:49 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order


There is no need for an extra thread. In fact, tuples should be emitted in the 
operator thread only. This can be done in endWindow()

--
sent from mobile

On Jun 9, 2016 11:46 AM, "Sandesh Hegde" 
<[email protected]<mailto:[email protected]>> wrote:

How about something like this,

Store the incoming tuples in the following format:
   TreeMap<TimeStamp, List<Tuples>>

Create a Flusher thread, which periodically flushes the *fristKey*, after 
considering the lag.


On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath 
<[email protected]<mailto:[email protected]>> wrote:
You'll need to have some some limit one how a lag is possible for out-of-order 
messages.
If that limit is say 30s, then you'll need to buffer tuples for double the lag 
-- 60s.

You can configure the Application Window size suitably to do this.

Ram

On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

No aggregation, but I need messages to be played in sequential !!


Ex:

Below is the way actually msgs should come from my kafka topic

msg1 ts1
msg2 ts2
msg3 ts3
msg4 ts4
msg5 ts5
msg6 ts6
msg7 ts7


But, due to some network issues I am seeing the messages in kafka topic 
something like below:

msg2 ts2  ==> msg2 which actually should come after msg1, but unfortunately 
msg2 is coming to kafka before msg1, losing the sequence!!
msg1 ts1 ==> delayed by few milli secs to seconds to reach on time!!
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7 ==> msg7 had come early into topics before msg6
msg6 ts6 ==> delayed !!


I am losing the order of messages and business logic gives correct results only 
when msgs played in sequence!!

Now if I define windowing/some buffering and then order on timestamp and play 
msgs…

What if window boundary takes

msg2 ts2
—————— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—————— window ends here

Now, if you see, even though I am trying to do buffering and then ordering the 
msgs based on some timstamp, I still face the problem of msg2 already processed 
before msg1 !! Which I don’t want.

Did I really understand windowing correctly…. Pls correct me if I am wrong!! 
Thanks for your thoughts!!


Regards,
Raja.

From: Thomas Weise <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Thursday, June 9, 2016 at 10:51 AM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order

Apex can do stateful processing, you can define a window in which you can 
reorder the messages. It will have the same effect on latency as 
"micro-batching".

Why is the ordering important? What operations do you perform on the data? 
Aggregation?

Thanks,
Thomas


On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

My bad… we observes our source data in kafka topics is not really in a ordered 
fashion, where we are seeing the messages with few milli secs delay.!!

Source couldn’t ensure the ordering guarantee due to the network!!

Is there a right way for me from consumer standpoint, I can ensure ordering ?? 
Will micro batching work for me here ? Or Does apex support micro batching and 
order the messages ?



Regards,
Raja

From: Thomas Weise <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 7, 2016 at 10:59 PM

To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Please also confirm how you are using partitioning. If, for example, in your 
DAG you shuffle the data received from Kafka in a way that is different from 
the original partitioning, then it would be possible that multiple downstream 
partitions process data that came from a single Kafka partition concurrently 
and therefore in a different order.

Thomas


On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Yes Devendra.

p1.10 is read before p1.1 !!

Sure I shall check that. Thanks a lot for your response.


Regards,
Raja.

From: Devendra Tagare 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 7, 2016 at 7:59 PM

To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

Just to be clear are you suggesting that p1.10 is being read before p1.1 ?

If thats the case can you use a console consumer that comes packed with kafka 
and verify the ordering based on timestamps ?

Thanks,
Dev



On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Thanks a lot Devendra Tagare for the response.

What you said is very clear and understandable. But, wondering, I am NOT 
getting that partition level order!! My operator is processing the records in 
jumbled order rather than in sequence!
And, I am saying this because, I am generating timestamps upon tuple receipt 
and emitting that timestamp to my destination, which is clearly showing the 
records are receiving to operator in a shuffled order.

I get records at milli second level differences!! Will that be a problem ?


Regards,
Raja.

From: Devendra Tagare 
<[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 7, 2016 at 7:12 PM

To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

When you apply ONE_TO_MANY partitioning scheme, one instance of the operator 
consumes from many partitions of a kafka topic.

When you look at the consumed data, all the events coming from a given 
partition would be ordered but there are no ordering guarantees across 
partitions since kafka does not guarantee that

eg : If 3 partitions of a topic p1,p2,p3 having 10 messages each are connected 
to one physical partition of the KafkaInputOperator , then the ordering 
guarantee of p1.1 to p1.10 is honored.ie<http://honored.ie> message 10 of p1 be 
consumed only after messages 1 through 9 are consumed but the operator could 
consumer messages in a order like p1.1,p2.1,p1.2,p1.3,p3.1,p2.2..... which 
still follows the guarantees per partition.

Thanks,
Dev

On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Thanks for the response Thomas.

My quick doubt is..

I have around 30 partitions of kafka topic, And all of them have messages 
ordered at partition level.

So, when I consume those messages using single consumer[with ONE_TO_MANY 
strategy set], still the ordering doesn’t work ?


My messages in topic are guaranteed to be ordered at partition level.

Thanks a lot in advance for your response.


Regards,
Raja.

From: Thomas Weise <[email protected]<mailto:[email protected]>>
Reply-To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Date: Tuesday, June 7, 2016 at 5:52 PM
To: "[email protected]<mailto:[email protected]>" 
<[email protected]<mailto:[email protected]>>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Are you expecting ordering across multiple Kafka partitions?

All messages from a given Kafka partition are received by the same consumer and 
thus will be ordered. However, when messages come from multiple partitions 
there is no such guarantee.

Thomas


On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli 
<[email protected]<mailto:[email protected]>> wrote:

Hi

I have built a DAG, that reads from kafka and in the next operators, does 
lookup to a hbase table and update hbase table based on some business logic.

Some times my operator which does hbase lookup and update in the same 
operator(Custom written), is processing the records it receives from kafka in a 
jumbled order, which is causing, many records being ignored from processing!!

I am not using any parallel partitions/instance, and with KafkaInputOperator I 
am using only partition strategy ONE_TO_MANY.

I am very new to Apex. I expected, Apex will guarantee the ordering.

Can someone pls share your knowledge on the issue…?


Thanks a lot in advance…


Regards,
Raja.






Reply via email to