[ 
https://issues.apache.org/jira/browse/KAFKA-6892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16471647#comment-16471647
 ] 

Dawid Kulig edited comment on KAFKA-6892 at 5/11/18 8:33 AM:
-------------------------------------------------------------

[~mjsax] Thank you for your comment. Shame on me but I must have missed that 
capacity planning guide. However having look on that guide I'd say we should 
fit in the resources we currently have. 

Speaking about topology, please see below:

We have 4 source topics: [ST1, ST2, ST3, ST4], messages with a string key and 2 
destination topics : [DT1, DT2]

Pipieline A)

{code}
final KStream<String, Aggregate> aggregates = builder.<String, A>stream("ST1")
        .groupByKey()
        .reduce((prev, next) -> prev)
        .mapValues((AuthenticationHandshake value) -> {
                // some custom mapping
        }).toStream();

final KStream<String, B> bStream = builder.<String, 
B>stream("ST2").groupByKey().reduce(((prev, next) -> next)).toStream();
final KStream<String, C> cStream = builder.stream("ST3");
final KStream<String, D> dStream = builder.stream("ST4");

final KTable<String, Aggregate> aggregateTable = aggregates
        .leftJoin(bStream, ExampleAggregator::joinB, joinWindow)
        .leftJoin(cStream, ExampleAggregator::joinC, joinWindow)
        .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow)
        .groupByKey()
        .reduce((previousValue, newValue) -> newValue);
aggregateTable.to("DT1");


private static Aggregate joinB(final Aggregate aggregate, final B b) {
    LOGGER.info("Joining b: {}", b);
    aggregate.setB(b);
    return aggregate;
}

private static Aggregate joinC(final Aggregate aggregate, final C c) {
    LOGGER.info("Joining c: {}", c);
    aggregate.setC(c);
    return aggregate;
}

private static Aggregate joinD(final Aggregate aggregate, final D d) {
    LOGGER.info("Joining d: {}", d);
    aggregate.setD(d);
    return aggregate;
}
{code}

Pipeline B)
{code}
// note that the DT1 from PipelineA is an input topic for this pipeline
final KStream<String, Aggregate> aggregates = builder.stream(DT1);

final KTable<String, Aggreagate2> aggregatesTable = aggregates
    .filter((key, val) -> // custom filterint (non-null / non-empty))
    .groupBy((key, val) -> val.somePropIdentifier())
    .aggregate(() -> new Aggreagate2("", new ArrayList<>(64)),
            (key, val, aggregate2Val) -> customAggregation(...),
            aggregateSpecificAvroSerde
    );

aggregatesTable.toStream()
    .map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), 
aggregate2))
    .groupByKey()
    .reduce((old, _new) -> _new)
    .to(Serdes.String(), aggregateSpecificAvroSerde, "DT2");
{code}




was (Author: dkulig):
[~mjsax] Thank you for your comment. Shame on me but I must have missed that 
capacity planning guide. However having look on that guide I'd say we should 
fit in the resources we currently have. 

Speaking about topology, please see below:

We have 4 source topics: [ST1, ST2, ST3, ST4], messages with a string key and 2 
destination topics : [DT1, DT2]

Pipieline A)

{code}
final KStream<String, Aggregate> aggregates = builder.<String, A>stream("ST1")
        .groupByKey()
        .reduce((prev, next) -> prev)
        .mapValues((AuthenticationHandshake value) -> {
                // some custom mapping
        }).toStream();

final KStream<String, B> bStream = builder.<String, 
B>stream("ST2").groupByKey().reduce(((prev, next) -> next)).toStream();
final KStream<String, C> cStream = builder.stream("ST3");
final KStream<String, D> dStream = builder.stream("ST4");

final KTable<String, Aggregate> aggregateTable = aggregates
        .leftJoin(bStream, ExampleAggregator::joinB, joinWindow)
        .leftJoin(cStream, ExampleAggregator::joinC, joinWindow)
        .leftJoin(dStream, ExampleAggregator::joinD, dJoinWindow)
        .groupByKey()
        .reduce((previousValue, newValue) -> newValue);
aggregateTable.to("DT1");


private static Aggregate joinB(final Aggregate aggregate, final B b) {
    LOGGER.info("Joining b: {}", b);
    aggregate.setB(b);
    return aggregate;
}

private static Aggregate joinC(final Aggregate aggregate, final C c) {
    LOGGER.info("Joining c: {}", c);
    aggregate.setC(c);
    return aggregate;
}

private static Aggregate joinD(final Aggregate aggregate, final D d) {
    LOGGER.info("Joining d: {}", d);
    aggregate.setD(d);
    return aggregate;
}
{code}

Pipeline B)
{code}
// note that the DT1 from PipelineA is an input topic for this pipeline
final KStream<String, Aggregate> aggregates = builder.stream(DT1);

final KTable<String, Aggreagate2> aggregatesTable = aggregates
    .filter((key, val) -> // custom filterint (non-null / non-empty))
    .groupBy((key, val) -> val.somePropIdentifier())
    .aggregate(() -> new Aggreagate2("", new ArrayList<>(64)),
            (key, val, aggregate2Val) -> customAggregation(...),
            aggregateSpecificAvroSerde
    );

aggregatesTable.toStream()
    .map((window, aggregate2) -> new KeyValue<>(aggregate2.somePropertyKey(), 
aggregate2))
    .groupByKey()
    .reduce((old, _new) -> _new)
    .to(Serdes.String(), aggregateSpecificAvroSerde, DT2);
{code}



> Kafka Streams memory usage grows over the time till OOM
> -------------------------------------------------------
>
>                 Key: KAFKA-6892
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6892
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Dawid Kulig
>            Priority: Minor
>         Attachments: kafka-streams-per-pod-resources-usage.png
>
>
> Hi. I am observing indefinite memory growth of my kafka-streams application. 
> It gets killed by the OS when reaching the memory limit (10gb). 
> It's running two unrelated pipelines (read from 4 source topics - 100 
> partitions each - aggregate data and write to two destination topics) 
> My environment: 
>  * Kubernetes cluster
>  * 4 app instances
>  * 10GB memory limit per pod (instance)
>  * JRE 8
> JVM / Streams app:
>  * -Xms2g
>  * -Xmx4g
>  * num.stream.threads = 4
>  * commit.interval.ms = 1000
>  * linger.ms = 1000
>  
> When my app is running for 24hours it reaches 10GB memory limit. Heap and GC 
> looks good, non-heap avg memory usage is 120MB. I've read it might be related 
> to the RocksDB that works underneath streams app, however I tried to tune it 
> using [confluent 
> doc|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  however with no luck. 
> RocksDB config #1:
> {code:java}
> tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);{code}
> RocksDB config #2 
> {code:java}
> tableConfig.setBlockCacheSize(1024 * 1024L);
> tableConfig.setBlockSize(16 * 1024L);
> tableConfig.setCacheIndexAndFilterBlocks(true);
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024L);{code}
>  
> This behavior has only been observed with our production traffic, where per 
> topic input message rate is 10msg/sec and is pretty much constant (no peaks). 
> I am attaching cluster resources usage from last 24h.
> Any help or advice would be much appreciated. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to