[ 
https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8675.
----------------------------------
    Resolution: Not A Problem

> "Main" consumers are not unsubscribed on KafkaStreams.close()
> -------------------------------------------------------------
>
>                 Key: KAFKA-8675
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8675
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.2.1
>            Reporter: Modestas Vainius
>            Priority: Major
>
> Hi!
> It seems that {{KafkaStreams.close()}} never unsubscribes the "main" Kafka 
> consumers. As far as I can tell, 
> {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} 
> unsubscribes only the {{restoreConsumer}}. As a result, the Kafka group 
> coordinator has to remove the consumer from the consumer group in a 
> non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers, but 
> that does not seem to be enough for a clean exit.
> Kafka Streams connects to Kafka:
> {code:java}
> kafka        | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 0 (__consumer_offsets-44) (reason: Adding new member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) (kafka.coordinator.group.GroupCoordinator)
> kafka        | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
> kafka        | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: Assignment received from leader for group 1-streams-test for generation 1 (kafka.coordinator.group.GroupCoordinator)
> {code}
> Processing finishes in 2 seconds, but after 10 seconds I see this in the Kafka logs:
> {code:java}
> kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db in group 1-streams-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 1 (__consumer_offsets-44) (reason: removing member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
> kafka        | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group 1-streams-test with generation 2 is now empty (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
> {code}
> The topology is similar to the [Kafka testing 
> example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html], 
> but I ran it against a real Kafka instance (one node):
> {code:java}
>             new Topology().with {
>                 it.addSource("sourceProcessor", "input-topic")
>                 it.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
>                 it.addStateStore(
>                     Stores.keyValueStoreBuilder(
>                         Stores.inMemoryKeyValueStore("aggStore"),
>                         Serdes.String(),
>                         Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow aggregatorStore pre-populating
>                     "aggregator")
>                 it.addSink(
>                     "sinkProcessor",
>                     "result-topic",
>                     "aggregator"
>                 )
>                 it
>             }
> {code}
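
For reference, the "10 seconds" in the report can be checked against the two quoted broker log excerpts. The sketch below only does the timestamp arithmetic: it parses the time of the "Assignment received" line and the time of the "has failed, removing it from the group" line and prints the difference. The class name `RebalanceGap` and the method `gapMillis` are hypothetical names made up for this illustration. A gap of roughly 10 seconds would be consistent with a 10-second consumer session timeout, but that is an assumption: the report does not state the configured value.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Kafka: measures the time between two
// broker log lines using the timestamp layout seen in the quoted logs.
public class RebalanceGap {

    // Layout of the bracketed timestamps, e.g. "2019-07-17 08:02:35,730".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    // Returns the number of milliseconds from `earlier` to `later`.
    static long gapMillis(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(later, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps copied from the quoted log lines:
        // "Assignment received from leader ..." and
        // "Member ... has failed, removing it from the group".
        String assignmentReceived = "2019-07-17 08:02:35,730";
        String memberRemoved = "2019-07-17 08:02:45,749";

        long gap = gapMillis(assignmentReceived, memberRemoved);
        System.out.println("Gap: " + gap + " ms"); // prints "Gap: 10019 ms"
    }
}
```

The gap is measured from the last logged group activity for the member rather than from the end of processing, since the broker log is the only timing source quoted in the report.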



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)