Hi,

We have a pipeline that reads data from Kafka using KafkaIO, applies a
simple transformation, and writes the data back to Kafka.

We are using the Flink runner with the following resources:

parallelism: 50

Task managers: 5

Memory per task manager: 20G

CPU per task manager: 10

This pipeline should handle around 400K messages per second.


The pipeline's memory consumption reaches over 60% right after startup,
even before any messages have been produced to Kafka.

When we start loading the pipeline, it reaches 90% memory consumption
after a very short time.

There is no usage of state, and the pipeline runs with a global window
and a trigger per element.


We attached a Java profiler and saw that while the pipeline was idle, most
of the memory allocation was done by the Kafka reader (which is by default
the new SDF implementation, right?).

So we switched the pipeline to the legacy implementation by setting the
"use_deprecated_read" experiment.
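For reference, the same experiment can also be passed as a standard Beam pipeline option on the command line instead of being set in code; this is only a sketch, and the jar name here is a placeholder:

```shell
# Equivalent to options.setExperiments(Collections.singletonList("use_deprecated_read"))
# in code; "my-pipeline.jar" is a placeholder for the actual pipeline jar.
java -jar my-pipeline.jar \
  --runner=FlinkRunner \
  --experiments=use_deprecated_read
```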


This had a really big impact on memory consumption: the idle pipeline was
at only 30%, and once I started the load of 400K messages per second it
reached 70% at most.


Has anyone encountered anything similar?

This was tested with both Beam 2.51 and Beam 2.56.


    PipelineUtil pipelineUtil = new PipelineUtil();
    MyOptions options = pipelineUtil.getFlinkBeamOptions(args, MyOptions.class);
    options.setFasterCopy(true);
    options.setFailOnCheckpointingErrors(false);
    options.setExperiments(Collections.singletonList("use_deprecated_read"));

    Pipeline pipeline = pipelineUtil.createPipeline(options);

    PCollection<KV<String, X>> ipSessionInput = pipeline
            .apply("read", KafkaIO.<String, X>read()
                    .withBootstrapServers(bootstrapServers)
                    .withTopic(topic)
                    .withConsumerConfigUpdates(Map.ofEntries(
                            Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)))
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializerAndCoder(deserializer, AvroCoder.of(X.class))
                    .withoutMetadata())
            .apply(Window.<KV<String, X>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(ParDo.of(new TransformX()));

    ipSessionInput.apply(/* … write to another Kafka topic */);

    pipeline.run().waitUntilFinish();
}



Thanks,

Sigalit
