Hi Sigalit,

Could you try running your test pipeline with the "--experiments=use_deprecated_read" option and see if it makes a difference?
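In case it helps, here is a minimal sketch of one way to add that flag to the arguments before the pipeline options are parsed. It assumes the job builds its options from the command-line args (for example via PipelineOptionsFactory.fromArgs), which your snippet does not show. The class and method names (ExperimentArgs, withExperiment) are made up for illustration and are not part of Beam. Passing the flag directly on the command line should work just as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not a Beam API): makes sure an experiment name is
// present in the command-line args that are later handed to the options parser.
final class ExperimentArgs {
    private static final String PREFIX = "--experiments=";

    static String[] withExperiment(String[] args, String experiment) {
        List<String> result = new ArrayList<>(args.length + 1);
        boolean merged = false;
        for (String arg : args) {
            if (!merged && arg.startsWith(PREFIX)) {
                // An --experiments flag already exists: append to its
                // comma-separated list unless the experiment is already there.
                List<String> values = new ArrayList<>(
                        Arrays.asList(arg.substring(PREFIX.length()).split(",")));
                values.removeIf(String::isEmpty);
                if (!values.contains(experiment)) {
                    values.add(experiment);
                }
                result.add(PREFIX + String.join(",", values));
                merged = true;
            } else {
                result.add(arg);
            }
        }
        if (!merged) {
            // No --experiments flag was given, so add a new one.
            result.add(PREFIX + experiment);
        }
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] patched = withExperiment(args, "use_deprecated_read");
        // Assumption: the patched array would then be passed on, e.g. to
        // PipelineOptionsFactory.fromArgs(patched), in the real job.
        System.out.println(String.join(" ", patched));
    }
}
```

The helper merges into an existing --experiments flag instead of adding a second one, so any experiments you already pass are kept.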
--
Alexey

> On 10 Apr 2022, at 21:14, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>
> Hi all,
> I saw a very low rate when consuming messages from Kafka in our different
> jobs.
> In order to find the bottleneck, I created a very simple pipeline that reads
> string messages from Kafka and just prints the output.
> The pipeline runs on a Flink cluster with the following setup:
> 1 task manager, 3 slots, parallelism set to 3
>
> PCollection<KV<String, String>> readFromKafka =
>     pipeline.apply("readFromKafka",
>         KafkaTransform.readStrFromKafka(
>             pipelineUtil.getBootstrapServers(), topic_name, consumer_group));
> readFromKafka.apply("Get message contents", Values.<String>create())
>     .apply("Log messages",
>         MapElements.into(TypeDescriptor.of(String.class))
>             .via(message -> {
>                 log.atInfo().log("Received: {}", message);
>                 return message;
>             }));
>
> The Kafka consumer is:
>
> return KafkaIO.<String, String>read()
>     .withBootstrapServers(bootstrapServers)
>     .withTopic(topic)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(StringDeserializer.class)
>     .withConsumerConfigUpdates((ImmutableMap.of(
>         "auto.offset.reset", "earliest",
>         ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)))
>     .withoutMetadata();
>
> According to the metrics, I do have 3 threads reading from Kafka, but each
> one reads only around 56 records per second.
> In my opinion this is a very low rate, and I am not sure I understand this
> behaviour.
> I have checked CPU and memory usage and both look OK.
> Any ideas would be really appreciated.
> Thanks a lot,
> Sigalit