[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-8602:
-----------------------------------
    Fix Version/s: 1.0.3

> StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-8602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8602
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.0.3, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.3.0, 2.2.1
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Critical
>             Fix For: 1.0.3, 2.4.0
>
>
> StreamThread dies with the following exception:
> {code:java}
> java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> {code}
> The reason is that the restore consumer is not subscribed to any topic. This
> happens when a StreamThread gets assigned standby tasks for sub-topologies
> with just state stores with disabled logging.
>
> To reproduce the bug start two applications with one StreamThread and one
> standby replica each and the following topology. The input topic should have
> two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>                                 Serdes.Integer(),
>                                 Serdes.Integer())
>           .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
>
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
>
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
>
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
>
> The root cause is that standby tasks are created although all state stores of
> the sub-topology have logging disabled.
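
The root cause stated in the report can be illustrated with a small, self-contained sketch. This is not the actual Kafka Streams fix and uses no Kafka classes: `StandbyTaskGuard`, `needsStandbyTask`, and the map from store name to logging flag are hypothetical names that only model the condition described in the report. The assumption is that a standby task is worth creating only when at least one state store of the sub-topology has logging enabled, because otherwise the restore consumer has no changelog partitions to be assigned.

```java
import java.util.Map;

// Hypothetical model of the condition described in the report; not Kafka Streams code.
final class StandbyTaskGuard {

    private StandbyTaskGuard() {}

    /**
     * Decides whether a standby task should be created for a sub-topology.
     *
     * @param loggingEnabledByStore store name mapped to whether logging is enabled for it
     * @return true only if at least one store has logging enabled
     */
    static boolean needsStandbyTask(final Map<String, Boolean> loggingEnabledByStore) {
        // Boolean.TRUE::equals treats a null flag as "logging disabled".
        return loggingEnabledByStore.values().stream().anyMatch(Boolean.TRUE::equals);
    }
}
```

For the topology in the report, the only store is `myTransformState` with logging disabled, so `StandbyTaskGuard.needsStandbyTask(Collections.singletonMap("myTransformState", false))` returns `false`, meaning that under this model no standby task would be created and the restore consumer would never be polled without an assignment.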