Hi,
We have a pipeline that reads data from Kafka using KafkaIO, applies a simple transformation, and writes the data back to Kafka. We are running on the Flink runner with the following resources:

Parallelism: 50
Task managers: 5
Memory per task manager: 20G
CPUs per task manager: 10

The pipeline should handle around 400K messages per second. Memory consumption right after startup, before any messages were even produced to Kafka, reaches over 60%. Once we start loading the pipeline, it reaches 90% memory consumption after a very short time.

There is no use of state, and the pipeline runs with a global window and a trigger per element. We attached a Java profiler and saw that while the pipeline was idle, most of the memory allocation came from the Kafka reader (which by default is the new SDF implementation, right?). So we switched the pipeline to the legacy implementation by setting "use_deprecated_read" to true. That made a really big difference: the idle pipeline sat at only 30%, and once I started the 400K/s load it reached 70% at most.

Has anyone encountered anything similar?
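In case it helps narrow things down, one knob we have been looking at is capping the Kafka consumer's own fetch buffers via consumer config overrides, since those buffers are allocated per partition per reader. This is only a sketch; the keys are standard Kafka consumer config names, but the values below are illustrative assumptions, not what we actually run:

```java
import java.util.Map;

public class ConsumerMemoryConfig {

    // Illustrative values only; tune against your actual throughput.
    static Map<String, Object> fetchLimits() {
        return Map.<String, Object>ofEntries(
            Map.entry("fetch.max.bytes", 16 * 1024 * 1024),          // cap on one whole fetch response
            Map.entry("max.partition.fetch.bytes", 1 * 1024 * 1024), // cap per partition per fetch
            Map.entry("receive.buffer.bytes", 1 * 1024 * 1024)       // TCP receive buffer size
        );
    }

    public static void main(String[] args) {
        // The map would be merged into KafkaIO via .withConsumerConfigUpdates(fetchLimits())
        System.out.println(fetchLimits().get("fetch.max.bytes")); // prints 16777216
    }
}
```

The map plugs straight into `KafkaIO.Read.withConsumerConfigUpdates(...)` alongside the group id we already set there.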
This was tested with both Beam 2.51 and Beam 2.56.

    PipelineUtil pipelineUtil = new PipelineUtil();
    MyOptions options = pipelineUtil.getFlinkBeamOptions(args, MyOptions.class);
    options.setFasterCopy(true);
    options.setFailOnCheckpointingErrors(false);
    options.setExperiments(Collections.singletonList("use_deprecated_read"));

    Pipeline pipeline = pipelineUtil.createPipeline(options);

    PCollection<KV<String, X>> ipSessionInput = pipeline
        .apply("read", KafkaIO.<String, X>read()
            .withBootstrapServers(bootstrapServers)
            .withTopic(topic)
            .withConsumerConfigUpdates(Map.ofEntries(
                Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(deserializer, AvroCoder.of(X.class))
            .withoutMetadata())
        .apply(Window.<KV<String, X>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(ParDo.of(new TransformX()));
        // ... then write to another Kafka topic

    pipeline.run().waitUntilFinish();

Thanks,
Sigalit