[jira] [Resolved] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-14 Thread Alin Gheorghe (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alin Gheorghe resolved KAFKA-6350.
----------------------------------
Resolution: Not A Bug


[jira] [Created] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-12 Thread Alin Gheorghe (JIRA)
Alin Gheorghe created KAFKA-6350:


 Summary: File descriptors leak with persistent KeyValueStore
 Key: KAFKA-6350
 URL: https://issues.apache.org/jira/browse/KAFKA-6350
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0, 0.10.2.1
Reporter: Alin Gheorghe


When using the low-level Processor API with persistent KV stores we observed a 
continuous increase in the number of SST files on disk. The file descriptors 
remain open until the configured OS limit is reached (100k in our case), at 
which point Kafka Streams crashes with a "Too many open files" exception. In 
our case this happens regularly after about 17 hours of uptime. The commit 
interval is set to 5 seconds and we never call commit explicitly from our code.

Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink 
topics. The retention policy is set to 2 days and the topics have 25 partitions.
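
For illustration, a minimal sketch of this kind of setup on Kafka Streams 
1.0.0. Topic and store names, serdes and the CleanupProcessor (sketched 
further below) are placeholders, not our actual code:

{code:java}
// Sketch only: one source, one of the processors, one persistent KV store
// and one sink, with the 5 second commit interval mentioned above.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ProcessingApp {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processing");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5_000);          // 5 second commit interval
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Persistent (RocksDB-backed) store. For simplicity it only tracks the
        // last update timestamp per session key; the real stores hold session data.
        final StoreBuilder<KeyValueStore<String, Long>> sessionStore =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("sessionStore"),
                Serdes.String(), Serdes.Long());

        final Topology topology = new Topology()
            .addSource("source", "input-topic")
            .addProcessor("cleanup", CleanupProcessor::new, "source")
            .addStateStore(sessionStore, "cleanup")
            .addSink("sink", "output-topic", "cleanup");

        new KafkaStreams(topology, props).start();
    }
}
{code}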

Using the punctuation mechanism in Kafka Streams 1.0.0 we perform a cleanup 
every 30 seconds which removes keys that have not been updated for at least 
20 minutes. The KV stores hold temporary user sessions, which last about 
5 minutes and receive about 50 updates (user actions) each.

2017-12-11 10:57:03 
{code:none}
~ # lsof 1 | grep rocksdb.*.sst | wc -l
54
{code}
2017-12-11 11:45:31 
{code:none}
~ # lsof 1 | grep rocksdb.*.sst | wc -l
6742
{code}

We use the following state store APIs: *all*, *get*, *delete*, *put*.
When switching to in-memory state stores this obviously doesn't happen.
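For the in-memory comparison, the only change to the sketch above is the store 
supplier, e.g.:

{code:java}
// Same builder as above, but backed by an in-memory store instead of RocksDB.
final StoreBuilder<KeyValueStore<String, Long>> sessionStore =
    Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("sessionStore"),
        Serdes.String(), Serdes.Long());
{code}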

We have also tried to override the RocksDB parameter *max_open_files*, which 
defaults to -1, but the configured value seems to be ignored and RocksDB 
surpasses that threshold.
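
For reference, the usual way to override this in Kafka Streams is a custom 
RocksDBConfigSetter registered via *rocksdb.config.setter*; a sketch with an 
illustrative limit of 5000:

{code:java}
// Sketch of a RocksDBConfigSetter that caps the number of open SST files;
// the value 5000 is illustrative only.
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setMaxOpenFiles(5000);  // RocksDB default is -1 (unlimited)
    }
}

// Registered in the streams configuration:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedOpenFilesConfigSetter.class);
{code}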


Sometimes the application crashes with a different error which may or may not 
be related. We will file a separate Jira issue if it turns out to be unrelated:

{code:none}
RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  org.apache.kafka.streams.KafkaStreams - stream-client [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have died. The instance will be in error state and should be closed.
2017-12-12 11:37:25,758 [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task [0_257] Failed to flush state store eventQueueStore. Shutting down the entire Kafka Streams process
org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed to flush state store eventQueueStore
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
    at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
    at org.rocksdb.Status.<init>(Status.java:30)
    at org.rocksdb.RocksDB.flush(Native Method)
    at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
    at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
    at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:428)
    at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractStateStore.flush(WrappedStateStore.java:84)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)