Hey,

I am not against opening a JIRA, but I am wondering what we should describe/report there. If I understand the scenario correctly, João uses a custom RocksDB store and calls seek() in user code land. As it is a bug in RocksDB that seek() takes so long, I am not sure what we could improve within Streams to prevent this.

The only thing I see right now is that we could reduce `max.poll.interval.ms`, which we just increased to guard against failures during long state recreation phases.
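For example, in the application config that would look something like this (a sketch only; the 5-minute value is an assumption and would need tuning against the longest pause the topology can hit):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Streams forwards consumer configs to its embedded consumer, so lowering
// max.poll.interval.ms makes a thread that is stuck between poll() calls
// (e.g. inside a long punctuate()) drop out of the group and trigger a
// rebalance instead of hanging for hours.
Properties props = new Properties();
// ... existing Streams settings ...
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // 5 minutes (assumption)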
Any thoughts?

-Matthias

On 5/3/17 8:48 AM, João Peixoto wrote:
> That'd be great, as I'm not familiar with the protocol there.
>
> On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Cool, thanks, shall we open a JIRA?
>>
>> Eno
>>
>>> On 3 May 2017, at 16:16, João Peixoto <joao.harti...@gmail.com> wrote:
>>>
>>> Actually I need to apologize, I pasted the wrong issue; I meant to paste
>>> https://github.com/facebook/rocksdb/issues/261.
>>>
>>> RocksDB did not produce a crash report since it didn't actually crash. I
>>> performed thread dumps on stale and not-stale instances, which revealed
>>> the common behavior, and I collect and plot several Kafka metrics,
>>> including "punctuate" durations, so I know it took a long time and
>>> eventually finished.
>>>
>>> Joao
>>>
>>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> Thanks for double checking. Does RocksDB actually crash or produce a
>>>> crash dump? I'm curious how you know that the issue is
>>>> https://github.com/facebook/rocksdb/issues/1121, so just double checking
>>>> with you.
>>>>
>>>> If that's indeed the case, do you mind opening a JIRA (a copy-paste of
>>>> the below should suffice)? Alternatively let us know and we'll open it.
>>>> Sounds like we should handle this better.
>>>>
>>>> Thanks,
>>>> Eno
>>>>
>>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>
>>>>> I believe I found the root cause of my problem. I seem to have hit this
>>>>> RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
>>>>>
>>>>> In my stream configuration I have a custom transformer used for
>>>>> deduplicating records, heavily inspired by the
>>>>> EventDeduplicationLambdaIntegrationTest
>>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
>>>>> but adjusted to my use case, with special emphasis on the "punctuate"
>>>>> method.
>>>>>
>>>>> All the stale instances had the main stream thread "RUNNING" the
>>>>> "punctuate" method of this transformer, which in turn was running
>>>>> RocksDB "seekToFirst".
>>>>>
>>>>> Also, during my debugging one such instance finished the "punctuate"
>>>>> method, which took ~11h, exactly the time the instance was stuck for.
>>>>> Changing the backing state store from "persistent" to "inMemory" solved
>>>>> my issue; after several days running, no stuck instances.
>>>>>
>>>>> This leads me to ask: shouldn't Kafka detect such a situation fairly
>>>>> quickly, instead of just stopping polling? My guess is that the
>>>>> heartbeat thread, which is now separate, continues working fine; since
>>>>> by definition the stream runs a message through the whole pipeline,
>>>>> this step probably just looked like it was VERY slow. Not sure what the
>>>>> best approach here would be.
>>>>>
>>>>> PS The linked code clearly states "This code is for demonstration
>>>>> purposes and was not tested for production usage", so that's on me.
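(For readers hitting the same thing, here is a minimal sketch of such a transformer, adapted from the linked example; the class name, store name, types, retention, and scheduling interval are assumptions, not João's actual code. The eventIdStore.all() call in punctuate() is what bottoms out in RocksDB's seekToFirst on a persistent store.)

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class DeduplicationTransformer
    implements Transformer<String, byte[], KeyValue<String, byte[]>> {

  private static final long MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // assumption

  private ProcessorContext context;
  private KeyValueStore<String, Long> eventIdStore; // eventId -> first-seen timestamp

  @SuppressWarnings("unchecked")
  @Override
  public void init(final ProcessorContext context) {
    this.context = context;
    eventIdStore = (KeyValueStore<String, Long>) context.getStateStore("eventId-store");
    context.schedule(60_000L); // punctuate roughly once per minute of stream time
  }

  @Override
  public KeyValue<String, byte[]> transform(final String eventId, final byte[] value) {
    if (eventIdStore.get(eventId) != null) {
      return null; // duplicate, drop the record
    }
    eventIdStore.put(eventId, context.timestamp());
    return KeyValue.pair(eventId, value);
  }

  // Purge expired ids. On a persistent store, eventIdStore.all() opens a
  // RocksDB iterator and calls seekToFirst under the hood, which is where
  // the stale instances were stuck.
  @Override
  public KeyValue<String, byte[]> punctuate(final long currentStreamTimeMs) {
    try (final KeyValueIterator<String, Long> iterator = eventIdStore.all()) {
      while (iterator.hasNext()) {
        final KeyValue<String, Long> entry = iterator.next();
        if (currentStreamTimeMs - entry.value > MAINTAIN_DURATION_MS) {
          eventIdStore.delete(entry.key);
        }
      }
    }
    return null; // nothing to emit from punctuate
  }

  @Override
  public void close() {
    // nothing to do; the store is managed by Streams
  }
}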
>>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Did you check the logs? Maybe you need to increase the log level to
>>>>>> DEBUG to get some more information.
>>>>>>
>>>>>> Did you double check committed offsets via bin/kafka-consumer-groups.sh?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>>>>>> My stream gets stale after a while and it simply does not receive any
>>>>>>> new messages, i.e. it does not poll.
>>>>>>>
>>>>>>> I'm using Kafka Streams 0.10.2.1 (the same happens with 0.10.2.0) and
>>>>>>> the brokers are running 0.10.1.1.
>>>>>>>
>>>>>>> The stream state is RUNNING and there are no exceptions in the logs.
>>>>>>>
>>>>>>> Looking at the JMX metrics, the threads are there and running, just
>>>>>>> not doing anything.
>>>>>>> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
>>>>>>> (the max time taken to receive a response to a heartbeat request)
>>>>>>> reads 43,361 seconds (almost 12 hours), which is consistent with the
>>>>>>> duration of the hang. Shouldn't this trigger a failure somehow?
>>>>>>>
>>>>>>> The stream configuration looks something like this:
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>     CustomTimestampExtractor.class.getName());
>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>>>>     myConfig.getBrokerList());
>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>>>>>     Serdes.String().getClass().getName());
>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>>>>>     Serdes.ByteArray().getClass().getName());
>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>>>>>>     myConfig.getCommitIntervalMs()); // 5000
>>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>>>>>>>     myConfig.getStreamThreadsCount()); // 1
>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>>>>>>>     myConfig.getMaxCacheBytes()); // 524_288_000L
>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>>>>>
>>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and
>>>>>>> outputs to another topic.
>>>>>>>
>>>>>>> Thanks in advance for the help!
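(And for reference, a sketch of the persistent-to-inMemory switch João describes, using the 0.10.2-era Stores API; the store name, key/value types, and topic names are assumptions chosen to match the transformer sketch above.)

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

KStreamBuilder builder = new KStreamBuilder();

// Was .persistent() (RocksDB-backed). An in-memory store avoids the RocksDB
// seek entirely; it can still be restored after a restart via its changelog
// topic, but the whole store must fit in heap.
StateStoreSupplier eventIdStore = Stores.create("eventId-store")
    .withStringKeys()
    .withLongValues()
    .inMemory()
    .build();

builder.addStateStore(eventIdStore);

KStream<String, byte[]> input = builder.stream("input-topic"); // hypothetical topic names
input.transform(DeduplicationTransformer::new, "eventId-store")
    .to("output-topic");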