[ https://issues.apache.org/jira/browse/KAFKA-7534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-7534: ----------------------------------- Fix Version/s: 2.1.0 > Error during CachingKeyValueStore.flush may not allow RocksDB to close > ---------------------------------------------------------------------- > > Key: KAFKA-7534 > URL: https://issues.apache.org/jira/browse/KAFKA-7534 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Major > Fix For: 1.0.3, 1.1.2, 2.1.0, 2.0.2 > > > @Override > public void flush() { > lock.writeLock().lock(); > try { > cache.flush(cacheName); > underlying.flush(); > } finally { > lock.writeLock().unlock(); > } > } > @Override > public void close() { > flush(); > underlying.close(); > cache.close(cacheName); > An exception leading to this, notice that another store is already closed > and therefore not available: > 2018-10-04 12:18:44,961 ERROR > [org.apache.kafka.streams.processor.internals.ProcessorStateManager] > (...-StreamThread-8) - task [8_11] Failed to close state store > ...-STATE-STORE-0000000038: : > org.apache.kafka.streams.errors.InvalidStateStoreException: Store > KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed. > at > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112) > at > org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124) > at > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132) > at > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89) > at > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90) > at > org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132) > at > org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269) > at > org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) > If the store is not closed we have witnessed that the lock is RocksDB is > not removed properly which can lead to > 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - Caused > by: org.rocksdb.RocksDBException: While lock file: > ...-STATE-STORE-0000000038/LOCK: No locks available > 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) - at > org.rocksdb.RocksDB.open(Native Method) -- This message was sent by Atlassian JIRA (v7.6.3#76005)