You can order the messages by event time as they arrive, hold them in the
operator state, and then in the endWindow callback emit only those that are
older than <threshold> (note that you are not simply emitting everything,
only the messages that are old enough).
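
A minimal sketch of that idea in plain Java, with no Apex API involved. The
class name ReorderBuffer, its methods, and the choice to measure "older than
<threshold>" against the largest event time seen so far are all assumptions
made for illustration. In an operator, add() would be called for each
incoming tuple and drainOldEnough() from endWindow().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical helper: buffers messages sorted by event time and releases
// only those that are older than a threshold.
public class ReorderBuffer {
    private final long threshold;
    // Sorted by event time; messages with equal timestamps keep arrival order.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private long maxSeen = Long.MIN_VALUE;

    public ReorderBuffer(long threshold) {
        this.threshold = threshold;
    }

    // Call for every incoming message.
    public void add(long eventTime, String msg) {
        pending.computeIfAbsent(eventTime, k -> new ArrayList<>()).add(msg);
        maxSeen = Math.max(maxSeen, eventTime);
    }

    // Call at the end of each window. Returns, in event-time order, the
    // messages whose event time is more than `threshold` behind the newest
    // event time seen (an assumed definition of "old enough").
    public List<String> drainOldEnough() {
        List<String> out = new ArrayList<>();
        if (pending.isEmpty()) {
            return out;
        }
        long cutoff = maxSeen - threshold;
        // headMap is a live view of the keys strictly below cutoff.
        SortedMap<Long, List<String>> ready = pending.headMap(cutoff);
        for (List<String> batch : ready.values()) {
            out.addAll(batch);
        }
        ready.clear(); // also removes the emitted entries from `pending`
        return out;
    }
}
```

With a threshold of 2 and the window layout from the question, the first
window (only msg2) emits nothing, because msg2 is not old enough yet. The
second window emits msg1, msg2, msg3, msg4 in that order and keeps msg5,
msg6 and msg7 buffered for a later window.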

Thomas

On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <[email protected]> wrote:

>
> So, you want me to add all the incoming tuples into a Map, and complete
> the processing in endWindow()?
>
> How would this solve my problem with windowing, as described below?
>
>
> msg2 ts2
> —————— window ends here
> msg1 ts1
> msg3 ts3
> msg4 ts4
> msg5 ts5
> msg7 ts7
> msg6 ts6
> —————— window ends here
>
> Thanks a lot for your inputs. Your thoughts are valuable!!
>
>
>
> Regards,
> Raja.
>
> From: Thomas Weise <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, June 9, 2016 at 1:49 PM
>
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> There is no need for an extra thread. In fact, tuples should be emitted in
> the operator thread only. This can be done in endWindow().
>
> --
> sent from mobile
> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>
>>
>> How about something like this,
>>
>> Store the incoming tuples in the following format:
>>    TreeMap<TimeStamp, List<Tuples>>
>>
>> Create a Flusher thread, which periodically flushes the *firstKey*, after
>> considering the lag.
>>
>>
>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]>
>> wrote:
>>
>>> You'll need to have some limit on how much lag is possible for
>>> out-of-order messages.
>>> If that limit is, say, 30s, then you'll need to buffer tuples for double
>>> the lag, i.e. 60s.
>>>
>>> You can configure the Application Window size suitably to do this.
>>>
>>> Ram
>>>
>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> No aggregation, but I need the messages to be played in sequence!!
>>>>
>>>>
>>>> Ex:
>>>>
>>>> Below is the order in which msgs should actually come from my kafka topic:
>>>>
>>>> msg1 ts1
>>>> msg2 ts2
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg6 ts6
>>>> msg7 ts7
>>>>
>>>>
>>>> But, due to some network issues, I am seeing the messages in the kafka
>>>> topic in an order like the one below:
>>>>
>>>> msg2 ts2  ==> msg2 should come after msg1, but unfortunately msg2
>>>> reaches kafka before msg1, losing the sequence!!
>>>> msg1 ts1 ==> delayed by a few milliseconds to seconds!!
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7 ==> msg7 arrived in the topic before msg6
>>>> msg6 ts6 ==> delayed !!
>>>>
>>>>
>>>> I am losing the order of messages, and the business logic gives correct
>>>> results only when msgs are played in sequence!!
>>>>
>>>> Now suppose I define windowing/some buffering, and then order on
>>>> timestamp and play the msgs…
>>>>
>>>> What if the window boundary falls like this:
>>>>
>>>> msg2 ts2
>>>> —————— window ends here
>>>> msg1 ts1
>>>> msg3 ts3
>>>> msg4 ts4
>>>> msg5 ts5
>>>> msg7 ts7
>>>> msg6 ts6
>>>> —————— window ends here
>>>>
>>>> Now, as you can see, even though I am buffering and then ordering the
>>>> msgs based on some timestamp, I still face the problem of msg2 having
>>>> already been processed before msg1!! Which I don’t want.
>>>>
>>>> Did I really understand windowing correctly…. Pls correct me if I am
>>>> wrong!! Thanks for your thoughts!!
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> Apex can do stateful processing, you can define a window in which you
>>>> can reorder the messages. It will have the same effect on latency as
>>>> "micro-batching".
>>>>
>>>> Why is the ordering important? What operations do you perform on the
>>>> data? Aggregation?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> My bad… we observed that our source data in the kafka topics is not
>>>>> really ordered; we are seeing messages arrive with a few milliseconds
>>>>> of delay!!
>>>>>
>>>>> Source couldn’t ensure the ordering guarantee due to the network!!
>>>>>
>>>>> Is there a right way for me, from the consumer standpoint, to ensure
>>>>> ordering? Will micro-batching work for me here? Or does Apex support
>>>>> micro-batching and ordering the messages?
>>>>>
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja
>>>>>
>>>>> From: Thomas Weise <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Raja,
>>>>>
>>>>> Please also confirm how you are using partitioning. If, for example,
>>>>> in your DAG you shuffle the data received from Kafka in a way that is
>>>>> different from the original partitioning, then it would be possible that
>>>>> multiple downstream partitions process data that came from a single Kafka
>>>>> partition concurrently and therefore in a different order.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Yes Devendra.
>>>>>>
>>>>>> p1.10 is read before p1.1 !!
>>>>>>
>>>>>> Sure I shall check that. Thanks a lot for your response.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Devendra Tagare <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Hi Raja,
>>>>>>
>>>>>> Just to be clear, are you suggesting that p1.10 is being read before
>>>>>> p1.1?
>>>>>>
>>>>>> If that's the case, can you use the console consumer that comes
>>>>>> packaged with kafka and verify the ordering based on timestamps?
>>>>>>
>>>>>> Thanks,
>>>>>> Dev
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Thanks a lot Devendra Tagare for the response.
>>>>>>>
>>>>>>> What you said is very clear and understandable. But I am NOT getting
>>>>>>> that partition-level order!! My operator is processing the records in
>>>>>>> jumbled order rather than in sequence!
>>>>>>> I am saying this because I generate a timestamp upon receipt of each
>>>>>>> tuple and emit that timestamp to my destination, which clearly shows
>>>>>>> that the records reach the operator in a shuffled order.
>>>>>>>
>>>>>>> I get records at millisecond-level differences!! Will that be a
>>>>>>> problem?
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Hi Raja,
>>>>>>>
>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of
>>>>>>> the operator consumes from many partitions of a kafka topic.
>>>>>>>
>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>> given partition would be ordered, but there are no ordering guarantees
>>>>>>> across partitions, since kafka does not guarantee that.
>>>>>>>
>>>>>>> e.g.: If 3 partitions of a topic, p1, p2 and p3, having 10 messages each
>>>>>>> are connected to one physical partition of the KafkaInputOperator, then
>>>>>>> the ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of
>>>>>>> p1 will be consumed only after messages 1 through 9 are consumed, but
>>>>>>> the operator could consume messages in an order like
>>>>>>> p1.1,p2.1,p1.2,p1.3,p3.1,p2.2.....
>>>>>>> which still follows the guarantees per partition.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dev
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks for the response Thomas.
>>>>>>>>
>>>>>>>> My quick doubt is..
>>>>>>>>
>>>>>>>> I have around 30 partitions of a kafka topic, and all of them have
>>>>>>>> messages ordered at partition level.
>>>>>>>>
>>>>>>>> So, when I consume those messages using a single consumer [with the
>>>>>>>> ONE_TO_MANY strategy set], does the ordering still not hold?
>>>>>>>>
>>>>>>>>
>>>>>>>> My messages in topic are guaranteed to be ordered at partition
>>>>>>>> level.
>>>>>>>>
>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Raja,
>>>>>>>>
>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>
>>>>>>>> All messages from a given Kafka partition are received by the same
>>>>>>>> consumer and thus will be ordered. However, when messages come from
>>>>>>>> multiple partitions there is no such guarantee.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> I have built a DAG that reads from kafka and, in the next
>>>>>>>>> operators, does a lookup to an hbase table and updates the hbase
>>>>>>>>> table based on some business logic.
>>>>>>>>>
>>>>>>>>> Sometimes my custom-written operator, which does the hbase lookup and
>>>>>>>>> update, processes the records it receives from kafka in a jumbled
>>>>>>>>> order, which causes many records to be ignored from processing!!
>>>>>>>>>
>>>>>>>>> I am not using any parallel partitions/instance, and with
>>>>>>>>> KafkaInputOperator I am using only partition strategy ONE_TO_MANY.
>>>>>>>>>
>>>>>>>>> I am very new to Apex. I expected, Apex will guarantee the
>>>>>>>>> ordering.
>>>>>>>>>
>>>>>>>>> Can someone pls share your knowledge on the issue…?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
