[ https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891459#comment-16891459 ]
Guozhang Wang commented on KAFKA-8675: -------------------------------------- I'd agree with [~mjsax] here, in Streams upon shutting down we do not send leave.group immediately in case this is a transient failure to avoid unnecessary rebalances (though since 2.3.0 it is recommended to use KIP-345's static membership to tolerate transient failure). > "Main" consumers are not unsubsribed on KafkaStreams.close() > ------------------------------------------------------------ > > Key: KAFKA-8675 > URL: https://issues.apache.org/jira/browse/KAFKA-8675 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.2.1 > Reporter: Modestas Vainius > Priority: Major > > Hi! > It seems that {{KafkaStreams.close()}} never unsubscribes "main" kafka > consumers. As far as I can tell, > {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} does > unsubscribe only {{restoreConsumer}}. This results into Kafka Group > coordinator having to throw away consumer from the consumer group in a > non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers but > it seems that is not enough for clean exit. > Kafka Streams connects to Kafka: > {code:java} > kafka | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 0 (__consumer_offsets-44) (reason: Adding new member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) > (kafka.coordinator.group.GroupCoordinator) > kafka | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: > Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > kafka | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: > Assignment received from leader for group 1-streams-test for generation 1 > (kafka.coordinator.group.GroupCoordinator) > {code} > Processing finishes in 2 secs but after 10 seconds I see this in Kafka logs: > {code:java} > kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > in group 1-streams-test has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 1 (__consumer_offsets-44) (reason: removing member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) > kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group > 1-streams-test with generation 2 is now empty (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > {code} > Topology is kind of similar to [kafka testing > example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html] > but I tried on real kafka instance (one node): > {code:java} > new Topology().with { > it.addSource("sourceProcessor", "input-topic") > it.addProcessor("aggregator", new > CustomMaxAggregatorSupplier(), "sourceProcessor") > it.addStateStore( > Stores.keyValueStoreBuilder( > Stores.inMemoryKeyValueStore("aggStore"), > Serdes.String(), > Serdes.Long()).withLoggingDisabled(), // need to > disable logging to allow aggregatorStore pre-populating > "aggregator") > it.addSink( > "sinkProcessor", > "result-topic", > "aggregator" > ) > it > } > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)