Any fields within an operator that are not declared "transient" are
considered part of the operator state and
are checkpointed; on failure, they are restored from the checkpoint
automatically by the platform.
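To illustrate the point, here is a small stand-in that uses plain Java serialization instead of the platform's checkpointing. The class names are hypothetical, and this is not how Apex serializes operators (its checkpoints are, to my understanding, Kryo based). The transient rule is the same, though: non-transient fields survive a save/restore cycle, and transient ones come back as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator state; plain Java serialization stands in for checkpointing.
class ReorderOperatorState implements Serializable {
    private static final long serialVersionUID = 1L;

    // Not transient: treated as operator state and written to every checkpoint.
    List<String> bufferedTuples = new ArrayList<>();

    // Transient: skipped when checkpointing, so it is null after a restore and
    // must be re-created (in Apex, typically in setup()).
    transient StringBuilder scratch = new StringBuilder();
}

class CheckpointDemo {
    // Serialize the whole state object, as a checkpoint would.
    static byte[] checkpoint(ReorderOperatorState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    // Rebuild the state object from a checkpoint, as recovery would.
    static ReorderOperatorState restore(byte[] snapshot) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (ReorderOperatorState) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ReorderOperatorState state = new ReorderOperatorState();
        state.bufferedTuples.add("msg2 ts2");
        state.scratch.append("temporary");

        ReorderOperatorState restored = restore(checkpoint(state));
        System.out.println(restored.bufferedTuples); // [msg2 ts2]
        System.out.println(restored.scratch);        // null
    }
}
```

So a buffer of not-yet-emitted tuples held in a regular field is recovered after a failure, while anything marked transient has to be rebuilt.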

Ram

On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli <[email protected]>
wrote:

>
> Really a great thought!
>
> I'm wondering how the application will handle failures.
>
> Also, what does the phrase “hold them in the operator state” mean? Hold the
> incoming messages in some data structure, a map, etc.?
>
> -Regards,
> Raja.
>
> From: Thomas Weise <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, June 9, 2016 at 2:57 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: kafka input is processing records in a jumbled order
>
> You can order the messages by event time as they arrive, hold them in the
> operator state and then only emit those in the endWindow callback that are
> older than <threshold> (note that you are not simply emitting all, only
> those that are old enough).
>
> Thomas
>
> On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>> So you want me to add all the incoming tuples to a Map and complete
>> the processing in endWindow()?
>>
>> How would this solve my problem with windowing, as described below?
>>
>>
>> msg2 ts2
>> —————— window ends here
>> msg1 ts1
>> msg3 ts3
>> msg4 ts4
>> msg5 ts5
>> msg7 ts7
>> msg6 ts6
>> —————— window ends here
>>
>> Thanks a lot for your input. Your thoughts are valuable!
>>
>>
>>
>> Regards,
>> Raja.
>>
>> From: Thomas Weise <[email protected]>
>> Reply-To: "[email protected]" <[email protected]>
>> Date: Thursday, June 9, 2016 at 1:49 PM
>>
>> To: "[email protected]" <[email protected]>
>> Subject: Re: kafka input is processing records in a jumbled order
>>
>> There is no need for an extra thread. In fact, tuples should be emitted
>> in the operator thread only. This can be done in endWindow().
>>
>> --
>> sent from mobile
>> On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <[email protected]> wrote:
>>
>>>
>>> How about something like this,
>>>
>>> Store the incoming tuples in the following format:
>>>    TreeMap<TimeStamp, List<Tuples>>
>>>
>>> Create a Flusher thread, which periodically flushes the *firstKey*,
>>> after considering the lag.
>>>
>>>
>>> On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <[email protected]>
>>> wrote:
>>>
>>>> You'll need to have some limit on how much lag is possible for
>>>> out-of-order messages.
>>>> If that limit is, say, 30s, then you'll need to buffer tuples for double
>>>> the lag -- 60s.
>>>>
>>>> You can configure the Application Window size suitably to do this.
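The sizing rule above is simple arithmetic. A sketch, assuming the application window is configured as a number of streaming windows through Apex's APPLICATION_WINDOW_COUNT operator attribute and that the streaming window keeps its usual 500 ms default (STREAMING_WINDOW_SIZE_MILLIS); check both against your Apex version. The helper name is hypothetical.

```java
// Hypothetical helper: how many streaming windows an application window needs
// so that it spans twice the maximum expected lag.
class WindowSizing {
    static int applicationWindowCount(long maxLagMillis, long streamingWindowMillis) {
        long bufferMillis = 2 * maxLagMillis;                          // "double the lag"
        // Round up so the application window is never shorter than the buffer time.
        return (int) ((bufferMillis + streamingWindowMillis - 1) / streamingWindowMillis);
    }

    public static void main(String[] args) {
        // 30 s lag limit, assuming 500 ms streaming windows
        System.out.println(applicationWindowCount(30_000, 500)); // 120
    }
}
```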
>>>>
>>>> Ram
>>>>
>>>> On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> No aggregation, but I need the messages to be played in sequence!
>>>>>
>>>>>
>>>>> Ex:
>>>>>
>>>>> Below is the order in which the messages should actually arrive from
>>>>> my Kafka topic:
>>>>>
>>>>> msg1 ts1
>>>>> msg2 ts2
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg6 ts6
>>>>> msg7 ts7
>>>>>
>>>>>
>>>>> But due to network issues, I am seeing the messages in the Kafka
>>>>> topic in an order like this:
>>>>>
>>>>> msg2 ts2 ==> msg2 should come after msg1, but unfortunately msg2
>>>>> reaches Kafka before msg1, so the sequence is lost!
>>>>> msg1 ts1 ==> delayed by a few milliseconds to seconds!
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg7 ts7 ==> msg7 arrived in the topic before msg6
>>>>> msg6 ts6 ==> delayed !!
>>>>>
>>>>>
>>>>> I am losing the order of the messages, and the business logic gives
>>>>> correct results only when the messages are played in sequence!
>>>>>
>>>>> Now, suppose I define windowing/some buffering, then order by timestamp
>>>>> and play the messages…
>>>>>
>>>>> What if the window boundary falls like this:
>>>>>
>>>>> msg2 ts2
>>>>> —————— window ends here
>>>>> msg1 ts1
>>>>> msg3 ts3
>>>>> msg4 ts4
>>>>> msg5 ts5
>>>>> msg7 ts7
>>>>> msg6 ts6
>>>>> —————— window ends here
>>>>>
>>>>> Now, as you can see, even though I am buffering and then ordering the
>>>>> messages by timestamp, I still face the problem of msg2 being processed
>>>>> before msg1! Which I don’t want.
>>>>>
>>>>> Did I really understand windowing correctly? Please correct me if I am
>>>>> wrong! Thanks for your thoughts!
>>>>>
>>>>>
>>>>> Regards,
>>>>> Raja.
>>>>>
>>>>> From: Thomas Weise <[email protected]>
>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>> Date: Thursday, June 9, 2016 at 10:51 AM
>>>>> To: "[email protected]" <[email protected]>
>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>
>>>>> Apex can do stateful processing; you can define a window in which you
>>>>> can reorder the messages. It will have the same effect on latency as
>>>>> "micro-batching".
>>>>>
>>>>> Why is the ordering important? What operations do you perform on the
>>>>> data? Aggregation?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>> My bad… we observe that our source data in the Kafka topics is not
>>>>>> really ordered; we are seeing messages delayed by a few milliseconds!
>>>>>>
>>>>>> The source can’t guarantee ordering because of the network!
>>>>>>
>>>>>> Is there a right way, from the consumer standpoint, to ensure
>>>>>> ordering? Will micro-batching work for me here? Or does Apex support
>>>>>> micro-batching and ordering the messages?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Raja
>>>>>>
>>>>>> From: Thomas Weise <[email protected]>
>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>> Date: Tuesday, June 7, 2016 at 10:59 PM
>>>>>>
>>>>>> To: "[email protected]" <[email protected]>
>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>
>>>>>> Raja,
>>>>>>
>>>>>> Please also confirm how you are using partitioning. If, for example,
>>>>>> in your DAG you shuffle the data received from Kafka in a way that is
>>>>>> different from the original partitioning, then it would be possible that
>>>>>> multiple downstream partitions process data that came from a single Kafka
>>>>>> partition concurrently and therefore in a different order.
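A framework-free sketch of the difference described here. The routing functions are hypothetical (in Apex the downstream partition is normally chosen through the stream codec's partition key); the point is that keying on the source Kafka partition keeps all of that partition's tuples on one downstream instance, while round-robin spreads consecutive tuples from the same partition across instances that run concurrently.

```java
// Hypothetical routing functions, for illustration only.
class PartitionRouting {
    // Key-based: every tuple from a given Kafka partition goes to the same
    // downstream instance, so that instance sees them in their original order.
    static int byKafkaPartition(int kafkaPartition, int downstreamCount) {
        return Math.floorMod(kafkaPartition, downstreamCount);
    }

    // Round-robin: consecutive tuples from one Kafka partition land on different
    // instances, which process concurrently, so relative order is not preserved.
    static int roundRobin(long sequenceNumber, int downstreamCount) {
        return (int) Math.floorMod(sequenceNumber, (long) downstreamCount);
    }

    public static void main(String[] args) {
        int downstreamCount = 2;
        // Four consecutive messages, all read from Kafka partition 1
        for (long seq = 0; seq < 4; seq++) {
            System.out.println("msg" + seq
                + " keyed -> " + byKafkaPartition(1, downstreamCount)
                + ", round-robin -> " + roundRobin(seq, downstreamCount));
        }
    }
}
```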
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>> Yes, Devendra.
>>>>>>>
>>>>>>> p1.10 is read before p1.1!
>>>>>>>
>>>>>>> Sure I shall check that. Thanks a lot for your response.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Raja.
>>>>>>>
>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>> Date: Tuesday, June 7, 2016 at 7:59 PM
>>>>>>>
>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>
>>>>>>> Hi Raja,
>>>>>>>
>>>>>>> Just to be clear, are you suggesting that p1.10 is being read before
>>>>>>> p1.1?
>>>>>>>
>>>>>>> If that's the case, can you use the console consumer that comes packaged
>>>>>>> with Kafka and verify the ordering based on timestamps?
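If you dump the consumed records, a check like the one below can confirm whether ordering holds within each partition. It is a sketch that assumes you have already extracted each record's partition and timestamp, in consumption order (for example from the console consumer's output); the class and method names are hypothetical. Records from different partitions are allowed to interleave.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical checker: per-partition timestamp order, ignoring cross-partition interleaving.
class OrderingCheck {
    // Returns the index of the first record that is older than the previous record
    // from the same partition, or -1 if every partition is in timestamp order.
    static int firstOutOfOrder(int[] partitions, long[] timestamps) {
        Map<Integer, Long> lastSeen = new HashMap<>();   // partition -> latest timestamp so far
        for (int i = 0; i < partitions.length; i++) {
            Long previous = lastSeen.get(partitions[i]);
            if (previous != null && timestamps[i] < previous) {
                return i;
            }
            lastSeen.put(partitions[i], timestamps[i]);
        }
        return -1;
    }

    public static void main(String[] args) {
        // Interleaved across partitions but ordered within each one
        System.out.println(firstOutOfOrder(new int[]{1, 2, 1}, new long[]{10, 5, 12}));        // -1
        // Partition 1 goes 10, 12, 11: the record at index 3 is out of order
        System.out.println(firstOutOfOrder(new int[]{1, 2, 1, 1}, new long[]{10, 5, 12, 11})); // 3
    }
}
```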
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dev
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks a lot, Devendra Tagare, for the response.
>>>>>>>>
>>>>>>>> What you said is very clear and understandable. But I am NOT getting
>>>>>>>> that partition-level order! My operator is processing the records in
>>>>>>>> a jumbled order rather than in sequence!
>>>>>>>> I say this because I am generating timestamps upon tuple receipt and
>>>>>>>> emitting those timestamps to my destination, which clearly shows the
>>>>>>>> records are reaching the operator in a shuffled order.
>>>>>>>>
>>>>>>>> I get records at millisecond-level differences! Will that be a
>>>>>>>> problem?
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Raja.
>>>>>>>>
>>>>>>>> From: Devendra Tagare <[email protected]>
>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>> Date: Tuesday, June 7, 2016 at 7:12 PM
>>>>>>>>
>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>
>>>>>>>> Hi Raja,
>>>>>>>>
>>>>>>>> When you apply the ONE_TO_MANY partitioning scheme, one instance of
>>>>>>>> the operator consumes from many partitions of a Kafka topic.
>>>>>>>>
>>>>>>>> When you look at the consumed data, all the events coming from a
>>>>>>>> given partition will be ordered, but there are no ordering guarantees
>>>>>>>> across partitions, since Kafka does not guarantee that.
>>>>>>>>
>>>>>>>> E.g., if 3 partitions of a topic, p1, p2, p3, with 10 messages each
>>>>>>>> are connected to one physical partition of the KafkaInputOperator,
>>>>>>>> then the ordering guarantee of p1.1 to p1.10 is honored, i.e. message
>>>>>>>> 10 of p1 will be consumed only after messages 1 through 9 are
>>>>>>>> consumed, but the operator could consume messages in an order like
>>>>>>>> p1.1, p2.1, p1.2, p1.3, p3.1, p2.2..., which still follows the
>>>>>>>> per-partition guarantees.
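This interleaving can be simulated in a few lines. It is an illustration only, not KafkaInputOperator's actual fetch logic, and the names are hypothetical: a single consumer takes the next unread message from whichever partition it polls, so each partition is read front to back while the merged stream mixes partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of one consumer reading several partitions.
class InterleavedConsumer {
    // pickOrder lists which partition is polled at each step; each poll returns
    // that partition's next unread message, so per-partition order is preserved.
    static List<String> consume(List<List<String>> partitions, int[] pickOrder) {
        int[] nextOffset = new int[partitions.size()];   // next unread message per partition
        List<String> consumed = new ArrayList<>();
        for (int p : pickOrder) {
            consumed.add(partitions.get(p).get(nextOffset[p]++));
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("p1.1", "p1.2", "p1.3"),
            Arrays.asList("p2.1", "p2.2"),
            Arrays.asList("p3.1"));
        int[] pickOrder = {0, 1, 0, 0, 2, 1};            // poll p1, p2, p1, p1, p3, p2
        System.out.println(consume(partitions, pickOrder));
        // [p1.1, p2.1, p1.2, p1.3, p3.1, p2.2]
    }
}
```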
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dev
>>>>>>>>
>>>>>>>> On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks for the response, Thomas.
>>>>>>>>>
>>>>>>>>> My quick question is:
>>>>>>>>>
>>>>>>>>> I have around 30 partitions of a Kafka topic, and all of them have
>>>>>>>>> messages ordered at partition level.
>>>>>>>>>
>>>>>>>>> So when I consume those messages using a single consumer [with the
>>>>>>>>> ONE_TO_MANY strategy set], does the ordering still not work?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> My messages in the topic are guaranteed to be ordered at partition
>>>>>>>>> level.
>>>>>>>>>
>>>>>>>>> Thanks a lot in advance for your response.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Raja.
>>>>>>>>>
>>>>>>>>> From: Thomas Weise <[email protected]>
>>>>>>>>> Reply-To: "[email protected]" <[email protected]>
>>>>>>>>> Date: Tuesday, June 7, 2016 at 5:52 PM
>>>>>>>>> To: "[email protected]" <[email protected]>
>>>>>>>>> Subject: Re: kafka input is processing records in a jumbled order
>>>>>>>>>
>>>>>>>>> Raja,
>>>>>>>>>
>>>>>>>>> Are you expecting ordering across multiple Kafka partitions?
>>>>>>>>>
>>>>>>>>> All messages from a given Kafka partition are received by the same
>>>>>>>>> consumer and thus will be ordered. However, when messages come from
>>>>>>>>> multiple partitions there is no such guarantee.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> I have built a DAG that reads from Kafka and, in the next
>>>>>>>>>> operators, does a lookup against an HBase table and updates the
>>>>>>>>>> HBase table based on some business logic.
>>>>>>>>>>
>>>>>>>>>> Sometimes my operator (custom written), which does both the HBase
>>>>>>>>>> lookup and the update, processes the records it receives from
>>>>>>>>>> Kafka in a jumbled order, which causes many records to be ignored
>>>>>>>>>> during processing!
>>>>>>>>>>
>>>>>>>>>> I am not using any parallel partitions/instances, and with
>>>>>>>>>> KafkaInputOperator I am using only the ONE_TO_MANY partition strategy.
>>>>>>>>>>
>>>>>>>>>> I am very new to Apex. I expected Apex to guarantee the
>>>>>>>>>> ordering.
>>>>>>>>>>
>>>>>>>>>> Can someone please share their knowledge on this issue…?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks a lot in advance…
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Raja.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>
