[
https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bill Bejeck resolved KAFKA-8602.
--------------------------------
Resolution: Fixed
> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
> Key: KAFKA-8602
> URL: https://issues.apache.org/jira/browse/KAFKA-8602
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.1
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Critical
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This
> happens when a StreamThread is assigned standby tasks for sub-topologies
> whose state stores all have logging disabled, so there is no changelog topic
> for the restore consumer to read.
> To reproduce the bug, start two instances of the application, each with one
> StreamThread and one standby replica. The input topic should have two
> partitions. Both instances use the following topology:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>                                 Serdes.Integer(),
>                                 Serdes.Integer())
>           .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>        .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>            private KeyValueStore<Integer, Integer> state;
>
>            @SuppressWarnings("unchecked")
>            @Override
>            public void init(final ProcessorContext context) {
>                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>            }
>
>            @Override
>            public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>                final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>                return result;
>            }
>
>            @Override
>            public void close() {}
>        }, stateStoreName)
>        .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
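
For anyone repeating the reproduction steps above, the per-instance settings
might look roughly like the sketch below. It only uses java.util.Properties
with the Kafka Streams configuration keys as plain strings. The application id,
the broker address, and the ReproConfig class itself are assumptions made for
illustration; they are not part of the original report.

```java
import java.util.Properties;

/** Hypothetical helper; not part of Kafka Streams or of the original report. */
final class ReproConfig {

    /** Builds the settings for one of the two application instances. */
    static Properties forInstance(final String stateDir) {
        final Properties props = new Properties();
        // Assumed values: any application id and reachable broker will do,
        // as long as both instances share the same application id.
        props.setProperty("application.id", "kafka-8602-repro");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // From the report: one StreamThread and one standby replica each.
        props.setProperty("num.stream.threads", "1");
        props.setProperty("num.standby.replicas", "1");
        // Assumption: a separate state directory per instance, which is
        // needed when both instances run on the same machine.
        props.setProperty("state.dir", stateDir);
        return props;
    }
}
```

Each instance would pass its own Properties object (for example with
"/tmp/repro-a" and "/tmp/repro-b" as state directories) to the KafkaStreams
constructor together with the topology.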
> The root cause is that standby tasks are created although all state stores of
> the sub-topology have logging disabled.
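
The root cause suggests a simple guard: create a standby task only when at
least one state store of the sub-topology has a changelog to replay. The
following is a simplified, self-contained model of that check, not the code
of the actual fix. StoreSpec and StandbyTaskGuard are hypothetical names that
do not exist in Kafka Streams.

```java
import java.util.List;

/** Hypothetical stand-in for a state store definition; not a Kafka class. */
final class StoreSpec {
    final String name;
    final boolean loggingEnabled;

    StoreSpec(final String name, final boolean loggingEnabled) {
        this.name = name;
        this.loggingEnabled = loggingEnabled;
    }
}

/** Hypothetical guard illustrating the condition described in the report. */
final class StandbyTaskGuard {

    /**
     * Returns true only if at least one store has logging enabled, i.e. there
     * is a changelog topic a restore consumer could be assigned to.
     */
    static boolean needsStandbyTask(final List<StoreSpec> stores) {
        return stores.stream().anyMatch(store -> store.loggingEnabled);
    }
}
```

For the topology in the report, the only store is "myTransformState" with
logging disabled, so this check would return false and no standby task would
be created for that sub-topology.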