Did you check the logs? You may need to increase the log level to DEBUG to get more information.
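As a rough sketch, assuming your application logs through log4j 1.x with a `log4j.properties` file on the classpath (an assumption on my side, adapt it to whatever logging backend you actually use), raising the level for just the Streams and consumer packages keeps the output manageable:

```properties
# Assumes log4j 1.x and an existing root logger/appender configuration.
# Only the Kafka Streams and consumer client packages are raised to DEBUG.
log4j.logger.org.apache.kafka.streams=DEBUG
log4j.logger.org.apache.kafka.clients.consumer=DEBUG
```

The consumer package includes the coordinator and fetcher classes, so heartbeat and poll activity (or the lack of it) should show up there.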
Did you double check committed offsets via bin/kafka-consumer-groups.sh?

-Matthias

On 4/28/17 9:22 AM, João Peixoto wrote:
> My stream gets stale after a while and it simply does not receive any new
> messages, aka does not poll.
>
> I'm using Kafka Streams 0.10.2.1 (same happens with 0.10.2.0) and the
> brokers are running 0.10.1.1.
>
> The stream state is RUNNING and there are no exceptions in the logs.
>
> Looking at the JMX metrics, the threads are there and running, just not
> doing anything.
> The metric "consumer-coordinator-metrics > heartbeat-response-time-max"
> (The max time taken to receive a response to a heartbeat request) reads
> 43,361 seconds (almost 12 hours) which is consistent with the time of the
> hang. Shouldn't this trigger a failure somehow?
>
> The stream configuration looks something like this:
>
> Properties props = new Properties();
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>     CustomTimestampExtractor.class.getName());
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     myConfig.getBrokerList());
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>     Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>     Serdes.ByteArray().getClass().getName());
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>     myConfig.getCommitIntervalMs()); // 5000
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
>     myConfig.getStreamThreadsCount()); // 1
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>     myConfig.getMaxCacheBytes()); // 524_288_000L
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>
> The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
> another topic.
>
> Thanks in advance for the help!