Hi all,

I am seeing a very low consumption rate when reading messages from Kafka in several of our jobs. In order to find the bottleneck, I created a very simple pipeline that reads string messages from Kafka and just prints the output. The pipeline runs on a Flink cluster with the following setup: 1 task manager, 3 slots, parallelism set to 3.
The pipeline:

    PCollection<KV<String, String>> readFromKafka =
        pipeline.apply(
            "readFromKafka",
            KafkaTransform.readStrFromKafka(
                pipelineUtil.getBootstrapServers(), topic_name, consumer_group));

    readFromKafka
        .apply("Get message contents", Values.<String>create())
        .apply(
            "Log messages",
            MapElements.into(TypeDescriptor.of(String.class))
                .via(message -> {
                    log.atInfo().log("Received: {}", message);
                    return message;
                }));

The Kafka consumer is:

    return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(
            ImmutableMap.of(
                "auto.offset.reset", "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
        .withoutMetadata();

According to the metrics, I do have 3 threads reading from Kafka, but each one reads only around 56 records per second. In my opinion this is a very low rate, and I am not sure I understand this behaviour. I have checked CPU and memory usage and both look OK.

Any ideas would be really appreciated.

Thanks a lot,
Sigalit