Hi all,

I am seeing a very low consumption rate when reading messages from Kafka
in several of our jobs.
In order to find the bottleneck, I created a very simple pipeline that
reads string messages from Kafka and just prints the output.
The pipeline runs on a Flink cluster with the following setup:
1 task manager, 3 slots, parallelism set to 3


PCollection<KV<String, String>> readFromKafka = pipeline.apply(
        "readFromKafka",
        KafkaTransform.readStrFromKafka(
                pipelineUtil.getBootstrapServers(), topic_name, consumer_group));
readFromKafka.apply("Get message contents", Values.<String>create())
        .apply("Log messages", MapElements.into(TypeDescriptor.of(String.class))
                .via(message -> {
                    log.atInfo().log("Received: {}", message);
                    return message;
                }));


The Kafka consumer is:

return KafkaIO.<String, String>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(ImmutableMap.of(
                "auto.offset.reset", "earliest",
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
        .withoutMetadata();


According to the metrics, I do have 3 threads reading from Kafka, but
each one reads only around 56 records per second.
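To put that figure in context, here is a minimal sketch of the arithmetic, plus a counter that could be used to cross-check the reported rate without logging every message. RateMeter is a hypothetical helper written for illustration; it is not part of Beam, Flink, or the pipeline above, and it uses only the Java standard library.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical helper for illustration; not part of Beam or the pipeline above. */
public class RateMeter {
    // LongAdder keeps contention low when several threads increment concurrently.
    private final LongAdder count = new LongAdder();
    private final long startNanos = System.nanoTime();

    /** Call once per consumed record. */
    public void mark() {
        count.increment();
    }

    /** Total number of records marked so far. */
    public long count() {
        return count.sum();
    }

    /** Average records per second since this meter was created. */
    public double ratePerSecond() {
        return ratePerSecond(count.sum(), System.nanoTime() - startNanos);
    }

    /** Records per second for a given record count and elapsed time in nanoseconds. */
    public static double ratePerSecond(long records, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            return 0.0;
        }
        return records * 1_000_000_000.0 / elapsedNanos;
    }

    public static void main(String[] args) {
        // Figures taken from the metrics described above.
        int readerThreads = 3;
        double recordsPerSecondPerThread = 56.0;
        // prints "aggregate records/s: 168.0"
        System.out.println("aggregate records/s: " + readerThreads * recordsPerSecondPerThread);
    }
}
```

With 3 threads at about 56 records per second each, the whole job consumes roughly 168 records per second. Calling mark() in place of the per-message log line and printing ratePerSecond() periodically would be one way to see whether the logging itself affects the measured rate; that is an assumption to test, not a confirmed cause.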

In my opinion this is a very low rate.

I am not sure I understand this behaviour.

I have checked CPU and memory usage and both look OK.

Any ideas would be really appreciated.

Thanks a lot,

Sigalit
