Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-30 Thread Aljoscha Krettek
Yes, in the end the requests to HBase are the bottleneck, and the latency will 
manifest in different places of the job depending on where there is a queue. If 
there is a queue between map and flatMap, elements will sit there and wait, and 
you’ll see the latency there. If map and flatMap are chained, you will see the 
latency in the form of Kafka consumer lag (Kafka itself is the queue here).

Best,
Aljoscha
> On 29. Jun 2017, at 18:30, sohimankotia  wrote:
> 
> A few last doubts:
> 
> 1. If I increase parallelism, will latency decrease because the load will get
> distributed?
> 2. But if the load increases, will latency also increase even with higher
> parallelism?
> 3. Let's say I remove the partitioner and the HBase op is still there in the
> flatMap. Would this latency still be there?
> 4. If yes to point 3, would the latency then come from reading elements from
> Kafka?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14076.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-29 Thread sohimankotia
A few last doubts:

1. If I increase parallelism, will latency decrease because the load will get
distributed?
2. But if the load increases, will latency also increase even with higher
parallelism?
3. Let's say I remove the partitioner and the HBase op is still there in the
flatMap. Would this latency still be there?
4. If yes to point 3, would the latency then come from reading elements from
Kafka?




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14076.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-29 Thread Aljoscha Krettek
Yes, this is exactly right!

> On 29. Jun 2017, at 17:42, sohimankotia  wrote:
> 
> So it means that when elements leave:
> 
> map => sit in buffer (due to partitioner) => enter flatMap 
> 
> Since the HBase ops in the flatMap are taking time, let's say 1 sec per
> operation, the next element will not be read from the buffer until the HBase
> op is done.
> 
> Due to this HBase op, the time to get from map into the flatMap will
> accumulate for elements waiting in the buffer.
> 
> Let me know if I understood correctly?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14072.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-29 Thread sohimankotia
So it means that when elements leave:

map => sit in buffer (due to partitioner) => enter flatMap 

Since the HBase ops in the flatMap are taking time, let's say 1 sec per
operation, the next element will not be read from the buffer until the HBase
op is done.

Due to this HBase op, the time to get from map into the flatMap will
accumulate for elements waiting in the buffer.

Let me know if I understood correctly?
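As a rough back-of-the-envelope check of this understanding (a hypothetical model, not from the job itself, using the roughly 1 sec per HBase op figure mentioned in the thread):

```python
# Rough model of latency accumulating in the buffer between map and flatMap.
# Assumption (illustrative, not measured from the job): the flatMap drains
# the buffer one element at a time, and each HBase op takes about the same time.

def queue_wait_seconds(position_in_buffer, hbase_op_seconds):
    """Time an element waits in the buffer before flatMap picks it up."""
    return position_in_buffer * hbase_op_seconds

# With ~1 sec per HBase op, a backlog of only 120-240 elements is enough
# to explain a 2 to 4 minute delay between map and flatMap.
print(queue_wait_seconds(120, 1.0))  # 120.0 -> 2 minutes
print(queue_wait_seconds(240, 1.0))  # 240.0 -> 4 minutes
```

So the observed 2-4 minute latency does not require a slow HBase op per element, only a modest backlog in front of a ~1 sec op.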



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14072.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-29 Thread Aljoscha Krettek
Even if the request time to HBase is just a couple of milliseconds this will 
add up and the elements sitting in the buffer between the map and flatMap will 
have high perceived latency, yes.


> On 28. Jun 2017, at 16:54, sohimankotia  wrote:
> 
> I had the same concern regarding HBase, so I also added a metric to measure
> the HBase op time in the flatMap (basically the complete flatMap op).
> 
> From the metrics I see that approx 96% of op times were under 1 sec. (I can
> still do a dummy run without the HBase op, but do these timings make sense?)
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14040.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-28 Thread sohimankotia
I had the same concern regarding HBase, so I also added a metric to measure
the HBase op time in the flatMap (basically the complete flatMap op).

From the metrics I see that approx 96% of op times were under 1 sec. (I can
still do a dummy run without the HBase op, but do these timings make sense?)



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14040.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-28 Thread Aljoscha Krettek
I see; what I consider highly likely here is that the lookup to HBase is the 
bottleneck. If the lookup takes too long, events “sit in a queue” between the map 
and flatMap operations. If you replace the HBase lookup with some dummy code you 
should see the latency go away.

The reason you don’t see latency when you don’t have a custom partitioner is 
that there the map and flatMap are chained together: sending an event from one 
operator to the next is basically just a function call, so there is no queue 
that can fill up and make events “wait”.

Best,
Aljoscha
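The difference between the chained and the queued hand-off can be sketched with a small simulated-clock model (plain Python rather than Flink; the per-operator costs are made-up illustrative numbers):

```python
# Simulated clock: contrast a chained pipeline (map calls flatMap directly)
# with a queued pipeline (map finishes fast, flatMap drains a buffer slowly).
# All times are simulated seconds; no real sleeping happens.

MAP_COST = 0.001      # map is cheap
FLATMAP_COST = 1.0    # flatMap does a slow HBase-style lookup

def chained_latencies(n):
    """map -> flatMap as one call chain: no queue, per-element cost only."""
    return [MAP_COST + FLATMAP_COST for _ in range(n)]

def queued_latencies(n):
    """map emits everything quickly into a buffer; flatMap drains it at
    FLATMAP_COST per element, so element i also waits roughly i * FLATMAP_COST."""
    latencies = []
    flatmap_free_at = 0.0
    for i in range(n):
        emitted_at = (i + 1) * MAP_COST           # map runs far ahead of flatMap
        start = max(emitted_at, flatmap_free_at)  # wait until flatMap is free
        done = start + FLATMAP_COST
        flatmap_free_at = done
        latencies.append(done - emitted_at)
    return latencies

# Last element: ~1 second when chained, but close to n seconds when queued.
print(chained_latencies(100)[-1])
print(queued_latencies(100)[-1])
```

The elements themselves are processed at the same rate in both cases; only the place where the waiting happens (and therefore the measured map-to-flatMap latency) differs.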

> On 28. Jun 2017, at 15:17, sohimankotia  wrote:
> 
> Source is Kafka.
> The flatMap has an HBase lookup.
> Sink is Kafka.
> 
> I tried to get stats over the days. I see that almost 40% had a latency of
> 0 seconds, 10% 0-30 sec, approx 10% 30-60 sec, 10% around 60-120 sec, and
> 30% around 120-210 sec.
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14036.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-28 Thread sohimankotia
Source is Kafka.
The flatMap has an HBase lookup.
Sink is Kafka.

I tried to get stats over the days. I see that almost 40% had a latency of
0 seconds, 10% 0-30 sec, approx 10% 30-60 sec, 10% around 60-120 sec, and
30% around 120-210 sec.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14036.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-28 Thread Aljoscha Krettek
I think something is going wrong somewhere, then. Usually people get 
millisecond latencies even when they have a “keyBy” or shuffle in between 
operations (which are no different from a custom partitioner at the system 
level).

What kind of sources/sinks is your program using?

Best,
Aljoscha

> On 27. Jun 2017, at 17:04, sohimankotia  wrote:
> 
> So in the following execution flow:
> 
> source -> map -> partitioner -> flatmap -> sink 
> 
> I am attaching the current time to each tuple while emitting from the map
> function, and then extracting that timestamp value from the tuple in the
> flatMap as the very first step. Then I calculate the difference between the
> time attached while emitting from map and the time of entering the flatMap.
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14025.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-27 Thread sohimankotia
So in the following execution flow:

 source -> map -> partitioner -> flatmap -> sink 

I am attaching the current time to each tuple while emitting from the map
function, and then extracting that timestamp value from the tuple in the
flatMap as the very first step. Then I calculate the difference between the
time attached while emitting from map and the time of entering the flatMap.
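A sketch of this measurement technique, with plain Python stand-ins for the map and flatMap functions (the record layout and names are illustrative, not from the actual job):

```python
import time

# Sketch of the latency measurement described above: the map function stamps
# each record with the current time on emission, and the flatMap computes the
# difference as its very first step.

def map_fn(value):
    """Emit the value together with its emission timestamp (milliseconds)."""
    return (value, time.time() * 1000.0)

def flatmap_entry(record):
    """First step of flatMap: measure how long the record sat in between."""
    value, emitted_at_ms = record
    latency_ms = time.time() * 1000.0 - emitted_at_ms
    return value, latency_ms

value, latency_ms = flatmap_entry(map_fn("event-1"))
print(value, latency_ms)  # latency is near zero here, with no queue in between
```

Note that this measures queueing delay plus network transfer, not operator processing time, which is why it surfaces the buffer in front of the flatMap.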



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p14025.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-27 Thread Aljoscha Krettek
Hi,

What do you mean by latency and how are you measuring this in your job?

Best,
Aljoscha

> On 22. Jun 2017, at 14:23, sohimankotia  wrote:
> 
> Hi Chesnay,
> 
> I have data categorized on some attribute (the key in the partitioner), which
> will have n possible values. As of now the job is enabled for only one value
> of that attribute. In a couple of days we will enable all values of the
> attribute with more parallelism, so that each attribute value's data gets
> processed in a single instance.
> 
> So, while running with parallelism 1, I just observed the 2 to 4 minute
> latency from map -> p -> flatmap
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p13916.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.



Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-22 Thread sohimankotia
Hi Chesnay,

I have data categorized on some attribute (the key in the partitioner), which
will have n possible values. As of now the job is enabled for only one value
of that attribute. In a couple of days we will enable all values of the
attribute with more parallelism, so that each attribute value's data gets
processed in a single instance.

So, while running with parallelism 1, I just observed the 2 to 4 minute
latency from map -> p -> flatmap



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913p13916.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.


Re: Partitioner is spending around 2 to 4 minutes while pushing data to next operator

2017-06-22 Thread Chesnay Schepler

So let's get the obvious question out of the way:

Why are you adding a partitioner when your parallelism is 1?

On 22.06.2017 11:58, sohimankotia wrote:

I have an execution flow (streaming job) with parallelism 1.

  source -> map -> partitioner -> flatmap -> sink

Adding the partitioner starts a new thread, but the partitioner is spending an
average of 2 to 4 minutes moving data from map to flatMap.

For more details about this  :
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-Custom-Partitioner-in-Streaming-with-parallelism-1-adding-latency-td13766.html

In this link:
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks

they have mentioned that the

  PipelinedSubpartition is a pipelined implementation to support streaming
data exchange. The SpillableSubpartition is a blocking implementation to
support batch data exchange.

I am not sure how I would use these, or how to reduce the latency from map ->
partitioner -> flatmap.
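For context, the custom partitioner in this flow is conceptually just a function from a record's key to a downstream channel index. A minimal hash-based stand-in (illustrative only; this is not the job's actual partitioner logic):

```python
# Minimal stand-in for a custom partitioner: route each record to one of
# num_partitions downstream channels based on its key, so that all records
# with the same key reach the same downstream instance.

def partition(key, num_partitions):
    """Deterministically map a string key to an index in [0, num_partitions)."""
    # Explicit rolling hash: Python's built-in hash() is salted per process,
    # so it would not be reproducible across runs.
    h = 0
    for ch in key:
        h = (h * 31 + ord(ch)) % (2 ** 32)
    return h % num_partitions

# Same key -> same downstream channel, every time.
print(partition("attribute-A", 4))
print(partition("attribute-A", 4))  # same index again
```

The partitioner itself is cheap; the cost discussed in this thread comes from the network buffer and queue that the repartitioning step introduces between the two operators.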






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Partitioner-is-spending-around-2-to-4-minutes-while-pushing-data-to-next-operator-tp13913.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.