[ https://issues.apache.org/jira/browse/KAFKA-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abhishek Agarwal reassigned KAFKA-7213:
---------------------------------------

    Assignee:     (was: Abhishek Agarwal)

> NullPointerException during state restoration in kafka streams
> --------------------------------------------------------------
>
>                 Key: KAFKA-7213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7213
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Abhishek Agarwal
>            Priority: Major
>
> I had written a custom state store with a batch restoration callback 
> registered. What I have observed is that when multiple consumer instances are 
> restarted, the application keeps failing with a NullPointerException. The 
> stack trace is:
> {noformat}
> java.lang.NullPointerException: null
>       at org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100) ~[streams-core-1.0.0.297.jar:?]
>       at org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303) ~[streams-core-1.0.0.297.jar:?]
>       at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) ~[kafka-streams-1.0.0.jar:?]
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) ~[kafka-streams-1.0.0.jar:?]
> {noformat}
> The faulty line in question is 
> {noformat}
> db.write(wOptions, batch);
> {noformat}
> in RocksDBStore.java, which means that the db variable is null. Most likely 
> the store has already been closed while restoration is still running on it. 
> After going through the code, I think the problem occurs when the state 
> transitions from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration 
> is still in progress. 
> In that transition, the active tasks themselves are closed, but the changelog 
> reader is not reset. It then keeps trying to restore tasks that have already 
> been closed; their db is null, which results in the NPE. 
> I will put in a fix to see whether it resolves the issue. 
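
To make the suspected race concrete, here is a minimal sketch in plain Java. It is a toy model under stated assumptions, not Kafka Streams code: ToyStore and ToyChangelogReader are hypothetical stand-ins for RocksDBStore and StoreChangelogReader, a HashMap plays the role of the RocksDB handle, and scenario() collapses the PARTITIONS_ASSIGNED to PARTITIONS_REVOKED transition into a single close() call. The resetReaderOnRevoke flag models the fix suggested above (resetting the changelog reader when the active tasks are closed); the change actually merged into Kafka may look different.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical classes, not the Kafka Streams API) of a store being
// closed on revocation while a changelog reader still holds it for restoration.
class RestoreRaceDemo {

    // Hypothetical stand-in for RocksDBStore: close() nulls the db handle.
    static class ToyStore {
        private Map<String, String> db = new HashMap<>();

        void putAll(List<String[]> batch) {
            // Like db.write(wOptions, batch): no null check on db.
            for (String[] kv : batch) {
                db.put(kv[0], kv[1]);
            }
        }

        void close() {
            db = null;
        }
    }

    // Hypothetical stand-in for StoreChangelogReader: tracks stores to restore.
    static class ToyChangelogReader {
        private final List<ToyStore> restoring = new ArrayList<>();

        void register(ToyStore store) {
            restoring.add(store);
        }

        void restore(List<String[]> batch) {
            for (ToyStore store : restoring) {
                store.putAll(batch);
            }
        }

        void reset() {
            restoring.clear();
        }
    }

    // Simulates revocation in the middle of restoration.
    // Returns "ok", or the simple name of the exception that was thrown.
    static String scenario(boolean resetReaderOnRevoke) {
        ToyStore store = new ToyStore();
        ToyChangelogReader reader = new ToyChangelogReader();
        reader.register(store);

        List<String[]> batch = new ArrayList<>();
        batch.add(new String[] {"k", "v"});

        // Revocation closes the active task and therefore its store...
        store.close();
        // ...and the assumed fix also resets the changelog reader.
        if (resetReaderOnRevoke) {
            reader.reset();
        }

        try {
            // The next loop iteration continues restoring what the reader still holds.
            reader.restore(batch);
            return "ok";
        } catch (NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println("without reset: " + scenario(false));
        System.out.println("with reset:    " + scenario(true));
    }
}
```

Running main prints "without reset: NullPointerException" followed by "with reset:    ok". Once the reader forgets the closed store, nothing is left that could write into the null handle.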



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
