[
https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alin Gheorghe resolved KAFKA-6350.
----------------------------------
Resolution: Not A Bug
> File descriptors leak with persistent KeyValueStore
> ---------------------------------------------------
>
> Key: KAFKA-6350
> URL: https://issues.apache.org/jira/browse/KAFKA-6350
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1, 1.0.0
> Reporter: Alin Gheorghe
>
> When using the low-level Processor API with persistent KV stores, we observed
> a continuous increase in the number of SST files on disk. The file descriptors
> remain open until the configured OS limit (100k in our case) is reached, at
> which point Kafka Streams crashes with a "Too many open files" exception. In
> our case this happens regularly after about 17 hours of uptime. The commit
> interval is set to 5 seconds and we never call commit() from our code.
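> For reference, that interval is set through the standard Streams property; a
> minimal sketch of the setup (the application id and bootstrap servers are
> illustrative placeholders, not our actual values):
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processing");    // placeholder
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); // placeholder
> // Streams commits on this interval by itself; our code never calls commit().
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
> KafkaStreams streams = new KafkaStreams(topology, props); // topology built elsewhere
> {code}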
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink
> topics. The retention policy is set to 2 days and the topics have 25 partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup
> every 30 seconds that checks for keys that have not been updated for at least
> 20 minutes. The KV stores hold temporary user sessions, which last about 5
> minutes and receive about 50 updates (user actions) each.
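> For context, a minimal sketch of that punctuator (Kafka Streams 1.0.0
> Processor API); the store name, the Session value type and its lastSeenMs
> field are illustrative assumptions, not our actual code. Note that the
> KeyValueIterator returned by *all()* on a persistent store holds RocksDB
> resources until it is closed, so it is closed after every sweep:
> {code:java}
> // Inside a Processor implementation; `store` is a KeyValueStore<String, Session>.
> @Override
> @SuppressWarnings("unchecked")
> public void init(final ProcessorContext context) {
>     store = (KeyValueStore<String, Session>) context.getStateStore("sessionStore");
>     // Sweep every 30 seconds on wall-clock time (KIP-138 punctuation).
>     context.schedule(TimeUnit.SECONDS.toMillis(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>         // The iterator pins RocksDB resources (snapshot, SST file handles);
>         // try-with-resources guarantees it is released after each sweep.
>         try (KeyValueIterator<String, Session> it = store.all()) {
>             while (it.hasNext()) {
>                 KeyValue<String, Session> entry = it.next();
>                 if (timestamp - entry.value.lastSeenMs >= TimeUnit.MINUTES.toMillis(20)) {
>                     store.delete(entry.key); // session idle for 20+ minutes
>                 }
>             }
>         }
>     });
> }
> {code}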
> 2017-12-11 10:57:03
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in-memory state stores, this obviously doesn't happen.
> We have also tried overriding the RocksDB *max_open_files* parameter, which
> defaults to -1 (unlimited), but the configured value seems to be ignored and
> RocksDB surpasses that threshold.
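> The standard hook for that override is a RocksDBConfigSetter; a minimal
> sketch of the wiring (the 30000 cap is an arbitrary example value):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
> import org.rocksdb.Options;
>
> public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
>     @Override
>     public void setConfig(final String storeName, final Options options,
>                           final Map<String, Object> configs) {
>         // Cap the SST file handles RocksDB keeps open; -1 means unlimited.
>         options.setMaxOpenFiles(30000);
>     }
> }
> {code}
> registered via:
> {code:java}
> props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedOpenFilesConfigSetter.class);
> {code}
> Note that max_open_files applies per RocksDB instance, and Streams opens one
> instance per store per partition, so the process-wide descriptor count can
> still be a large multiple of the configured value.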
> Sometimes the application crashes with a different error, which may or may not
> be related. We will file a separate Jira issue if it turns out to be unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444] All stream threads have died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task [0_257] Failed to flush state store eventQueueStore. Shutting down the entire Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed to flush state store eventQueueStore
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
> 	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
> 	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
> 	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
> 	at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
> 	at org.rocksdb.Status.<init>(Status.java:30)
> 	at org.rocksdb.RocksDB.flush(Native Method)
> 	at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:428)
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:84)
> 	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
> 	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
> 	... 14 more
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)