Glad that helped!

Honestly, I can't say I recognize that exception either, but I'm fairly
confident it's not directly related to rocksdb or Streams. It sounds like a
connection somewhere got screwed up, which would be more of a configuration
issue that I probably can't help with.

Of course it's always possible rocksdb is doing something weird that we're
unaware of. If you revert the changes you just made and don't see these
issues, I'd say try continuing to use your own rocksdb config setter and/or
reset the app (including clearing local state).
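
For reference, here is a minimal sketch of toggling the Streams cache change
discussed in this thread. It only builds the Properties you would pass to the
app; the class name, helper names, and the application.id value are
hypothetical, and removing the override simply lets the library default apply.

```java
import java.util.Properties;

public class StreamsCacheToggleSketch {
    // Config key quoted in this thread; Kafka Streams also exposes it as
    // StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static final String CACHE_KEY = "cache.max.bytes.buffering";

    // Returns a copy of the given properties with the Streams cache disabled.
    static Properties withCacheDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(CACHE_KEY, "0");
        return props;
    }

    // Returns a copy with the override removed, so the default applies again.
    static Properties withCacheDefault(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.remove(CACHE_KEY);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "example-app"); // hypothetical value

        Properties disabled = withCacheDisabled(base);
        Properties reverted = withCacheDefault(disabled);

        System.out.println("disabled: " + disabled.getProperty(CACHE_KEY));
        System.out.println("override present after revert: " + reverted.containsKey(CACHE_KEY));
    }
}
```

Keeping the revert as "remove the override" rather than hard-coding a size
avoids guessing at what the previous value was.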

Cheers,
Sophie

On Mon, Oct 14, 2019 at 4:35 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Hi Sophie,
>
> Thanks for the information! After setting cache.max.bytes.buffering to
> zero, commenting out my customized rocksDB config and using default
> rocksDB config, I do see the repartition topic bytes-out go up. But I
> noticed that some nodes have an I/O exception: An I/O error has
> occurred while writing a response message entity to the container
> output stream.
> (org.glassfish.jersey.server.internal.process.MappableException:
> org.apache.catalina.connector.ClientAbortException:
> java.io.IOException: Broken pipe). Is it also related to the rocksDB
> read and write? Is there anything I should do to get rid of this exception?
>
> Thanks a lot!
>
> On Mon, Oct 14, 2019 at 6:10 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> >
> > Ah ok, 2.3.0 has a known performance issue in the caching layer which tends
> > to get worse the larger the cache size. That might explain what you're
> > seeing with regards to the traffic correlation.
> >
> > It's fixed in 2.3.1 which should be released very soon, but until then you
> > might want to try turning off the Streams cache (by setting
> > cache.max.bytes.buffering to zero, although you can also use
> > .withCachingDisabled to turn it off only for the large/problem store).
> >
> > Of course any memory you reclaim by turning off the Streams cache can just
> > go towards the rocksdb cache instead, just note that the rocksdb cache comes
> > from off-heap memory while the Streams cache would be taken from heap memory
> > allocated to the jvm.
> >
> > On Mon, Oct 14, 2019 at 2:48 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for the help!
> > > I'm using version 2.3.0.
> > >
> > > The repartition topic with huge lag is the one created during the first
> > > reduce method, named "XX-KSTREAM-STATE-STORE-0030-repartition". All
> > > other internal topics have almost zero lag. In my case, how could I
> > > find out whether rocksDB causes the lag? One thing I noticed is that when
> > > the source traffic is about 30K/sec, I don't have any lag in the entire
> > > system, but when the traffic goes up to 100K/sec, it has a huge lag. As
> > > you mentioned, if the memory usage is high, should I set any rocksDB
> > > memory-related config to a higher value? Thanks a lot!
> > > My topology is like below:
> > >
> > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > KStream<String, Event> deserializedStream = source.mapValues( ... });
> > >
> > > KStream<Windowed<String>, Event> dedupedStream =
> > > deserializedStream.selectKey( ... )
> > > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > .reduce((value1, value2) -> value2)
> > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > .toStream();
> > >
> > > dedupedStream.selectKey( ... )
> > > .mapValues( ... )
> > > .filter(...)
> > > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > .reduce((value1, value2) -> {
> > >     long count1 = value1.getCount();
> > >     long count2 = value2.getCount();
> > >     value2.setCount(count1 + count2);
> > >     return value2;
> > > }
> > > )
> > > .toStream()
> > > .selectKey( ... )
> > > .to(outputTopic);
> > >
> > > On Mon, Oct 14, 2019 at 3:53 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > > wrote:
> > > >
> > > > Out of curiosity, which version are you using?
> > > >
> > > > There's nothing that really jumps out at me as problematic in your
> > > > RocksDBConfigSetter, but note that I think you may need to increase
> > > > the number of threads in the "LOW priority thread pool" in addition to
> > > > setting the maxBackgroundCompactions -- this can be done as
> > > >
> > > > options.setEnv(Env.getDefault().setBackgroundThreads(n, Env.COMPACTION_POOL));
> > > >
> > > > Is your disk throughput possibly the bottleneck? Note that if the
> > > > repartition topic is followed by a subtopology doing heavy processing this
> > > > will likely show up as lag like you describe. Also, if you have a large
> > > > number of stateful tasks (large number of stateful operations, and/or large
> > > > number of partitions) each one will have its own separate rocksdb instance,
> > > > and the memory usage could be quite high (which can cause rocks to page
> > > > in/out things like index blocks which always need to be read before a
> > > > lookup) -- I'd recommend also setting
> > > >
> > > > tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
> > > >
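
To make the memory point above concrete, here is a rough back-of-the-envelope
sketch. It assumes each rocksdb instance gets its own block cache and its own
set of write buffers, uses the 125MB / 3-buffer values from the config quoted
below, and ignores all other rocksdb overhead. The instance count of 32 is a
hypothetical number, not something from this thread.

```java
public class RocksDbMemoryEstimateSketch {
    static final long MIB = 1024L * 1024L;

    // Rough upper bound for one instance: block cache plus all write buffers.
    static long perInstanceBytes(long blockCacheBytes, long writeBufferBytes, int maxWriteBuffers) {
        return blockCacheBytes + writeBufferBytes * maxWriteBuffers;
    }

    // Total across instances, assuming nothing is shared between them.
    static long totalBytes(long perInstanceBytes, int instances) {
        return perInstanceBytes * instances;
    }

    public static void main(String[] args) {
        // Values taken from the config in this thread: 125MB cache,
        // 125MB write buffer size, max write buffer number of 3.
        long perInstance = perInstanceBytes(125 * MIB, 125 * MIB, 3);
        int instances = 32; // hypothetical instance count

        System.out.println(perInstance / MIB + " MiB per instance");
        System.out.println(totalBytes(perInstance, instances) / MIB + " MiB across " + instances + " instances");
    }
}
```

With those inputs this works out to 500 MiB per instance and 16000 MiB across
32 instances, all of it off-heap, which is why the per-instance settings add up
quickly.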
> > > >
> > > > On Sun, Oct 13, 2019 at 6:40 PM Xiyuan Hu <xiyuan.h...@gmail.com>
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm running a Kafka Streams app with a windowing function. I noticed
> > > > > that the internal -repartition topic has huge lag while the system CPU
> > > > > usage is low and the app is stable (join rate is almost 0).
> > > > >
> > > > > The repartition topic is an internal topic created by the
> > > > > application automatically. The bytes in per sec for this topic is
> > > > > about 65MB/sec while the bytes out for this topic is only 15MB/sec. I
> > > > > have tried a couple of configs to customize RocksDB, but none of
> > > > > them could increase the bytes out value.
> > > > >
> > > > > I changed the default RocksDB block cache size to 125MB and block size
> > > > > to 125MB as well. Also set the max write buffer number to 3. But it
> > > > > didn't help.
> > > > >
> > > > > May I know what I missed here? What's the best way to find out why
> > > > > the internal repartition topic has huge lag?
> > > > >
> > > > > Thanks for all the help!!
> > > > >
> > > > > My RocksDB config:
> > > > > public static class CustomRocksDBConfig implements RocksDBConfigSetter {
> > > > >     private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(125 * 1024L * 1024L);
> > > > >
> > > > >     @Override
> > > > >     public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
> > > > >         int n = Runtime.getRuntime().availableProcessors();
> > > > >         options.setMaxBackgroundCompactions(n);
> > > > >         options.setWriteBufferSize(125 * 1024 * 1024);
> > > > >         BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
> > > > >         tableConfig.setBlockCache(cache);
> > > > >         tableConfig.setBlockCacheSize(125 * 1024 * 1024L);
> > > > >         tableConfig.setBlockSize(125 * 1024L);
> > > > >         tableConfig.setCacheIndexAndFilterBlocks(true);
> > > > >         options.setTableFormatConfig(tableConfig);
> > > > >         options.setMaxWriteBufferNumber(3);
> > > > >     }
> > > > >
> > > > >     public void close(final String storeName, final Options options) {
> > > > >         // See #5 below.
> > > > >         cache.close();
> > > > >     }
> > > > >
> > > > > }
> > > > >
> > > > > Thanks
> > > > > Kathy
> > > > >
> > >
>
