Hi,
I think MyJsonDecoder is the bottleneck and I'm also afraid there is
nothing to do because parsing Strings to Json is simply slow.

I think you would see the biggest gains if you had a binary representation
that can quickly be serialised/deserialised to objects and you use that
instead of String/JSON.

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 12:17 Jonas <jo...@huntun.de> wrote:

> Hello! I'm reposting this since the other thread had some formatting
> issues apparently. I hope this time it works. I'm having performance
> problems with a Flink job. If there is anything valuable missing, please
> ask and I will try to answer ASAP. My job looks like this:
>
>     /*
>       Settings
>      */
>     env.setParallelism(4)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     /*
>       Operator Graph
>      */
>     env
>       .addSource(new FlinkKafkaConsumer09("raw.my.topic", new 
> SimpleStringSchema(), props)) // 100k msgs msgs/s
>       .map(new MyJsonDecoder) // 25k msgs/s
>       .map(new AddTypeToJsonForSplitting) // 20k msgs/s
>       .split(t => Seq(t._1.name))
>       .select(TYPE_A.name) // 18k msgs/s
>       .flatMap(new MapJsonToEntity) // 13k msgs/s
>       .flatMap(new MapEntityToMultipleEntities) // 10k msgs/s
>       .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>
>     /*
>       Run
>      */
>     env.execute()
>
> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
> is decoded, a type is added (we have multiple message types per Kafka
> topic). Then we select the TYPE_A messages, create a Scala entity out of if
> (a case class). Afterwards in the MapEntityToMultipleEntities the Scala
> entities are split into multiple. Finally a watermark is added. As you can
> see the data is not keyed in any way yet. *Is there a way to make this
> faster?*
>
> *Measurements were taken with def writeToSocket[?](d: DataStream[?], port:
> Int): Unit = { d.writeToSocket("localhost", port, new
> SerializationSchema[?] { override def serialize(element: ?): Array[Byte] =
> { "\n".getBytes(CharsetUtil.UTF_8) } }) } and nc -lk PORT | pv --line-mode
> --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > 
> /dev/null*I'm
> running this on a Intel i5-3470, 16G RAM, Ubuntu 16.04.1 LTS on Flink 1.1.4
> ------------------------------
> View this message in context: Improving Flink Performance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Reply via email to