Ah ok, 2.3.0 has a known performance issue in the caching layer which tends
to get worse the larger the cache size. That might explain what you're
seeing with
regards to the traffic correlation.

It's fixed in 2.3.1 which should be released very soon, but until then you
might want
to try turning off the Streams cache (by setting cache.max.bytes.buffering
to zero,
although you can also use .withCachingDisabled to turn it off only for the
large/problem
store)

Of course any memory you reclaim by turning off the Streams cache can just
go towards the
rocksdb cache instead, just note that the rocksdb cache comes from off-heap
memory
while the Streams cache would be taken from heap memory allocated to the
jvm.

On Mon, Oct 14, 2019 at 2:48 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Hi Sophie,
>
> Thanks for the help!
> I'm using version 2.3.0.
>
> The repartition topic with huge lag is the created during the first
> reduce method, named "XX-KSTREAM-STATE-STORE-0030-repartition". All
> other internal topics have almost zero lags. For my case, how could I
> find out if rocksDB causes the lags? One thing I noticed is, when the
> source traffic is about 30K/sec, I don't have any lags for the entire
> system but when the traffic goes up to 100K/sec, it has a huge lag. As
> you mentioned, if the memory usage is high, should I set any rocksDB
> memory related config to higher value? Thanks a lot!
> My topology is like below:
>
> final KStream<String, byte[]> source = builder.stream(inputTopic);
> KStream<String, Event> deserializedStream = source.mapValues( ... });
>
> KStream<Windowed<String>, Event> dedupedStream =
> deserializedStream.selectKey( ... )
> .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> .reduce((value1, value2) -> value2)
> .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> .toStream();
>
> dedupedStream.selectKey( ... )
> .mapValues( ... )
> .filter(...)
> .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> .reduce((value1, value2) -> {
>     long count1 = value1.getCount();
>     long count2 = value2.getCount();
>     value2.setCount(count1 + count2);
>     return value2;
> }
> )
> .toStream()
> .selectKey( ... )
> .to(outputTopic);
>
> On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> >
> > Out of curiosity, which version are you using?
> >
> > There's nothing that really jumps out at me as problematic in your
> > RocksDBConfigSetter, but note that I think you may need to increase
> > the number of threads in the "LOW priority thread pool" in addition to
> > setting the maxBackgroundCompactions -- this can be done as
> >
> > options.setEnv(Env.getDefault().setBackgroundThreads(n,
> > Env.COMPACTION_POOL));
> >
> > Is your disk throughput possibly the bottleneck? Note that if the
> > repartition topic
> > is followed by a subtopology doing heavy processing this will likely show
> > up as
> > lag like you describe. Also, if you have a large number of stateful tasks
> > (large
> > number of stateful operations, and/or large number of partitions) each
> one
> > will
> > have its own separate rocksdb instance, and the memory usage could be
> quite
> > high (which can cause rocks to page in/out things like index blocks which
> > always
> > need to be read before a lookup) -- I'd recommend also setting
> >
> > tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> >
> >
> > On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm running a Kafka Streams app with windowing function. I noticed
> > > that internal topic -repartition has huge lag while the system CPU
> > > usage is low and app is stable(join rate is almost 0).
> > >
> > > The repartition topic is an internal topic and created by the
> > > application automatically. The bytes in per sec for this topic is
> > > about 65MB/sec while the bytes out for this topic is only 15MB/sec. I
> > > have tried a couple configs to customize RocksDB config, but none of
> > > it could increase the bytes out value.
> > >
> > > I changed the default RocksDB block case size to 125MB and block size
> > > to 125MB as well. Also set the max write buffer number to 3. But it
> > > didn't help.
> > >
> > > May I know what I missed here? What's the best way to find why
> > > internal repartition topic has huge lags?
> > >
> > > Thanks for all the helps!!
> > >
> > > My RocksDB config:
> > > public static class CustomRocksDBConfig implements RocksDBConfigSetter
> {
> > >     private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(125 *
> > > 1024L * 1024L);
> > >
> > >     @Override
> > >     public void setConfig(final String storeName, final Options
> > > options, final Map<String, Object> configs) {
> > >         int n = Runtime.getRuntime().availableProcessors();
> > >         options.setMaxBackgroundCompactions(n);
> > >         options.setWriteBufferSize(125 * 1024 * 1024);
> > >         BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
> > > options.tableFormatConfig();
> > >         tableConfig.setBlockCache(cache);
> > >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> > >         tableConfig.setBlockSize(125 * 1024L);
> > >         tableConfig.setCacheIndexAndFilterBlocks(true);
> > >         options.setTableFormatConfig(tableConfig);
> > >         options.setMaxWriteBufferNumber(3);
> > >     }
> > >
> > >     public void close(final String storeName, final Options options) {
> > >         // See #5 below.
> > >         cache.close();
> > >     }
> > >
> > > }
> > >
> > > Thanks
> > > Kathy
> > >
>

Reply via email to