Hi Sigalit,

Could you try running your test pipeline with the
“--experiments=use_deprecated_read” option and see whether it makes a difference?
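The experiment can be passed on the command line exactly as written above, or added to the options in code before the pipeline is created. As far as I understand, it makes KafkaIO fall back to the older source-based read instead of the Splittable DoFn implementation, so comparing the two runs should show whether the read path is the bottleneck. Below is a minimal sketch, assuming the Beam Java SDK core is on the classpath. The class `DeprecatedReadOptions` and the method `buildOptions` are hypothetical names used only for illustration:

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical helper: builds pipeline options that always carry the
// use_deprecated_read experiment.
public class DeprecatedReadOptions {

  static PipelineOptions buildOptions(String[] args) {
    // Parses flags such as --experiments=use_deprecated_read from the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    ExperimentalOptions experimental = options.as(ExperimentalOptions.class);
    // If the flag was not supplied, add the experiment programmatically.
    if (!ExperimentalOptions.hasExperiment(experimental, "use_deprecated_read")) {
      ExperimentalOptions.addExperiment(experimental, "use_deprecated_read");
    }
    return options;
  }

  public static void main(String[] args) {
    PipelineOptions options = buildOptions(args);
    // Prints the active experiments list.
    System.out.println(options.as(ExperimentalOptions.class).getExperiments());
    // Next step (not shown): Pipeline.create(options), then apply the Kafka read as before.
  }
}
```

Either way, the resulting options go to `Pipeline.create(options)` unchanged; the rest of the test pipeline stays the same.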

—
Alexey

> On 10 Apr 2022, at 21:14, Sigalit Eliazov <e.siga...@gmail.com> wrote:
> 
> Hi all,
> I'm seeing a very low consumption rate when reading messages from Kafka in
> several of our jobs.
> In order to find the bottleneck, I created a very simple pipeline that reads
> string messages from Kafka and just logs each message.
> The pipeline runs on a Flink cluster with the following setup:
> 1 task manager, 3 slots, parallelism set to 3
> 
> 
> PCollection<KV<String, String>> readFromKafka =
>     pipeline.apply("readFromKafka",
>         KafkaTransform.readStrFromKafka(
>             pipelineUtil.getBootstrapServers(), topic_name, consumer_group));
> readFromKafka
>     .apply("Get message contents", Values.<String>create())
>     .apply("Log messages",
>         MapElements.into(TypeDescriptor.of(String.class))
>             .via(message -> {
>                 log.atInfo().log("Received: {}", message);
>                 return message;
>             }));
> 
> The Kafka consumer is:
> return KafkaIO.<String, String>read()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 "auto.offset.reset", "earliest",
>                 ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
>         .withoutMetadata();
> 
> According to the metrics, I do seem to have 3 threads reading from Kafka,
> but each one reads only around 56 records per second.
> In my opinion this is a very low rate, and I don't understand this behaviour.
> I have checked CPU and memory usage and both look OK.
> Any ideas would be really appreciated.
> Thanks a lot,
> Sigalit
> 
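For scale, the per-reader figure in the report translates into a job-wide rate: three parallel readers at roughly 56 records per second each give about 168 records per second, or roughly 10,000 records per minute. A minimal sketch of that arithmetic, assuming the per-reader rates are read off the Flink metrics by hand; `KafkaThroughput` and `totalRate` are hypothetical names:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: turns per-reader rates into a job-wide rate.
public class KafkaThroughput {

  // Sums the per-reader rates (records per second) into a total for the job.
  static double totalRate(List<Double> perReaderRates) {
    double total = 0;
    for (double rate : perReaderRates) {
      total += rate;
    }
    return total;
  }

  public static void main(String[] args) {
    // Parallelism 3: three readers, each at about 56 records/s.
    List<Double> rates = Arrays.asList(56.0, 56.0, 56.0);
    double perSecond = totalRate(rates);
    // Prints "168 records/s, 10080 records/min".
    System.out.printf("%.0f records/s, %.0f records/min%n", perSecond, perSecond * 60);
  }
}
```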
