[ https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-8675.
----------------------------------
Resolution: Not A Problem
> "Main" consumers are not unsubscribed on KafkaStreams.close()
> -------------------------------------------------------------
>
> Key: KAFKA-8675
> URL: https://issues.apache.org/jira/browse/KAFKA-8675
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.2.1
> Reporter: Modestas Vainius
> Priority: Major
>
> Hi!
> It seems that {{KafkaStreams.close()}} never unsubscribes the "main" Kafka
> consumers. As far as I can tell,
> {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}}
> unsubscribes only the {{restoreConsumer}}. As a result, the Kafka group
> coordinator has to remove the consumer from the consumer group in a
> non-clean way. {{KafkaStreams.close()}} does call {{close()}} on those
> consumers, but that does not seem to be enough for a clean exit.
> Kafka Streams connects to Kafka:
> {code:java}
> kafka | [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 0 (__consumer_offsets-44) (reason: Adding new member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) (kafka.coordinator.group.GroupCoordinator)
> kafka | [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
> kafka | [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: Assignment received from leader for group 1-streams-test for generation 1 (kafka.coordinator.group.GroupCoordinator)
> {code}
> Processing finishes in 2 seconds, but 10 seconds later I see this in the Kafka logs:
> {code:java}
> kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db in group 1-streams-test has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing to rebalance group 1-streams-test in state PreparingRebalance with old generation 1 (__consumer_offsets-44) (reason: removing member 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
> kafka | [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group 1-streams-test with generation 2 is now empty (__consumer_offsets-44) (kafka.coordinator.group.GroupCoordinator)
> {code}
> The topology is similar to the [Kafka testing
> example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html],
> but I ran it against a real Kafka instance (one node):
> {code:java}
> new Topology().with {
>     it.addSource("sourceProcessor", "input-topic")
>     it.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor")
>     it.addStateStore(
>         Stores.keyValueStoreBuilder(
>             Stores.inMemoryKeyValueStore("aggStore"),
>             Serdes.String(),
>             Serdes.Long()).withLoggingDisabled(), // need to disable logging to allow aggregatorStore pre-populating
>         "aggregator")
>     it.addSink(
>         "sinkProcessor",
>         "result-topic",
>         "aggregator"
>     )
>     it
> }
> {code}
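
The broker logs quoted above show the member joining at 08:02:35 and being removed "on heartbeat expiration" at 08:02:45. That pattern is what one would expect when a member simply stops heartbeating and the coordinator expires it, rather than the member leaving explicitly. The sketch below is a toy model of that bookkeeping, not Kafka's actual GroupCoordinator: the class {{ToyCoordinator}} and all of its methods are hypothetical, and the 10000 ms session timeout is an assumption inferred from the roughly 10 second gap in the logs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Kafka's GroupCoordinator. All names here are hypothetical.
public class ToyCoordinator {
    private final long sessionTimeoutMs;
    // memberId -> timestamp (ms) of the last heartbeat seen from that member
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

    public ToyCoordinator(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // A join counts as the member's first heartbeat.
    public void join(String memberId, long nowMs) {
        lastHeartbeatMs.put(memberId, nowMs);
    }

    // Heartbeats from unknown (already removed) members are ignored.
    public void heartbeat(String memberId, long nowMs) {
        if (lastHeartbeatMs.containsKey(memberId)) {
            lastHeartbeatMs.put(memberId, nowMs);
        }
    }

    // Clean exit: the member is removed immediately.
    public void leave(String memberId) {
        lastHeartbeatMs.remove(memberId);
    }

    // Non-clean exit: remove every member whose last heartbeat is at least
    // sessionTimeoutMs old. Returns the number of members removed.
    public int expire(long nowMs) {
        int before = lastHeartbeatMs.size();
        lastHeartbeatMs.entrySet().removeIf(e -> nowMs - e.getValue() >= sessionTimeoutMs);
        return before - lastHeartbeatMs.size();
    }

    public boolean isMember(String memberId) {
        return lastHeartbeatMs.containsKey(memberId);
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator(10_000L); // assumed timeout
        coordinator.join("silent-member", 0L);
        coordinator.join("leaving-member", 0L);

        coordinator.leave("leaving-member"); // gone right away
        coordinator.expire(9_999L);          // too early to expire anyone
        System.out.println("silent member still in group: " + coordinator.isMember("silent-member"));
        coordinator.expire(10_000L);         // session timeout reached
        System.out.println("silent member still in group: " + coordinator.isMember("silent-member"));
    }
}
```

In this model a member that stops heartbeating stays in the group until the timeout elapses, which matches the delay between the end of processing and the "has failed, removing it from the group" log line.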