Yuto Kawamura created KAFKA-3642: ------------------------------------ Summary: Fix NPE from ProcessorStateManager when the changelog topic does not exist Key: KAFKA-3642 URL: https://issues.apache.org/jira/browse/KAFKA-3642 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.1 Reporter: Yuto Kawamura Assignee: Yuto Kawamura Fix For: 0.10.1.0
# Fix NPE from ProcessorStateManager when the changelog topic does not exist When the following two conditions are satisfied, ProcessorStateManager throws an NPE: - A state is configured with logging enabled but the corresponding -changelog topic does not exist, - zookeeper.connect wasn't supplied in the streams config. So Streams should: - expect that the -changelog topic may not exist and throw a more meaningful exception. - warn users if no -changelog topic has been prepared and zookeeper.connect wasn't supplied either. BTW, I think making zookeeper.connect a mandatory argument would be another option if it doesn't hurt. {code} $ git diff diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 34c35b7..c5339f1 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -108,7 +108,7 @@ public class WordCountProcessorDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + // props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --list 2>/dev/null | grep '\-changelog' $ ./bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo ... 
[2016-04-30 02:25:04,960] ERROR User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-wordcount-processor failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609) at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71) at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902) at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250) Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:331) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:250) Caused by: java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:189) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:116) at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:64) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:582) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:609) at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:71) at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:126) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:226) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:221) at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:430) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:416) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:652) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:397) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:338) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:191) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:161) at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:237) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:343) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:902) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:864) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:327) ... 1 more {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)