There is no need for an extra thread. In fact, tuples should be emitted in
the operator thread only. This can be done in endWindow().
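A minimal Java sketch of this approach, combined with the timestamp-keyed `TreeMap` buffer and the bounded-lag idea from the replies below. It assumes each tuple carries its source timestamp in milliseconds and that out-of-order delay never exceeds `maxLagMillis`. The class and method names are hypothetical stand-ins for an Apex operator's input-port `process()`, `endWindow()` and output-port `emit()`; this is not the Apex API itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical stand-in for an Apex operator (not the real Apex classes):
// process() plays the input port's process() callback, endWindow() the
// operator's endWindow(), and the Consumer plays the output port's emit().
class ReorderingOperator<T> {
    private final long maxLagMillis;                               // assumed upper bound on out-of-order delay
    private final Consumer<T> out;                                  // stand-in for an output port
    private final TreeMap<Long, List<T>> buffer = new TreeMap<>(); // source timestamp -> tuples
    private long maxSeenTs = Long.MIN_VALUE;                        // newest timestamp received so far

    ReorderingOperator(long maxLagMillis, Consumer<T> out) {
        this.maxLagMillis = maxLagMillis;
        this.out = out;
    }

    // Runs on the operator thread for every incoming tuple: buffer only, never emit here.
    void process(long timestamp, T tuple) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(tuple);
        maxSeenTs = Math.max(maxSeenTs, timestamp);
    }

    // Runs on the same operator thread at the end of each window: emits, in
    // timestamp order, every tuple at least maxLagMillis older than the newest one seen.
    void endWindow() {
        if (maxSeenTs == Long.MIN_VALUE) {
            return; // nothing received yet
        }
        long watermark = maxSeenTs - maxLagMillis;
        while (!buffer.isEmpty() && buffer.firstKey() <= watermark) {
            Map.Entry<Long, List<T>> first = buffer.pollFirstEntry();
            first.getValue().forEach(out);
        }
        // A tuple that arrives later than the assumed lag bound is not detected
        // here; it is simply emitted at a later endWindow(), possibly out of order.
    }
}
```

Because emission happens only in `endWindow()`, everything runs on the operator thread and no flusher thread is needed. Releasing only tuples older than the newest timestamp minus the lag, instead of sorting each window on its own, is what keeps a late msg1 from being overtaken by msg2 across a window boundary; the cost is that every tuple is held for roughly the lag bound. In a real Apex operator the buffer should be a non-transient field so that it is checkpointed with the operator state.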

On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:

>
> How about something like this,
>
> Store the incoming tuples in the following format:
>    TreeMap<TimeStamp, List<Tuples>>
>
> Create a Flusher thread, which periodically flushes the *firstKey*, after
> considering the lag.
>
>
> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]>
> wrote:
>
>> You'll need to have some limit on how much lag is possible for
>> out-of-order messages.
>> If that limit is, say, 30s, then you'll need to buffer tuples for double
>> the lag, i.e. 60s.
>>
>> You can configure the Application Window size suitably to do this.
>>
>> Ram
>>
>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>> No aggregation, but I need the messages to be played in sequence!!
>>>
>>>
>>> Ex:
>>>
>>> Below is the order in which the messages should actually arrive in my Kafka topic:
>>>
>>> msg1 ts1
>>> msg2 ts2
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg6 ts6
>>> msg7 ts7
>>>
>>>
>>> But, due to some network issues, I am seeing the messages in the Kafka
>>> topic in an order like this:
>>>
>>> msg2 ts2 ==> msg2 should come after msg1, but unfortunately it reaches
>>> Kafka before msg1, so the sequence is lost!!
>>> msg1 ts1 ==> delayed by a few milliseconds to seconds!!
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg7 ts7 ==> msg7 arrived in the topic before msg6
>>> msg6 ts6 ==> delayed!!
>>>
>>>
>>> I am losing the order of the messages, and the business logic gives correct
>>> results only when the messages are played in sequence!!
>>>
>>> Now, suppose I define windowing/some buffering, then order by timestamp and
>>> play the messages…
>>>
>>> What if the window boundary falls like this:
>>>
>>> msg2 ts2
>>> —————— window ends here
>>> msg1 ts1
>>> msg3 ts3
>>> msg4 ts4
>>> msg5 ts5
>>> msg7 ts7
>>> msg6 ts6
>>> —————— window ends here
>>>
>>> Now, as you can see, even though I buffer and then order the messages by
>>> timestamp, I still face the problem that msg2 has already been processed
>>> before msg1, which I don’t want!!
>>>
>>> Did I understand windowing correctly? Please correct me if I am
>>> wrong!! Thanks for your thoughts!!
>>>
>>>
>>> Regards,
>>> Raja.
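A small Java sketch of the failure described above, assuming each window is sorted by timestamp on its own and emitted as soon as it closes. Messages are represented only by their timestamp index (msg1 as 1, msg2 as 2, and so on), a simplification for illustration; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PerWindowSort {
    // Sorts each window independently and concatenates the results, i.e. what
    // happens when every window is ordered by timestamp and emitted as it closes.
    static List<Integer> sortEachWindow(List<List<Integer>> windows) {
        List<Integer> emitted = new ArrayList<>();
        for (List<Integer> window : windows) {
            List<Integer> sorted = new ArrayList<>(window);
            Collections.sort(sorted);
            emitted.addAll(sorted);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // ts2 lands in the first window; ts1 only arrives in the second one.
        List<List<Integer>> windows = Arrays.asList(
                Arrays.asList(2),
                Arrays.asList(1, 3, 4, 5, 7, 6));
        System.out.println(sortEachWindow(windows)); // prints [2, 1, 3, 4, 5, 6, 7]
    }
}
```

Once msg2 has been emitted at the first boundary, sorting the second window cannot move msg1 ahead of it. Holding each tuple until no earlier timestamp can still arrive, by bounding the lag as the later replies in this thread suggest, avoids this.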
>>>
>>> From: Thomas Weise <[email protected]>
>>> Reply-To: "[email protected]" <[email protected]>
>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>> To: "[email protected]" <[email protected]>
>>> Subject: Re: kafka input is processing records in a jumbled order
>>>
>>> Apex can do stateful processing; you can define a window in which you
>>> can reorder the messages. It will have the same effect on latency as
>>> "micro-batching".
>>>
>>> Why is the ordering important? What operations do you perform on the
>>> data? Aggregation?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>>> [email protected]> wrote:
>>>
>>>>
>>>> My bad… we observe that our source data in the Kafka topics is not really
>>>> ordered; some messages arrive with a delay of a few milliseconds!!
>>>>
>>>> The source can’t guarantee ordering because of the network!!
>>>>
>>>> Is there a right way for me to ensure ordering from the consumer
>>>> standpoint? Will micro-batching work for me here? Or does Apex support
>>>> micro-batching and ordering the messages?
>>>>
>>>>
>>>>
>>>> Regards,
>>>> Raja
>>>>
>>>> From: Thomas Weise <[email protected]>
>>>> Reply-To: "[email protected]" <[email protected]>
>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>
>>>> To: "[email protected]" <[email protected]>
>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>
>>>> Raja,
>>>>
>>>> Please also confirm how you are using partitioning. If, for example, in
>>>> your DAG you shuffle the data received from Kafka in a way that is
>>>> different from the original partitioning, then it would be possible that
>>>> multiple downstream partitions process data that came from a single Kafka
>>>> partition concurrently and therefore in a different order.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> Yes, Devendra.
>>>>>
>>>>> p1.10 is read before p1.1!!
>>>>>
>>>>> Sure, I shall check that. Thanks a lot for your response.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Devendra Tagare <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Hi Raja,
>>>>>
>>>>> Just to be clear, are you suggesting that p1.10 is being read before
>>>>> p1.1?
>>>>>
>>>>> If that's the case, can you use the console consumer that comes packaged
>>>>> with Kafka and verify the ordering based on timestamps?
>>>>>
>>>>> Thanks,
>>>>> Dev
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> Thanks a lot, Devendra Tagare, for the response.
>>>>>>
>>>>>> What you said is very clear and understandable. But I am NOT getting
>>>>>> that partition-level order!! My operator is processing the records in a
>>>>>> jumbled order rather than in sequence!
>>>>>> I say this because I generate timestamps upon tuple receipt and emit
>>>>>> those timestamps to my destination, which clearly shows the records
>>>>>> reaching the operator in a shuffled order.
>>>>>>
>>>>>> I get records with millisecond-level differences!! Will that be a
>>>>>> problem?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Raja.
>>>>>>
>>>>>> From: Devendra Tagare <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Hi Raja,
>>>>>>
>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of the
>>>>>> operator consumes from many partitions of a Kafka topic.
>>>>>>
>>>>>> When you look at the consumed data, all the events coming from a
>>>>>> given partition will be ordered, but there are no ordering guarantees
>>>>>> across partitions, since Kafka does not guarantee that.
>>>>>>
>>>>>> E.g., if 3 partitions of a topic, p1, p2, p3, with 10 messages each are
>>>>>> connected to one physical partition of the KafkaInputOperator, then the
>>>>>> ordering guarantee of p1.1 to p1.10 is honored, i.e. message 10 of p1
>>>>>> will be consumed only after messages 1 through 9 are consumed. But the
>>>>>> operator could consume messages in an order like
>>>>>> p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, ...,
>>>>>> which still follows the per-partition guarantees.
>>>>>>
>>>>>> Thanks,
>>>>>> Dev
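A minimal Java sketch of the guarantee described above: a check that accepts any interleaving across partitions but rejects a sequence in which one partition's messages go backwards. It uses the `pN.M` notation from this thread (partition `pN`, message number `M`), not real Kafka offsets; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerPartitionOrderCheck {
    // Returns true if, within every partition, message numbers strictly increase.
    // Interleaving between different partitions is allowed, mirroring Kafka's
    // per-partition (not cross-partition) ordering guarantee.
    static boolean preservesPartitionOrder(List<String> consumed) {
        Map<String, Integer> lastSeen = new HashMap<>();
        for (String id : consumed) {
            // Assumes ids of the form "partition.number", e.g. "p1.10".
            int dot = id.indexOf('.');
            String partition = id.substring(0, dot);
            int number = Integer.parseInt(id.substring(dot + 1));
            Integer prev = lastSeen.get(partition);
            if (prev != null && number <= prev) {
                return false; // this partition went backwards
            }
            lastSeen.put(partition, number);
        }
        return true;
    }
}
```

The interleaving p1.1, p2.1, p1.2, p1.3, p3.1, p2.2 passes this check, while reading p1.10 before p1.1 fails it. That case cannot be explained by cross-partition interleaving alone, which is why checking the topic contents with Kafka's console consumer is a useful next step.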
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Thanks for the response Thomas.
>>>>>>>
>>>>>>> My quick question is:
>>>>>>>
>>>>>>> I have around 30 partitions in my Kafka topic, and all of them have
>>>>>>> messages ordered at the partition level.
>>>>>>>
>>>>>>> So, when I consume those messages using a single consumer [with the
>>>>>>> ONE_TO_MANY strategy set], does the ordering still not work?
>>>>>>>
>>>>>>>
>>>>>>> The messages in my topic are guaranteed to be ordered at the partition
>>>>>>> level.
>>>>>>>
>>>>>>> Thanks a lot in advance for your response.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Raja,
>>>>>>>
>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>
>>>>>>> All messages from a given Kafka partition are received by the same
>>>>>>> consumer and thus will be ordered. However, when messages come from
>>>>>>> multiple partitions there is no such guarantee.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> I have built a DAG that reads from Kafka and, in the downstream
>>>>>>>> operators, looks up an HBase table and updates it based on
>>>>>>>> some business logic.
>>>>>>>>
>>>>>>>> Sometimes my custom-written operator, which does both the HBase lookup
>>>>>>>> and the update, processes the records it receives from Kafka in a
>>>>>>>> jumbled order, which causes many records to be skipped during
>>>>>>>> processing!!
>>>>>>>>
>>>>>>>> I am not using any parallel partitions/instances, and with
>>>>>>>> KafkaInputOperator I am using only the ONE_TO_MANY partition strategy.
>>>>>>>>
>>>>>>>> I am very new to Apex. I expected that Apex would guarantee the ordering.
>>>>>>>>
>>>>>>>> Can someone please share their knowledge on this issue?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks a lot in advance…
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
