[ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alin Gheorghe resolved KAFKA-6350.
----------------------------------
    Resolution: Not A Bug

> File descriptors leak with persistent KeyValueStore
> ---------------------------------------------------
>
>                 Key: KAFKA-6350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6350
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1, 1.0.0
>            Reporter: Alin Gheorghe
>
> When using the low-level Processor API with persistent KV stores, we observed a continuous increase in the number of SST files on disk. The file descriptors remain open until the configured OS limit is reached (100k in our case), at which point Kafka Streams crashes with a "Too many open files" exception. For us this happens regularly after about 17 hours of uptime. The commit interval is set to 5 seconds and we never trigger commits from our code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores, and 2 sink topics. The retention policy is set to 2 days and the topics have 25 partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup every 30 seconds that checks for keys that have not been updated for at least 20 minutes. The KV stores hold temporary user sessions, which last about 5 minutes and see about 50 updates (user actions) each.
> 2017-12-11 10:57:03
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in-memory state stores, this obviously does not happen.
> We have also tried to override the RocksDB *max_open_files* parameter, which defaults to -1, but the configured value seems to be ignored and RocksDB surpasses that threshold.
> Sometimes the application crashes with a different error, which may or may not be related.
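The "Not A Bug" resolution is consistent with a common pitfall when a punctuator scans a persistent store: the iterator returned by `all()` is backed by RocksDB resources (a snapshot and pinned SST file handles) that are released only when the caller closes it, so a 30-second cleanup pass that never calls `close()` leaks file descriptors at exactly this kind of rate. Below is a minimal, self-contained Java sketch of the close-the-iterator pattern with try-with-resources; `TrackingStore`, `CloseableIterator`, and the open-iterator counter are illustrative stand-ins, not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a persistent KeyValueStore<String, Long> mapping session key
// to last-update timestamp. Like KeyValueStore#all(), all() hands ownership
// of the iterator (and the resources behind it) to the caller.
class TrackingStore {
    static final AtomicInteger OPEN_ITERATORS = new AtomicInteger();
    private final Map<String, Long> data = new TreeMap<>();

    void put(String key, long lastUpdateMs) { data.put(key, lastUpdateMs); }
    void delete(String key) { data.remove(key); }
    int size() { return data.size(); }

    CloseableIterator all() {
        OPEN_ITERATORS.incrementAndGet(); // models pinned SST handles
        return new CloseableIterator(data.entrySet().iterator());
    }

    static final class CloseableIterator
            implements Iterator<Map.Entry<String, Long>>, AutoCloseable {
        private final Iterator<Map.Entry<String, Long>> inner;
        CloseableIterator(Iterator<Map.Entry<String, Long>> inner) { this.inner = inner; }
        @Override public boolean hasNext() { return inner.hasNext(); }
        @Override public Map.Entry<String, Long> next() { return inner.next(); }
        @Override public void close() { OPEN_ITERATORS.decrementAndGet(); }
    }
}

public class PunctuatorSketch {
    // Cleanup pass like the 30-second punctuator described above: scan all
    // sessions and delete those idle for at least ttlMs. try-with-resources
    // guarantees close() runs even if the scan throws; without it, every
    // punctuation would leak one iterator and its file handles.
    static void expireSessions(TrackingStore store, long nowMs, long ttlMs) {
        List<String> expired = new ArrayList<>();
        try (TrackingStore.CloseableIterator it = store.all()) {
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                if (nowMs - entry.getValue() >= ttlMs) {
                    expired.add(entry.getKey());
                }
            }
        }
        for (String key : expired) {
            store.delete(key);
        }
    }

    public static void main(String[] args) {
        TrackingStore store = new TrackingStore();
        store.put("session-a", 0L);                // idle since t=0
        store.put("session-b", 25 * 60_000L);      // updated at t=25min
        expireSessions(store, 30 * 60_000L, 20 * 60_000L); // 20-minute TTL
        System.out.println("live sessions: " + store.size());
        System.out.println("open iterators: " + TrackingStore.OPEN_ITERATORS.get());
        // prints "live sessions: 1" then "open iterators: 0"
    }
}
```

If the real punctuator follows this shape, the `lsof` count should stay flat across cleanup passes instead of growing by the number of punctuations executed.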
> We will file a different Jira issue if it seems unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444] All stream threads have died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task [0_257] Failed to flush state store eventQueueStore. Shutting down the entire Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed to flush state store eventQueueStore
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> 	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
> 	at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
> 	at org.rocksdb.Status.<init>(Status.java:30)
> 	at org.rocksdb.RocksDB.flush(Native Method)
> 	at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:428)
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:84)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
> 	... 14 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)