Hi there,

Thanks for double checking. Does RocksDB actually crash or produce a crash dump? I’m curious how you know that the issue is https://github.com/facebook/rocksdb/issues/1121, so just double checking with you.

If that’s indeed the case, do you mind opening a JIRA (a copy-paste of the below should suffice)? Alternatively let us know and we’ll open it. Sounds like we should handle this better.

Thanks,
Eno

> On May 3, 2017, at 5:49 AM, João Peixoto <joao.harti...@gmail.com> wrote:
>
> I believe I found the root cause of my problem. I seem to have hit this
> RocksDB bug https://github.com/facebook/rocksdb/issues/1121
>
> On my stream configuration I have a custom transformer used for
> deduplicating records, heavily inspired by the
> EventDeduplicationLambdaIntegrationTest
> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
> but adjusted to my use case, with special emphasis on the "punctuate" method.
>
> All the stale instances had the main stream thread "RUNNING" the
> "punctuate" method of this transformer, which in turn was running RocksDB
> "seekToFirst".
>
> Also during my debugging one such instance finished the "punctuate" method,
> which took ~11h, exactly the time the instance was stuck for.
> Changing the backing state store from "persistent" to "inMemory" solved my
> issue, at least after several days running, no stuck instances.
>
> This leads me to ask, shouldn't Kafka detect such a situation fairly
> quickly? Instead of just stopping polling? My guess is that the heartbeat
> thread, which is now separate, continues working fine. Since by definition
> the stream runs a message through the whole pipeline, this step probably
> just looked like it was VERY slow. Not sure what the best approach here
> would be.
>
> PS The linked code clearly states "This code is for demonstration purposes
> and was not tested for production usage" so that's on me
>
> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Did you check the logs? Maybe you need to increase log level to DEBUG to
>> get some more information.
>>
>> Did you double check committed offsets via bin/kafka-consumer-groups.sh?
>>
>> -Matthias
>>
>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>> My stream gets stale after a while and it simply does not receive any new
>>> messages, aka does not poll.
>>>
>>> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
>>> brokers are running 0.10.1.1.
>>>
>>> The stream state is RUNNING and there are no exceptions in the logs.
>>>
>>> Looking at the JMX metrics, the threads are there and running, just not
>>> doing anything.
>>> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
>>> (The max time taken to receive a response to a heartbeat request) reads
>>> 43,361 seconds (almost 12 hours) which is consistent with the time of the
>>> hang. Shouldn't this trigger a failure somehow?
>>>
>>> The stream configuration looks something like this:
>>>
>>> Properties props = new Properties();
>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>     CustomTimestampExtractor.class.getName());
>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>     myConfig.getBrokerList());
>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>     Serdes.String().getClass().getName());
>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>     Serdes.ByteArray().getClass().getName());
>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>     myConfig.getCommitIntervalMs()); // 5000
>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>>     myConfig.getStreamThreadsCount()); // 1
>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>>     myConfig.getMaxCacheBytes()); // 524_288_000L
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>
>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
>>> another topic.
>>>
>>> Thanks in advance for the help!
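
For illustration only: the report above describes a "punctuate" call that scanned the whole store (via "seekToFirst") and blocked the stream thread for ~11h. One possible way to keep such a purge from monopolizing the thread is to cap the entries visited per call and resume from a saved cursor on the next call. The sketch below is plain Java and does not use the Kafka Streams or RocksDB APIs; the class BoundedPurge, its methods, and the TreeMap standing in for the state store are all hypothetical names, not code from this thread.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a deduplication store whose purge step does a bounded
// amount of work per call instead of scanning every entry in one go.
class BoundedPurge {
    // Stand-in for the state store: event id -> timestamp (ms) of first sighting.
    private final TreeMap<String, Long> seen = new TreeMap<>();
    private final long retentionMs;
    private final int maxEntriesPerCall;
    // Last key visited by the previous purge call; null means "start from the beginning".
    private String resumeAfter = null;

    BoundedPurge(long retentionMs, int maxEntriesPerCall) {
        this.retentionMs = retentionMs;
        this.maxEntriesPerCall = maxEntriesPerCall;
    }

    // Returns true if the id was not in the store yet (the record should be forwarded).
    boolean firstSighting(String id, long timestampMs) {
        return seen.putIfAbsent(id, timestampMs) == null;
    }

    int size() {
        return seen.size();
    }

    // Visits at most maxEntriesPerCall entries, removes the expired ones,
    // and remembers where to continue. Returns the number of removed entries.
    int purge(long nowMs) {
        NavigableMap<String, Long> range =
                (resumeAfter == null) ? seen : seen.tailMap(resumeAfter, false);
        Iterator<Map.Entry<String, Long>> it = range.entrySet().iterator();
        int visited = 0;
        int removed = 0;
        String last = null;
        while (it.hasNext() && visited < maxEntriesPerCall) {
            Map.Entry<String, Long> entry = it.next();
            visited++;
            last = entry.getKey();
            if (nowMs - entry.getValue() > retentionMs) {
                it.remove();
                removed++;
            }
        }
        // If entries remain, continue after the last visited key next time;
        // otherwise wrap around to the start of the store.
        resumeAfter = it.hasNext() ? last : null;
        return removed;
    }
}
```

With this shape, each scheduled call does a small, predictable amount of work, so a large store takes several calls to sweep rather than stalling processing during a single one. Whether the same pattern helps with the linked RocksDB issue is an assumption that would need testing against a real persistent store.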