Jacob Gur created KAFKA-4799:
--------------------------------

             Summary: session timeout during event processing shuts down stream
                 Key: KAFKA-4799
                 URL: https://issues.apache.org/jira/browse/KAFKA-4799
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.1
         Environment: kafka streams client running on os x, with docker machine running broker
            Reporter: Jacob Gur
            Priority: Critical
I have a simple stream application like this:

{code:title=Part of my class|borderStyle=solid}
private <T> IConsumerSubscription buildSubscriptionStream(
        Class<T> clazz, Consumer<T> consumer, String group,
        Function<KStreamBuilder, KStream<String, String>> topicStreamFunc) {
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> stream = topicStreamFunc.apply(builder);
    stream.foreach((k, v) -> {
        try {
            T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
            consumer.accept(value);
            Logger.trace("Consumed message {}", value);
        } catch (Throwable th) {
            Logger.warn("Error while consuming message", th);
        }
    });
    final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
    streams.start();
    return streams::close;
}
{code}

There is just one client running this stream application. If I run the client in a debugger with a breakpoint on the event processor (i.e., inside the foreach lambda), and the debugger suspends all threads for more than about 10 seconds, then when I resume the application:

Actual behavior - the stream shuts down.

Expected behavior - the stream recovers; it may be removed from the partition temporarily, but it is then re-assigned and resumes processing.

It looks like what happens is this:
1) The Kafka client's session times out.
2) The partition is revoked.
3) The Streams library's rebalance listener tries to commit offsets, but the commit fails with a rebalance exception.
4) The stream shuts down.

Steps 3 and 4 occur in StreamThread's rebalance listener. It seems that it should be more resilient and recover just like a regular KafkaConsumer would: its partitions would be revoked, and then it would get them back and resume processing from the last committed offset.

Is the current behavior expected and I'm just not understanding the intention? Or is this a bug?

Thanks!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
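For anyone hitting the same symptom while debugging: a possible mitigation (not a fix for the shutdown behavior itself) is to raise the consumer session timeout in the Streams configuration so step 1 above fires later. Below is a hedged sketch of what a constructProperties(...) helper might look like; the real method is not shown in this report, and the broker address and group name are assumptions. It uses plain string config keys so the sketch stands alone:

{code:title=Hypothetical constructProperties sketch|borderStyle=solid}
import java.util.Properties;

public class StreamProps {
    // Hypothetical version of the constructProperties(group) helper referenced
    // in the snippet above. Raising session.timeout.ms gives the client more
    // headroom when all threads are suspended (e.g. by a debugger breakpoint),
    // postponing the session expiry that starts the revoke/commit/shutdown
    // sequence. The default of 10000 ms matches the ~10 s threshold observed.
    static Properties constructProperties(String group) {
        Properties props = new Properties();
        props.put("application.id", group);
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("session.timeout.ms", "60000");         // default is 10000
        return props;
    }

    public static void main(String[] args) {
        Properties p = constructProperties("my-consumer-group");
        System.out.println(p.getProperty("session.timeout.ms"));
    }
}
{code}

This only delays the timeout; a suspension longer than the configured value still triggers the same rebalance path.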