[ 
https://issues.apache.org/jira/browse/KAFKA-4799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139456#comment-16139456
 ] 

Guozhang Wang commented on KAFKA-4799:
--------------------------------------

Hi [~jacobg], this issue has been fixed in 0.11.0.1. Could you try it out and 
validate? You do not need to upgrade your brokers in order to upgrade the 
Streams client, as long as the brokers are already on 0.10.1+.

> session timeout during event processing shuts down stream
> ---------------------------------------------------------
>
>                 Key: KAFKA-4799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4799
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: kafka streams client running on os x, with docker 
> machine running broker
>            Reporter: Jacob Gur
>            Priority: Critical
>
> I have a simple stream application like this:
> {code:title=Part of my class|borderStyle=solid}
>       private <T> IConsumerSubscription buildSubscriptionStream(
>                       Class<T> clazz, Consumer<T> consumer, String group,
>                       Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
>       {
>               KStreamBuilder builder = new KStreamBuilder();
>               KStream<String, String> stream = topicStreamFunc.apply(builder);
>               stream.foreach((k, v) -> {
>                       try {
>                               T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
>                               consumer.accept(value);
>                               Logger.trace("Consumed message {}", value);
>                       } catch (Throwable th) {
>                               Logger.warn("Error while consuming message", th);
>                       }
>               });
>               final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
>               streams.start();
>               return streams::close;
>       }
> {code}
> There is just one client running this application stream.
> If I run the client in a debugger with a breakpoint on the event processor 
> (i.e., inside the foreach lambda) with debugger suspending all threads for 
> perhaps more than 10 seconds, then when I resume the application:
> Actual behavior - the stream shuts down
> Expected behavior - the stream should recover, perhaps temporarily removed 
> from partition but then re-added and recovered.
> It looks like what happens is this:
> 1) The kafka client session times out.
> 2) The partition is revoked
> 3) The streams library has a rebalance listener that tries to commit offsets, 
> but that commit fails due to a rebalance exception.
> 4) Stream shuts down.
> Steps 3 and 4 occur in StreamThread's rebalance listener.
> It seems that it should be more resilient and recover just like a regular 
> KafkaConsumer would. Its partition would be revoked, and then it would get it 
> back again and resume processing at the last offset.
> Is current behavior expected and I'm not understanding the intention? Or is 
> this a bug?
> Thanks!
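
For anyone hitting the timeout in step 1 while stepping through a debugger on an affected version, one possible workaround (not the fix referenced in the comment above) is to raise the consumer session timeout in the properties handed to KafkaStreams. The sketch below is only an illustration under stated assumptions: the class name, the helper method, and the application id and bootstrap address are hypothetical, and it assumes that the consumer settings `session.timeout.ms` and `heartbeat.interval.ms` are picked up from the Streams properties. The broker-side `group.max.session.timeout.ms` must also permit the chosen value.

```java
import java.util.Properties;

public class DebugFriendlyStreamsConfig {
    // Plain string keys so the sketch compiles without the Kafka jars on the classpath.
    public static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
    public static final String HEARTBEAT_INTERVAL_MS = "heartbeat.interval.ms";

    /**
     * Returns a copy of the given properties with the consumer session timeout
     * raised to sessionTimeoutMs. The input properties are left unchanged.
     */
    public static Properties withSessionTimeout(Properties base, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(SESSION_TIMEOUT_MS, Integer.toString(sessionTimeoutMs));
        // Keep the heartbeat interval at one third of the session timeout,
        // the ratio usually recommended for these two settings.
        props.put(HEARTBEAT_INTERVAL_MS, Integer.toString(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "example-group");      // hypothetical value
        base.put("bootstrap.servers", "localhost:9092");  // hypothetical value

        // Two minutes of slack for pausing on a breakpoint.
        Properties debug = withSessionTimeout(base, 120_000);
        System.out.println(debug);
    }
}
```

In the reporter's code, the result would take the place of whatever `constructProperties(group)` returns. This only widens the window before the session expires; it does not change how the stream reacts once a rebalance actually happens.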



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
