Re: Improving Flink Performance

2017-02-06 Thread Fabian Hueske
Hi Jonas,

thanks for reporting back!
Glad you solved the issue.

Cheers, Fabian

2017-02-05 22:07 GMT+01:00 Jonas :

> Using a profiler I found out that 80% of the time was spent in a
> domain-specific data structure. After reimplementing it with a more
> efficient one, the performance problems are gone.
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-
> Performance-tp11248p11447.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Improving Flink Performance

2017-02-05 Thread Jonas
Using a profiler I found out that 80% of the time was spent in a
domain-specific data structure. After reimplementing it with a more
efficient one, the performance problems are gone.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11447.html


Re: Improving Flink Performance

2017-01-26 Thread Stephan Ewen
@jonas Flink's Fork-Join Pool drives only the actors, which are doing
coordination. Unless your job is permanently failing/recovering, they don't
do much.


On Thu, Jan 26, 2017 at 2:56 PM, Robert Metzger  wrote:

> Hi Jonas,
>
> The good news is that your job is completely parallelizable. So if you are
> running it on a cluster, you can scale it at least to the number of Kafka
> partitions you have (actually even further, because the Kafka consumers are
> not the issue).
>
> I don't think that the Scala (= Akka) worker threads are really the thing
> that slows everything down. These threads should usually be idle.
> I just tried it with VisualVM (I don't own a JProfiler license :) ) and
> you can nicely see what's eating up CPU resources in my job:
> http://i.imgur.com/nqXeHdi.png
>
>
>
>
> On Thu, Jan 26, 2017 at 1:23 PM, Jonas  wrote:
>
>> JProfiler
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Improving-Flink-Per
>> formance-tp11248p11311.html
>>
>
>


Re: Improving Flink Performance

2017-01-26 Thread Robert Metzger
Hi Jonas,

The good news is that your job is completely parallelizable. So if you are
running it on a cluster, you can scale it at least to the number of Kafka
partitions you have (actually even further, because the Kafka consumers are
not the issue).

I don't think that the Scala (= Akka) worker threads are really the thing
that slows everything down. These threads should usually be idle.
I just tried it with VisualVM (I don't own a JProfiler license :) ) and you
can nicely see what's eating up CPU resources in my job:
http://i.imgur.com/nqXeHdi.png




On Thu, Jan 26, 2017 at 1:23 PM, Jonas  wrote:

> JProfiler
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-
> Performance-tp11248p11311.html
>


Re: Improving Flink Performance

2017-01-26 Thread Jonas
JProfiler



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11311.html


Re: Improving Flink Performance

2017-01-26 Thread dromitlabs
Offtopic: What profiler is it that you're using?

> On Jan 25, 2017, at 18:11, Jonas  wrote:
> 
> Images:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
> and
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11307.html


Re: Improving Flink Performance

2017-01-25 Thread Jonas
Images:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png
and
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11307.html


Re: Improving Flink Performance

2017-01-25 Thread Jonas
I ran a profiler on my job and it seems that most of the time it's waiting :O
See here:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png>
Also, the following code snippet executes unexpectedly slowly, as you can see
in this call graph:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n11305/Tv6KnR6.png>
*Any ideas?*



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11305.html

Re: Improving Flink Performance

2017-01-25 Thread Jonas
I tried it and it improved performance a little (~10%), but nothing outstanding.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11301.html


Re: Improving Flink Performance

2017-01-25 Thread Stephan Ewen
Have you tried the object reuse option mentioned above?

On Tue, Jan 24, 2017 at 6:52 PM, Jonas  wrote:

> The performance hit due to decoding the JSON is expected and there is not
> a lot I can do about that (except for changing the encoding). Alright.
>
> When joining the above stream with another stream I get another performance
> hit by ~80% so that in the end I have only 1k msgs/s remaining. Do you know
> how to improve that? Might setting the buffer size / timeout be worth
> exploring?
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-
> Performance-tp11248p11272.html
>


Re: Improving Flink Performance

2017-01-24 Thread Jonas
The performance hit due to decoding the JSON is expected and there is not a
lot I can do about that (except for changing the encoding). Alright.

When joining the above stream with another stream I get another performance
hit by ~80% so that in the end I have only 1k msgs/s remaining. Do you know
how to improve that? Might setting the buffer size / timeout be worth
exploring?
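The buffer timeout knob asked about here lives on the StreamExecutionEnvironment. A minimal sketch of what tuning it looks like; the value below is a hypothetical starting point, not a measured recommendation:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object BufferTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Maximum time (ms) a network buffer may wait before being flushed
    // downstream even if not full. Larger values favor throughput (fuller
    // buffers per shipment), smaller values favor latency. Special cases:
    // -1 flushes only when a buffer is full, 0 flushes after every record.
    env.setBufferTimeout(500) // hypothetical value, tune by measurement
  }
}
```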




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248p11272.html


Re: Improving Flink Performance

2017-01-24 Thread Stephan Ewen
One thing you can try and do is to enable object reuse in the execution
config.
That should get rid of the overhead when passing the JSON objects from
function to function.
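The object reuse option is a one-liner on the ExecutionConfig; a minimal sketch (Flink 1.x-era API):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ObjectReuseSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Lets Flink hand the same object instance from operator to operator
    // within a chain instead of copying it between user functions.
    env.getConfig.enableObjectReuse()
  }
}
```

The trade-off: with reuse enabled, user functions must not cache or mutate input objects across invocations, or results can be silently corrupted.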

On Tue, Jan 24, 2017 at 6:00 PM, Aljoscha Krettek 
wrote:

> Hi,
> I think MyJsonDecoder is the bottleneck and I'm also afraid there is
> nothing to do because parsing Strings to JSON is simply slow.
>
> I think you would see the biggest gains if you had a binary representation
> that can quickly be serialised/deserialised to objects and you use that
> instead of String/JSON.
>
> Cheers,
> Aljoscha
>
> On Tue, 24 Jan 2017 at 12:17 Jonas  wrote:
>
>> Hello! I'm reposting this since the other thread had some formatting
>> issues apparently. I hope this time it works. I'm having performance
>> problems with a Flink job. If there is anything valuable missing, please
>> ask and I will try to answer ASAP. My job looks like this:
>>
>> /*
>>   Settings
>>  */
>> env.setParallelism(4)
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> /*
>>   Operator Graph
>>  */
>> env
>>   .addSource(new FlinkKafkaConsumer09("raw.my.topic", new
>> SimpleStringSchema(), props)) // 100k msgs/s
>>   .map(new MyJsonDecoder) // 25k msgs/s
>>   .map(new AddTypeToJsonForSplitting) // 20k msgs/s
>>   .split(t => Seq(t._1.name))
>>   .select(TYPE_A.name) // 18k msgs/s
>>   .flatMap(new MapJsonToEntity) // 13k msgs/s
>>   .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
>>   .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>>
>> /*
>>   Run
>>  */
>> env.execute()
>>
>> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
>> is decoded, a type is added (we have multiple message types per Kafka
>> topic). Then we select the TYPE_A messages, create a Scala entity out of it
>> (a case class). Afterwards in the MapEntityToMultipleEntities the Scala
>> entities are split into multiple. Finally a watermark is added. As you can
>> see the data is not keyed in any way yet. *Is there a way to make this
>> faster?*
>>
>> *Measurements were taken with*
>>
>> def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
>>   d.writeToSocket("localhost", port, new SerializationSchema[T] {
>>     override def serialize(element: T): Array[Byte] =
>>       "\n".getBytes(CharsetUtil.UTF_8)
>>   })
>> }
>>
>> *and* nc -lk PORT | pv --line-mode --rate --average-rate --format
>> "Current: %r, Avg:%a, Total: %b" > /dev/null
>>
>> I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink
>> 1.1.4
>> --
>> View this message in context: Improving Flink Performance
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html>
>>
>


Re: Improving Flink Performance

2017-01-24 Thread Aljoscha Krettek
Hi,
I think MyJsonDecoder is the bottleneck and I'm also afraid there is
nothing to do because parsing Strings to JSON is simply slow.

I think you would see the biggest gains if you had a binary representation
that can quickly be serialised/deserialised to objects and you use that
instead of String/JSON.
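A sketch of what such a binary path could look like: a custom deserialization schema that goes from Kafka bytes straight to a case class, skipping the String/JSON detour. All names and the byte layout here are hypothetical (not from the original job), and the schema's package path varies across Flink versions:

```scala
import java.nio.ByteBuffer
// Package path as of the Flink 1.1.x line; later releases moved this class.
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

// Hypothetical event type standing in for the job's TYPE_A entity.
case class TypeA(id: Long, value: Int)

// Decodes a fixed 12-byte record: 8-byte big-endian id, then a 4-byte value.
class TypeABinarySchema extends AbstractDeserializationSchema[TypeA] {
  override def deserialize(message: Array[Byte]): TypeA = {
    val buf = ByteBuffer.wrap(message)
    TypeA(buf.getLong, buf.getInt)
  }
}

// Plugged into the source in place of SimpleStringSchema:
// new FlinkKafkaConsumer09[TypeA]("raw.my.topic", new TypeABinarySchema, props)
```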

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 12:17 Jonas  wrote:

> Hello! I'm reposting this since the other thread had some formatting
> issues apparently. I hope this time it works. I'm having performance
> problems with a Flink job. If there is anything valuable missing, please
> ask and I will try to answer ASAP. My job looks like this:
>
> /*
>   Settings
>  */
> env.setParallelism(4)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> /*
>   Operator Graph
>  */
> env
>   .addSource(new FlinkKafkaConsumer09("raw.my.topic", new
> SimpleStringSchema(), props)) // 100k msgs/s
>   .map(new MyJsonDecoder) // 25k msgs/s
>   .map(new AddTypeToJsonForSplitting) // 20k msgs/s
>   .split(t => Seq(t._1.name))
>   .select(TYPE_A.name) // 18k msgs/s
>   .flatMap(new MapJsonToEntity) // 13k msgs/s
>   .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
>   .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>
> /*
>   Run
>  */
> env.execute()
>
> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
> is decoded, a type is added (we have multiple message types per Kafka
> topic). Then we select the TYPE_A messages, create a Scala entity out of it
> (a case class). Afterwards in the MapEntityToMultipleEntities the Scala
> entities are split into multiple. Finally a watermark is added. As you can
> see the data is not keyed in any way yet. *Is there a way to make this
> faster?*
>
> *Measurements were taken with*
>
> def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
>   d.writeToSocket("localhost", port, new SerializationSchema[T] {
>     override def serialize(element: T): Array[Byte] =
>       "\n".getBytes(CharsetUtil.UTF_8)
>   })
> }
>
> *and* nc -lk PORT | pv --line-mode --rate --average-rate --format
> "Current: %r, Avg:%a, Total: %b" > /dev/null
>
> I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink
> 1.1.4
> --
> View this message in context: Improving Flink Performance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html>
>


Improving Flink Performance

2017-01-24 Thread Jonas
Hello! I'm reposting this since the other thread had some formatting issues
apparently. I hope this time it works. I'm having performance problems with a
Flink job. If there is anything valuable missing, please ask and I will try
to answer ASAP. My job looks like this:

/*
  Settings
 */
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

/*
  Operator Graph
 */
env
  .addSource(new FlinkKafkaConsumer09("raw.my.topic", new SimpleStringSchema(), props)) // 100k msgs/s
  .map(new MyJsonDecoder) // 25k msgs/s
  .map(new AddTypeToJsonForSplitting) // 20k msgs/s
  .split(t => Seq(t._1.name))
  .select(TYPE_A.name) // 18k msgs/s
  .flatMap(new MapJsonToEntity) // 13k msgs/s
  .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
  .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s

/*
  Run
 */
env.execute()

First, I read data from Kafka. This is very fast at 100k msgs/s. The data is
decoded, a type is added (we have multiple message types per Kafka topic).
Then we select the TYPE_A messages, create a Scala entity out of it (a case
class). Afterwards in the MapEntityToMultipleEntities the Scala entities are
split into multiple. Finally a watermark is added. As you can see the data is
not keyed in any way yet. *Is there a way to make this faster?*

*Measurements were taken with*

def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
  d.writeToSocket("localhost", port, new SerializationSchema[T] {
    override def serialize(element: T): Array[Byte] =
      "\n".getBytes(CharsetUtil.UTF_8)
  })
}

*and* nc -lk PORT | pv --line-mode --rate --average-rate --format
"Current: %r, Avg:%a, Total: %b" > /dev/null

I'm running this on an Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink
1.1.4



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html

Re: Improving Flink performance

2017-01-24 Thread Jonas
I don't even have images in there :O Will delete this thread and create a new
one.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-performance-tp11211p11245.html


Re: Improving Flink performance

2017-01-23 Thread Ted Yu
After "My job looks like this:", it was empty.

Please consider using a third-party site for images.

Cheers

On Mon, Jan 23, 2017 at 10:03 AM, Jonas  wrote:

> I received it well-formatted. Could it be that the issue is your mail reader?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-
> performance-tp11211p11225.html
>


Re: Improving Flink performance

2017-01-23 Thread Jonas
I received it well-formatted. Could it be that the issue is your mail reader?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-performance-tp11211p11225.html


Re: Improving Flink performance

2017-01-23 Thread Greg Hogan
Hi Jonas,

It looks like the mailing list has removed your formatting and/or
attachments.

Greg

On Mon, Jan 23, 2017 at 6:08 AM, Jonas  wrote:

> Hello!
>
> I'm having performance problems with a Flink job. If there is anything
> valuable missing, please ask and I will try to answer ASAP. My job looks
> like this:
>
>
>
> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
> is
> decoded, a type is added (we have multiple message types per Kafka topic).
> Then we select the TYPE_A messages, create a Scala entity out of it (a case
> class). Afterwards in the MapEntityToMultipleEntities the Scala entities
> are
> split into multiple. Finally a watermark is added.
>
> As you can see the data is not keyed in any way yet. *Is there a way to
> make
> this faster?*
>
>
> /Measurements were taken with
>
> and /
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-
> performance-tp11211.html
>


Improving Flink performance

2017-01-23 Thread Jonas
Hello!

I'm having performance problems with a Flink job. If there is anything
valuable missing, please ask and I will try to answer ASAP. My job looks
like this:



First, I read data from Kafka. This is very fast at 100k msgs/s. The data is
decoded, a type is added (we have multiple message types per Kafka topic).
Then we select the TYPE_A messages, create a Scala entity out of it (a case
class). Afterwards in the MapEntityToMultipleEntities the Scala entities are
split into multiple. Finally a watermark is added.

As you can see the data is not keyed in any way yet. *Is there a way to make
this faster?*


/Measurements were taken with

and /



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-performance-tp11211.html