[ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alin Gheorghe resolved KAFKA-6350.
----------------------------------
    Resolution: Not A Bug

> File descriptors leak with persistent KeyValueStore
> ---------------------------------------------------
>
>                 Key: KAFKA-6350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6350
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1, 1.0.0
>            Reporter: Alin Gheorghe
>
> When using the low-level Processor API with persistent KV stores, we observed a 
> continuous increase in the number of SST files on disk. The file descriptors 
> remain open until the configured OS limit is reached (100k in our case), at 
> which point Kafka Streams crashes with a "Too many open files" exception. For 
> us this happens regularly after about 17 hours of uptime. The commit interval 
> is set to 5 seconds and we never call commit() explicitly from our code.
>
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink 
> topics. The retention policy is set to 2 days and the topics have 25 partitions.
>
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup 
> every 30 seconds that checks for keys that have not been updated for at least 
> 20 minutes. The KV stores hold temporary user sessions, which last for about 5 
> minutes and see about 50 updates (user actions) each.
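>
> For illustration, a minimal sketch of such a punctuator-based cleanup (class 
> name, store name, types and thresholds are hypothetical, not taken from the 
> actual topology). One detail worth highlighting: *all()* returns a 
> RocksDB-backed KeyValueIterator that holds file handles until it is closed, so 
> it should always be used in a try-with-resources block:
> {code:java}
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.PunctuationType;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class SessionCleanupProcessor extends AbstractProcessor<String, Long> {
>
>     private KeyValueStore<String, Long> sessions;
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(final ProcessorContext context) {
>         super.init(context);
>         sessions = (KeyValueStore<String, Long>) context.getStateStore("session-store");
>         // Wall-clock punctuation every 30 seconds, as described above.
>         context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>             // all() opens a RocksDB iterator; closing it releases the
>             // underlying file descriptors, hence try-with-resources.
>             try (final KeyValueIterator<String, Long> iter = sessions.all()) {
>                 while (iter.hasNext()) {
>                     final KeyValue<String, Long> entry = iter.next();
>                     if (timestamp - entry.value >= 20 * 60 * 1000L) {
>                         sessions.delete(entry.key); // expire idle session
>                     }
>                 }
>             }
>         });
>     }
>
>     @Override
>     public void process(final String key, final Long lastSeenMs) {
>         sessions.put(key, lastSeenMs); // record the latest activity timestamp
>     }
> }
> {code}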
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in-memory state stores this obviously doesn't happen.
> We have also tried overriding the RocksDB parameter *max_open_files*, which 
> defaults to -1 (unlimited), but the configured value seems to be ignored and 
> RocksDB surpasses that threshold.
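>
> For completeness, the supported way to override RocksDB options in Kafka 
> Streams is a custom RocksDBConfigSetter registered under the 
> "rocksdb.config.setter" config; the class name and the limit below are 
> illustrative only. Worth noting: Streams creates one RocksDB instance per 
> store per task, so *max_open_files* bounds each instance separately rather 
> than the process as a whole:
> {code:java}
> import java.util.Map;
>
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
> import org.rocksdb.Options;
>
> public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
>     @Override
>     public void setConfig(final String storeName, final Options options,
>                           final Map<String, Object> configs) {
>         // Cap the number of SST files this RocksDB instance keeps open.
>         options.setMaxOpenFiles(500);
>     }
> }
>
> // Registered in the application's properties:
> // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> //           BoundedOpenFilesConfigSetter.class);
> {code}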
> Sometimes the application crashes with a different error, which may or may not 
> be related. We will file a separate Jira issue if it turns out to be unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have 
> died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR 
> com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task 
> [0_257] Failed to flush state store eventQueueStore. Shutting down the entire 
> Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed 
> to flush state store eventQueueStore
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>       at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>       at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
>       at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for 
> SubCode.
>       at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>       at org.rocksdb.Status.<init>(Status.java:30)
>       at org.rocksdb.RocksDB.flush(Native Method)
>       at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
>       at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
>       at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:428)
>       at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:84)
>       at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
>       at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
>       ... 14 more
> {code}


