Out of curiosity, which version are you using?

Nothing in your RocksDBConfigSetter really jumps out at me as
problematic, but I think you may need to increase the number of
threads in the "LOW priority thread pool" in addition to setting
maxBackgroundCompactions. This can be done with

options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));

Is your disk throughput possibly the bottleneck? Note that if the
repartition topic is followed by a subtopology doing heavy processing,
this will likely show up as lag like you describe. Also, if you have a
large number of stateful tasks (a large number of stateful operations
and/or a large number of partitions), each one will have its own
separate RocksDB instance, and the memory usage could be quite high.
That can cause RocksDB to page things like index blocks in and out,
and index blocks always need to be read before a lookup. I'd recommend
also setting

tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
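
To get a feel for the memory point above, here is a rough
back-of-the-envelope sketch using the numbers from your config (125MB
block cache, 125MB write buffer, max 3 write buffers). It assumes each
store gets its own cache and its own memtables, and the store count of
32 is purely hypothetical, so plug in your own. It ignores everything
else RocksDB allocates, so treat the result as a ballpark, not a
measurement.

```java
// Rough memory estimate for the RocksDB settings quoted in this thread.
// Assumption: every store has its own block cache and its own memtables.
final class RocksDbMemoryEstimate {

    static final long MIB = 1024L * 1024L;

    /** Block cache plus the maximum number of in-memory write buffers for one store. */
    static long perStoreBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    /** Total across all RocksDB instances hosted by one application instance. */
    static long totalBytes(long perStoreBytes, int storeInstances) {
        return perStoreBytes * storeInstances;
    }

    public static void main(String[] args) {
        // Values taken from the config in the quoted message.
        long perStore = perStoreBytes(125 * MIB, 125 * MIB, 3);

        // Hypothetical number of RocksDB instances on this app instance.
        int storeInstances = 32;

        System.out.println("per store MiB: " + perStore / MIB);
        System.out.println("total MiB: " + totalBytes(perStore, storeInstances) / MIB);
    }
}
```

With those inputs this comes to 500 MiB per store and 16000 MiB for 32
stores, all of it off-heap, which is why I'd look at memory pressure
before tuning anything else.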


On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Hi,
>
> I'm running a Kafka Streams app with a windowing function. I noticed
> that the internal -repartition topic has huge lag while the system CPU
> usage is low and the app is stable (join rate is almost 0).
>
> The repartition topic is an internal topic and created by the
> application automatically. The bytes in per sec for this topic is
> about 65MB/sec while the bytes out for this topic is only 15MB/sec. I
> have tried a couple of configs to customize RocksDB, but none of
> them increased the bytes out value.
>
> I changed the default RocksDB block cache size to 125MB and block size
> to 125MB as well. Also set the max write buffer number to 3. But it
> didn't help.
>
> May I know what I missed here? What's the best way to find out why the
> internal repartition topic has such a large lag?
>
> Thanks for all the help!
>
> My RocksDB config:
> public static class CustomRocksDBConfig implements RocksDBConfigSetter {
>     // 125MB LRU block cache owned by this config setter instance.
>     private org.rocksdb.Cache cache =
>             new org.rocksdb.LRUCache(125 * 1024L * 1024L);
>
>     @Override
>     public void setConfig(final String storeName,
>                           final Options options,
>                           final Map<String, Object> configs) {
>         int n = Runtime.getRuntime().availableProcessors();
>         options.setMaxBackgroundCompactions(n);
>         options.setWriteBufferSize(125 * 1024 * 1024);
>         BlockBasedTableConfig tableConfig =
>                 (BlockBasedTableConfig) options.tableFormatConfig();
>         tableConfig.setBlockCache(cache);
>         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
>         tableConfig.setBlockSize(125 * 1024L);
>         tableConfig.setCacheIndexAndFilterBlocks(true);
>         options.setTableFormatConfig(tableConfig);
>         options.setMaxWriteBufferNumber(3);
>     }
>
>     public void close(final String storeName, final Options options) {
>         // Release the native memory held by the cache created above.
>         cache.close();
>     }
>
> }
>
> Thanks
> Kathy
>
