I believe I found the root cause of my problem. I seem to have hit this
RocksDB bug: https://github.com/facebook/rocksdb/issues/1121

In my stream configuration I have a custom transformer used for
deduplicating records, heavily inspired by the
EventDeduplicationLambdaIntegrationTest
<https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
but adjusted to my use case, with special emphasis on the "punctuate"
method.
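
For context, here is a minimal sketch of the shape of my transformer
(pre-1.0 Transformer API; the store name "eventIds", the schedule interval
and the one-hour TTL are placeholders, not the linked example's or my exact
values):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class DeduplicationTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> eventIdStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        eventIdStore = (KeyValueStore<String, Long>) context.getStateStore("eventIds");
        context.schedule(60_000L); // punctuate roughly every minute (stream time)
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final String eventId = String.valueOf(key); // assumption: the key carries the event id
        if (eventIdStore.get(eventId) != null) {
            return null; // seen before: drop the duplicate
        }
        eventIdStore.put(eventId, context.timestamp());
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<K, V> punctuate(final long timestamp) {
        // Purge expired ids. With a persistent store, all() opens a RocksDB
        // iterator and seeks to the first key; this is the call the threads hung in.
        try (final KeyValueIterator<String, Long> iterator = eventIdStore.all()) {
            while (iterator.hasNext()) {
                final KeyValue<String, Long> entry = iterator.next();
                if (timestamp - entry.value > 3_600_000L) { // placeholder 1h TTL
                    eventIdStore.delete(entry.key);
                }
            }
        }
        return null; // nothing to emit from punctuate
    }

    @Override
    public void close() {}
}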

All the stale instances had the main stream thread "RUNNING" the
"punctuate" method of this transformer, which in turn was calling RocksDB's
"seekToFirst".

Also, during my debugging, one such instance eventually finished the
"punctuate" method; it took ~11h, exactly the time the instance was stuck
for.

Changing the backing state store from "persistent" to "inMemory" solved my
issue; after several days running, no stuck instances.
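
For reference, the fix amounted to swapping the store supplier. A sketch
using the 0.10.2-era Stores builder ("eventIds" again being a placeholder
name):

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Before the change this used .persistent() (RocksDB-backed), which is
// where punctuate's iterator hung.
StateStoreSupplier dedupStore = Stores.create("eventIds")
    .withStringKeys()
    .withLongValues()
    .inMemory()   // was .persistent()
    .build();

builder.addStateStore(dedupStore); // builder is the KStreamBuilder

If I understand correctly, the in-memory store is still backed by a
changelog topic by default, so this trades restore behavior for avoiding
RocksDB rather than giving up fault tolerance.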

This leads me to ask: shouldn't Kafka detect such a situation fairly
quickly, instead of just silently not polling? My guess is that the
heartbeat thread, which is now separate, kept working fine, and since by
definition the stream runs each message through the whole pipeline, this
step probably just looked like it was VERY slow. Not sure what the best
approach here would be.
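
In case it helps others, one mitigation I'm considering (an assumption on
my part, not verified advice): bounding the consumer's max.poll.interval.ms
(KIP-62), so the group coordinator evicts a member that stops polling for
too long even while the heartbeat thread keeps the session alive:

// Added to the Properties shown in my original message below.
// Assumption: with KIP-62, only max.poll.interval.ms bounds how long a
// punctuate() can block the poll loop before a rebalance is triggered.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes, placeholder value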

PS The linked code clearly states "This code is for demonstration purposes
and was not tested for production usage", so that's on me.

On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Did you check the logs? Maybe you need to increase log level to DEBUG to
> get some more information.
>
> Did you double check committed offsets via bin/kafka-consumer-groups.sh?
>
> -Matthias
>
> On 4/28/17 9:22 AM, João Peixoto wrote:
> > My stream gets stale after a while and it simply does not receive any new
> > messages, aka does not poll.
> >
> > I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
> > brokers are running 0.10.1.1.
> >
> > The stream state is RUNNING and there are no exceptions in the logs.
> >
> > Looking at the JMX metrics, the threads are there and running, just not
> > doing anything.
> > The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
> > (The max time taken to receive a response to a heartbeat request) reads
> > 43,361 seconds (almost 12 hours) which is consistent with the time of the
> > hang. Shouldn't this trigger a failure somehow?
> >
> > The stream configuration looks something like this:
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >           CustomTimestampExtractor.class.getName());
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myConfig.getBrokerList());
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, myConfig.getCommitIntervalMs()); // 5000
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, myConfig.getStreamThreadsCount()); // 1
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, myConfig.getMaxCacheBytes()); // 524_288_000L
> > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
> >
> > The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs
> > to another topic.
> >
> > Thanks in advance for the help!
> >
>
>
