[ 
https://issues.apache.org/jira/browse/KAFKA-7534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7534:
-----------------------------------
    Fix Version/s: 2.1.0

> Error during CachingKeyValueStore.flush may not allow RocksDB to close
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.0.3, 1.1.2, 2.1.0, 2.0.2
>
>
> @Override
> public void flush() {
>     lock.writeLock().lock();
>     try {
>         cache.flush(cacheName);
>         underlying.flush();
>     } finally {
>         lock.writeLock().unlock();
>     }
> }
> @Override
> public void close() {
>     flush();                 // if this throws, the two calls below never run
>     underlying.close();
>     cache.close(cacheName);
> }
>
> The following exception leads to this situation. Note that another store
> is already closed and therefore not available:
> 2018-10-04 12:18:44,961 ERROR [org.apache.kafka.streams.processor.internals.ProcessorStateManager] (...-StreamThread-8) - task [8_11] Failed to close state store ...-STATE-STORE-0000000038: :
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store KSTREAM-REDUCE-STATE-STORE-0000000025 is currently closed.
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.validateStoreOpen(WrappedStateStore.java:70)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
>     at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:124)
>     at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.get(KTableFilter.java:132)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:89)
>     at org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:58)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:132)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.close(WrappedStateStore.java:89)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:269)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:546)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:624)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:410)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1172)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
> If the store is not closed, we have witnessed that the lock in RocksDB is
> not removed properly, which can lead to:
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   Caused by: org.rocksdb.RocksDBException: While lock file: ...-STATE-STORE-0000000038/LOCK: No locks available
> 2018-10-04 12:18:59,342 ERROR [stderr] (...-StreamThread-6) -   at org.rocksdb.RocksDB.open(Native Method)
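
One possible way to keep a failed flush from skipping the close calls is to wrap them in try/finally. The sketch below only illustrates that idea and is not the patch applied for this issue. SketchStore, RecordingStore and CachingStoreSketch are hypothetical stand-ins rather than Kafka Streams classes, the two Runnable fields stand in for cache.flush(cacheName) and cache.close(cacheName), and the write lock from the quoted flush() is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a flushable, closeable store (not a Kafka Streams interface). */
interface SketchStore {
    void flush();
    void close();
}

/** Records the calls it receives so the effect of close() can be inspected. */
class RecordingStore implements SketchStore {
    final List<String> calls = new ArrayList<>();

    @Override
    public void flush() {
        calls.add("flush");
    }

    @Override
    public void close() {
        calls.add("close");
    }
}

/** Hypothetical caching wrapper with the same call order as the close() quoted above. */
class CachingStoreSketch implements SketchStore {
    private final Runnable cacheFlush;   // assumption: stands in for cache.flush(cacheName)
    private final Runnable cacheClose;   // assumption: stands in for cache.close(cacheName)
    private final SketchStore underlying;

    CachingStoreSketch(Runnable cacheFlush, Runnable cacheClose, SketchStore underlying) {
        this.cacheFlush = cacheFlush;
        this.cacheClose = cacheClose;
        this.underlying = underlying;
    }

    @Override
    public void flush() {
        cacheFlush.run();
        underlying.flush();
    }

    @Override
    public void close() {
        try {
            flush();
        } finally {
            try {
                // Runs even when flush() threw, so the underlying store is released.
                underlying.close();
            } finally {
                // Runs even when underlying.close() threw.
                cacheClose.run();
            }
        }
    }
}
```

With this shape, an exception thrown by flush() still reaches the caller, but only after underlying.close() and the cache close have been attempted. If a later step also throws, that later exception replaces the earlier one, which a real fix may want to handle differently.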



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)