Out of curiosity, which version are you using? There's nothing that really jumps out at me as problematic in your RocksDBConfigSetter, but note that I think you may need to increase the number of threads in the "LOW priority thread pool" in addition to setting the maxBackgroundCompactions -- this can be done as
options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL)); Is your disk throughput possibly the bottleneck? Note that if the repartition topic is followed by a subtopology doing heavy processing this will likely show up as lag like you describe. Also, if you have a large number of stateful tasks (large number of stateful operations, and/or large number of partitions) each one will have its own separate rocksdb instance, and the memory usage could be quite high (which can cause rocks to page in/out things like index blocks which always need to be read before a lookup) -- I'd recommend also setting tableConfig.setPinL0FilterAndIndexBlocksInCache(true); On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote: > Hi, > > I'm running a Kafka Streams app with windowing function. I noticed > that internal topic -repartition has huge lag while the system CPU > usage is low and app is stable(join rate is almost 0). > > The repartition topic is an internal topic and created by the > application automatically. The bytes in per sec for this topic is > about 65MB/sec while the bytes out for this topic is only 15MB/sec. I > have tried a couple configs to customize RocksDB config, but none of > it could increase the bytes out value. > > I changed the default RocksDB block case size to 125MB and block size > to 125MB as well. Also set the max write buffer number to 3. But it > didn't help. > > May I know what I missed here? What's the best way to find why > internal repartition topic has huge lags? > > Thanks for all the helps!! > > My RocksDB config: > public static class CustomRocksDBConfig implements RocksDBConfigSetter { > private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(125 * > 1024L * 1024L); > > @Override > public void setConfig(final String storeName, final Options > options, final Map<String, Object> configs) { > int n = Runtime.getRuntime().availableProcessors(); > options.setMaxBackgroundCompactions(n); > options.setWriteBufferSize(125 * 1024 * 1024); > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) > options.tableFormatConfig(); > tableConfig.setBlockCache(cache); > tableConfig.setBlockCacheSize(125 * 1024 * 1024L); > tableConfig.setBlockSize(125 * 1024L); > tableConfig.setCacheIndexAndFilterBlocks(true); > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(3); > } > > public void close(final String storeName, final Options options) { > // See #5 below. > cache.close(); > } > > } > > Thanks > Kathy >