Hi,

I think MyJsonDecoder is the bottleneck, and I'm afraid there is nothing to be done about that, because parsing Strings to JSON is simply slow.
I think you would see the biggest gains if you used a binary representation that can be serialised to and deserialised from objects quickly, instead of String/JSON.

Cheers,
Aljoscha

On Tue, 24 Jan 2017 at 12:17 Jonas <jo...@huntun.de> wrote:

> Hello!
>
> I'm reposting this because the other thread apparently had some formatting
> issues. I hope it works this time.
>
> I'm having performance problems with a Flink job. If anything important is
> missing, please ask and I will answer as soon as possible. My job looks
> like this:
>
>     /*
>      Settings
>      */
>     env.setParallelism(4)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     /*
>      Operator Graph
>      */
>     env
>       .addSource(new FlinkKafkaConsumer09("raw.my.topic", new SimpleStringSchema(), props)) // 100k msgs/s
>       .map(new MyJsonDecoder)                               // 25k msgs/s
>       .map(new AddTypeToJsonForSplitting)                   // 20k msgs/s
>       .split(t => Seq(t._1.name))
>       .select(TYPE_A.name)                                  // 18k msgs/s
>       .flatMap(new MapJsonToEntity)                         // 13k msgs/s
>       .flatMap(new MapEntityToMultipleEntities)             // 10k msgs/s
>       .assignTimestampsAndWatermarks(/* Nothing special */) // 6-8k msgs/s
>
>     /*
>      Run
>      */
>     env.execute()
>
> First, I read data from Kafka. This is very fast at 100k msgs/s. The data
> is decoded and a type is added (we have multiple message types per Kafka
> topic). Then we select the TYPE_A messages and create a Scala entity (a
> case class) out of each one. Afterwards, MapEntityToMultipleEntities splits
> each Scala entity into multiple entities. Finally, a watermark is added. As
> you can see, the data is not keyed in any way yet. *Is there a way to make
> this faster?*
>
> Measurements were taken with
>
>     def writeToSocket[T](d: DataStream[T], port: Int): Unit = {
>       d.writeToSocket("localhost", port, new SerializationSchema[T] {
>         // Emit only a newline per element, so pv can count elements as lines
>         override def serialize(element: T): Array[Byte] =
>           "\n".getBytes(CharsetUtil.UTF_8)
>       })
>     }
>
> and
>
>     nc -lk PORT | pv --line-mode --rate --average-rate --format "Current: %r, Avg:%a, Total: %b" > /dev/null
>
> I'm running this on an Intel i5-3470 with 16 GB RAM, Ubuntu 16.04.1 LTS,
> and Flink 1.1.4.
>
> ------------------------------
> View this message in context: Improving Flink Performance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Improving-Flink-Performance-tp11248.html>
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
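A minimal sketch of Aljoscha's binary-representation suggestion, written in Scala to match the job. It assumes the producers could write a fixed binary layout to Kafka instead of JSON. `TypeAEvent`, its fields, and `TypeAEventCodec` are hypothetical stand-ins for the job's TYPE_A case class, and the byte layout is made up for illustration. Decoding then becomes a few fixed-offset reads from a `java.nio.ByteBuffer` rather than a full JSON parse.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical entity standing in for the job's TYPE_A case class;
// the real fields are not given in the thread.
final case class TypeAEvent(timestamp: Long, id: Int, name: String, value: Double)

// Hand-rolled codec for an assumed layout (not a Flink API):
// timestamp (8 bytes) | id (4) | name length (4) | name UTF-8 bytes | value (8)
object TypeAEventCodec {
  def encode(e: TypeAEvent): Array[Byte] = {
    val nameBytes = e.name.getBytes(StandardCharsets.UTF_8)
    // Allocate exactly the needed size so array() returns the whole record
    val buf = ByteBuffer.allocate(8 + 4 + 4 + nameBytes.length + 8)
    buf.putLong(e.timestamp)
    buf.putInt(e.id)
    buf.putInt(nameBytes.length) // length prefix for the variable-size string
    buf.put(nameBytes)
    buf.putDouble(e.value)
    buf.array()
  }

  def decode(bytes: Array[Byte]): TypeAEvent = {
    val buf = ByteBuffer.wrap(bytes) // same (big-endian) byte order as encode
    val timestamp = buf.getLong()
    val id = buf.getInt()
    val nameBytes = new Array[Byte](buf.getInt()) // read the length prefix
    buf.get(nameBytes)
    val value = buf.getDouble()
    TypeAEvent(timestamp, id, new String(nameBytes, StandardCharsets.UTF_8), value)
  }
}
```

In Flink 1.1, `decode` could presumably be wrapped in a `DeserializationSchema[TypeAEvent]` and passed to `FlinkKafkaConsumer09` in place of `SimpleStringSchema`, so records would arrive as case classes without an intermediate String or JSON step. That wiring is not shown or tested here, and a schema-based binary format may be easier to evolve than a hand-rolled layout like this one.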