[ https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471647#comment-16471647 ]
Dawid Kulig commented on KAFKA-6892:
------------------------------------

[~mjsax] Thank you for your comment. Shame on me, but I must have missed that capacity planning guide. However, having looked at that guide, I'd say we should fit within the resources we currently have.

Regarding the topology, please see below. We have 4 source topics [ST1, ST2, ST3, ST4] with string-keyed messages, and 2 destination topics [DT1, DT2].

Pipeline A)
{code}
final KStream<String, Aggregate> aggregates = builder.<String, A>stream("ST1")
        .groupByKey()
        .reduce((prev, next) -> prev)
        .mapValues((A value) -> {
            // some custom mapping from A to Aggregate
        })
        .toStream();

final KStream<String, B> bStream = builder.<String, B>stream("ST2")
        .groupByKey()
        .reduce((prev, next) -> next)
        .toStream();

final KStream<String, C> cStream = builder.stream("ST3");
final KStream<String, D> dStream = builder.stream("ST4");

final KTable<String, Aggregate> aggregateTable = aggregates
        .leftJoin(bStream, ExampleAggregator::joinB, joinWindow)
        .leftJoin(cStream, ExampleAggregator::joinC, joinWindow)
        .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow)
        .groupByKey()
        .reduce((previousValue, newValue) -> newValue);

aggregateTable.to("DT1");

private static Aggregate joinB(final Aggregate aggregate, final B b) {
    LOGGER.info("Joining b: {}", b);
    aggregate.setB(b);
    return aggregate;
}

private static Aggregate joinC(final Aggregate aggregate, final C c) {
    LOGGER.info("Joining c: {}", c);
    aggregate.setC(c);
    return aggregate;
}

private static Aggregate joinD(final Aggregate aggregate, final D d) {
    LOGGER.info("Joining d: {}", d);
    aggregate.setD(d);
    return aggregate;
}
{code}

Pipeline B)
{code}
// note that DT1 from Pipeline A is an input topic for this pipeline
final KStream<String, Aggregate> aggregates = builder.stream("DT1");

final KTable<String, Aggreagate2> aggregatesTable = aggregates
        .filter((key, val) -> ...) // custom filtering (non-null / non-empty)
        .groupBy((key, val) -> val.somePropIdentifier())
        .aggregate(()
-> new Aggreagate2("", new ArrayList<>(64)),
                (key, val, aggregate2Val) -> customAggregation(...),
                aggregateSpecificAvroSerde);

aggregatesTable.toStream()
        .map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), aggregate2))
        .groupByKey()
        .reduce((old, _new) -> _new)
        .to(Serdes.String(), aggregateSpecificAvroSerde, "DT2");
{code}

> Kafka Streams memory usage grows over the time till OOM
> -------------------------------------------------------
>
>                 Key: KAFKA-6892
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6892
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Dawid Kulig
>            Priority: Minor
>         Attachments: kafka-streams-per-pod-resources-usage.png
>
> Hi. I am observing indefinite memory growth of my kafka-streams application. It gets killed by the OS when reaching the memory limit (10gb).
> It's running two unrelated pipelines (read from 4 source topics, 100 partitions each, aggregate data and write to two destination topics).
>
> My environment:
> * Kubernetes cluster
> * 4 app instances
> * 10GB memory limit per pod (instance)
> * JRE 8
>
> JVM / Streams app:
> * -Xms2g
> * -Xmx4g
> * num.stream.threads = 4
> * commit.interval.ms = 1000
> * linger.ms = 1000
>
> When my app has been running for 24 hours it reaches the 10GB memory limit. Heap and GC look good; non-heap average memory usage is 120MB. I've read it might be related to the RocksDB that works underneath the streams app, so I tried to tune it using the [confluent doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config], however with no luck.
> RocksDB config #1:
> {code:java}
> tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> {code}
>
> RocksDB config #2:
> {code:java}
> tableConfig.setBlockCacheSize(1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024L);
> {code}
>
> This behavior has only been observed with our production traffic, where the per-topic input message rate is 10 msg/sec and is pretty much constant (no peaks). I am attaching the cluster resource usage from the last 24h.
> Any help or advice would be much appreciated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
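For reference, the `tableConfig` / `options` snippets above only take effect if they are applied through a `RocksDBConfigSetter` registered via the `rocksdb.config.setter` Streams config. A minimal sketch of how config #2 would be wired in (the class name is hypothetical; Streams calls `setConfig()` once for every RocksDB store it opens):

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Sketch: wiring "RocksDB config #2" into Kafka Streams (1.1.0 API).
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(1024 * 1024L);    // 1 MB block cache per store
        tableConfig.setBlockSize(16 * 1024L);           // 16 KB blocks
        tableConfig.setCacheIndexAndFilterBlocks(true); // count index/filter blocks against the cache
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);             // at most two memtables per store
        options.setWriteBufferSize(8 * 1024L);          // 8 KB memtable, as in config #2
    }
}

// Registration in the application's Streams properties:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//           BoundedMemoryRocksDBConfig.class);
```

Note that these limits are per RocksDB store instance, not per application, so the effective total still scales with the number of tasks an instance hosts.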
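One thing worth checking against the capacity planning guide: with 100 partitions per source topic spread over 4 instances, each instance hosts many RocksDB stores (one per reduce/aggregate per task, plus two window stores per KStream-KStream join), and native memory scales with that count. A back-of-the-envelope sketch; every per-store figure here is a hypothetical placeholder for illustration, not a measurement from this deployment:

```java
// Rough estimate of off-heap RocksDB memory per application instance.
// All inputs are assumptions chosen for illustration only.
public class RocksDbFootprintSketch {

    static long perInstanceBytes(final int partitions, final int instances,
                                 final int storesPerTask, final long bytesPerStore) {
        final int tasksPerInstance = partitions / instances; // e.g. 100 / 4 = 25
        return (long) tasksPerInstance * storesPerTask * bytesPerStore;
    }

    public static void main(final String[] args) {
        // Hypothetical: ~10 stores per task for Pipeline A (two upstream reduce
        // stores, three windowed joins at two window stores each, plus the final
        // reduce), and ~50 MB of native memory per store.
        final long bytes = perInstanceBytes(100, 4, 10, 50L * 1024 * 1024);
        System.out.println(bytes / (1024 * 1024) + " MB"); // prints "12500 MB"
    }
}
```

Even modest per-store defaults multiply quickly at this partition count, which would be consistent with off-heap usage growing well past the 10GB pod limit while heap and GC look healthy.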