Hi,

After migrating our project from Flink 1.2.1 to Flink 1.3.2, we noticed a big performance drop caused by poor balancing of vertices across task managers.
In our use case, we set the default parallelism to the number of task managers (the operators between the source and the sink run with one subtask per slot):

    val stream: DataStream[Array[Byte]] = env.addSource(new FlinkKafkaConsumer09[Array[Byte]]( ... ))
      .name("kafkaConsumer").rescale // 1 operator / instance

    val parallelism = nbTaskManagers * nbTaskSlots

    val hydratedStream: DataStream[Message] = stream
      .flatMap(avroDeserializer).name("AvroDeserializer").setParallelism(parallelism)
      .flatMap(messageParser).name("MessageParser").setParallelism(parallelism)
      .flatMap(messageHydration).name("Hydration").setParallelism(parallelism)
      .filter(MessageFilter).name("MessageFilter").setParallelism(parallelism)

    hydratedStream.rescale // 1 operator / instance
      .addSink(kafkaSink).name("KafkaSink")

Taking an example of 2 task managers with 4 slots each, with Flink 1.2.1 each instance had:

  - 1 kafkaConsumer -> 4 mapOperators -> 1 kafkaSink

But with exactly the same code on Flink 1.3.2, all the sinks are located on one instance:

First instance:
  - 1 kafkaConsumer -> 4 mapOperators -> 2 kafkaSink

Second instance:
  - 1 kafkaConsumer -> 4 mapOperators -> no kafkaSink (network transfer to the first task manager)

The behaviour is the same with more task managers, both in a local cluster and in a YARN cluster.

Is this a bug, or should I update my code to get the same behaviour as with Flink 1.2.1?