[jira] [Commented] (KAFKA-5055) Kafka Streams skipped-records-rate sensor producing nonzero values even when FailOnInvalidTimestamp is used as extractor
[ https://issues.apache.org/jira/browse/KAFKA-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990086#comment-15990086 ] Davor Poldrugo commented on KAFKA-5055: --- Hi guys! I think the problem is a bug in the method {{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}:

{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    final int oldQueueSize = partitionGroup.numBuffered();
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    return newQueueSize - oldQueueSize;
}
{code}

This line is the problem: {{final int oldQueueSize = partitionGroup.numBuffered();}}

Instead of getting {{oldQueueSize}} for the current TopicPartition, it gets the queue size across all partitions, because {{partitionGroup.numBuffered()}} returns the buffered count summed over all partitions. {{return newQueueSize - oldQueueSize}} then yields a negative value, because the sum of the queue sizes across all partitions is usually bigger than the queue size of this one partition.

This goes back to {{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}}:

{code:java}
for (TopicPartition partition : records.partitions()) {
    StreamTask task = activeTasksByPartition.get(partition);
    numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
{code}

There the negative value is summed into {{numAddedRecords}}, actually decreasing it, and this leads to a wrong sensor value in {{records.count() - numAddedRecords}}: {{numAddedRecords}} is smaller because of the bug, not because of invalid timestamps.

Bugfix proposal: in the method {{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}, change {{final int oldQueueSize = partitionGroup.numBuffered();}} to {{final int oldQueueSize = partitionGroup.numBuffered(partition);}}

If this is the cause of the bug, can I do a pull request?

> Kafka Streams skipped-records-rate sensor producing nonzero values even when
> FailOnInvalidTimestamp is used as extractor
>
> Key: KAFKA-5055
> URL: https://issues.apache.org/jira/browse/KAFKA-5055
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Nikki Thean
> Assignee: Guozhang Wang
>
> According to the code and the documentation for this metric, the only reason for a skipped record is an invalid timestamp, except that a) I am reading from a topic that is populated solely by Kafka Connect and b) I am using `FailOnInvalidTimestamp` as the timestamp extractor.
> Either I'm missing something in the documentation (i.e. another reason for skipped records) or there is a bug in the code that calculates this metric.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
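To make the arithmetic in the comment above concrete, here is a small self-contained sketch. This is illustrative code only, not Kafka internals; the partition sizes and variable names are made up. It models how subtracting the cross-partition buffered total (what {{numBuffered()}} returns) from one partition's new queue size produces a negative delta, which then inflates the skipped-records computation {{records.count() - numAddedRecords}}:

```java
// Illustrative sketch only (not Kafka code): models why computing oldQueueSize
// from the cross-partition total makes addRecords() return a negative delta
// and corrupts the skipped-records metric.
public class SkippedRecordsSketch {
    public static void main(String[] args) {
        int bufferedThisPartition = 10;   // records already queued for this partition
        int bufferedOtherPartition = 40;  // records queued for a different partition
        int totalBuffered = bufferedThisPartition + bufferedOtherPartition; // what numBuffered() returns: 50
        int newRecords = 5;               // records just polled for this partition

        int newQueueSize = bufferedThisPartition + newRecords; // 15

        // Buggy: per-partition new size minus the cross-partition old total
        int buggyAdded = newQueueSize - totalBuffered;          // 15 - 50 = -35

        // Proposed fix: per-partition old size, i.e. numBuffered(partition)
        int fixedAdded = newQueueSize - bufferedThisPartition;  // 5

        // skipped = records.count() - numAddedRecords
        System.out.println("buggy skipped: " + (newRecords - buggyAdded)); // prints 40
        System.out.println("fixed skipped: " + (newRecords - fixedAdded)); // prints 0
    }
}
```

With the buggy delta, 40 records appear "skipped" even though every polled record was buffered; with the per-partition old size, the metric correctly reports 0.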
[jira] [Commented] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in tasks/processors
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812326#comment-15812326 ] Davor Poldrugo commented on KAFKA-4455: --- This should be resolved by KAFKA-4561, so this issue can probably be closed.

> CommitFailedException during rebalance doesn't release resources in tasks/processors
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed this - after some time the locks were released even if the jvm/process wasn't restarted, so I guess CentOS has some lock cleaning policy.
> Reporter: Davor Poldrugo
> Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw CommitFailedException. When this exception is thrown, the tasks and processors are not closed. If some processor contains a state store (RocksDB), the RocksDB is not closed, which leads to unreleased LOCKs at the OS level, and when the Kafka Streams app tries to open tasks and their respective processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with {{num.stream.threads=1}}, each with its own state directory. Other Kafka Streams apps were running on the same machines, but they had separate directories for state stores.
> In the attached logs you can see {{StreamThread-1895}}. I'm not running 1895 StreamThreads; I have implemented a Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which, on some transient exceptions, restarts the {{org.apache.kafka.streams.KafkaStreams}} topology by calling {{org.apache.kafka.streams.KafkaStreams.stop()}} and then {{org.apache.kafka.streams.KafkaStreams.start()}}. This causes the thread names to have bigger numbers.
> h2. Stacktrace
> [^RocksDBException_IO-error_stacktrace.txt]
> h2. Suggested solution
> To avoid restarting the jvm, modify Kafka Streams to close tasks, which will lead to release of resources - in this case - filesystem LOCK files.
> h2. Possible solution code
> Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
> Commit: [BUGFIX: When commit fails during rebalance - release resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
> I have been running this fork in production for 3 days and the error doesn't come up.
> h2. Note
> This could be related to these issues: KAFKA-3708 and KAFKA-3938
> Additional conversation can be found here: [stream shut down due to no locks for state store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
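The failure mode described in the issue above can be reproduced generically with plain {{java.nio}} file locks. This is an illustrative sketch, not RocksDB or Kafka code; the class and file names are made up. It shows why a lock held by a component that is never closed blocks any reacquire attempt from the same JVM until the lock is explicitly released, which is exactly why restarting the process "fixes" the error:

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

// Illustrative sketch with plain java.nio locks (not RocksDB/Kafka code):
// a file lock held by a component that is never closed blocks any reacquire
// attempt in the same JVM until the lock is released.
public class LockLeakSketch {
    public static void main(String[] args) throws Exception {
        File lockFile = File.createTempFile("state-store", ".LOCK");

        // Simulates a state store taking .../LOCK when it is opened.
        FileChannel first = new RandomAccessFile(lockFile, "rw").getChannel();
        FileLock held = first.lock();

        // A "restarted" task in the same JVM tries to reopen the same store.
        FileChannel second = new RandomAccessFile(lockFile, "rw").getChannel();
        try {
            second.lock(); // the old lock was never released, so this fails
        } catch (OverlappingFileLockException e) {
            System.out.println("reacquire failed: " + e.getClass().getSimpleName());
        }

        // Closing the task properly (releasing the lock) lets the reacquire succeed.
        held.release();
        FileLock retry = second.lock();
        System.out.println("acquired after release: " + retry.isValid());

        retry.release();
        first.close();
        second.close();
        lockFile.delete();
    }
}
```

Note that within a single JVM the JDK reports the conflict as {{OverlappingFileLockException}}; across processes the second lock attempt would instead block or fail at the OS level, which matches the {{No locks available}} error in the attached stacktrace.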
[jira] [Updated] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in tasks/processors
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455: -- Summary: CommitFailedException during rebalance doesn't release resources in tasks/processors (was: CommitFailedException during rebalance doesn't release resources in processors)
[jira] [Updated] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in processors
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455: -- Summary: CommitFailedException during rebalance doesn't release resources in processors (was: Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available)
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455 (edited the issue description)
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455 (edited the issue description)
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455 (edited the issue description)
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455 (edited the issue description)
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455:
--
Description:
h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw CommitFailedException. When this exception is thrown, the tasks and processors are not closed. If some processor contains a state store (RocksDB), the RocksDB instance is not closed, which leaves unreleased LOCK files at the OS level; when the Kafka Streams app then tries to open tasks and their respective processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the locks are released.
h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with {{num.stream.threads=1}}, each with its own state directory. Other Kafka Streams apps were running, but they had separate directories for state stores. In the attached logs you can see {{StreamThread-1895}}. I'm not running 1895 StreamThreads; I have implemented a Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which, on some transient exceptions, restarts the {{org.apache.kafka.streams.KafkaStreams}} topology by calling {{org.apache.kafka.streams.KafkaStreams.stop()}} and then {{org.apache.kafka.streams.KafkaStreams.start()}}.
h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt]
h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks on this failure, which will release resources - in this case, filesystem LOCK files.
h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938

was:
h2.
Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw CommitFailedException. When this exception is thrown, the tasks and processors are not closed. If some processor contains a state store (RocksDB), the RocksDB instance is not closed, which leaves unreleased LOCK files at the OS level; when the Kafka Streams app then tries to open tasks and their respective processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the locks are released.
h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with {{num.stream.threads=1}}, each with its own state directory. Other Kafka Streams apps were running, but they had separate directories for state stores.
h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt]
h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks on this failure, which will release resources - in this case, filesystem LOCK files.
h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938

> Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed this - after some time the locks were released even if the jvm/process wasn't restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo > Labels: stacktrace > Attachments: RocksDBException_IO-error_stacktrace.txt > > > h2. Problem description > From time to time a rebalance in Kafka Streams causes the commit to throw > CommitFailedException. When this exception is thrown, the tasks and > processors are not closed. If some processor contains a state store > (RocksDB), the RocksDB is not closed, which leads to not relasead LOCK's on > OS level, and when the Kafka Streams app is trying to open tasks and their > respective processors and state stores the {{org.rocksdb.RocksDBException: IO > error: lock .../LOCK: No locks available}} is thrown. If the the jvm/process > is restarted the locks are released. > h2. Additional info > I have been running 3 Kafka Streams instances on separate machines with >
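The restart policy described in this update - an {{UncaughtExceptionHandler}} that reacts to transient failures by calling {{KafkaStreams.stop()}} and then {{KafkaStreams.start()}} - can be modeled without any Kafka dependency. Below is a minimal standalone sketch of that pattern; the class and method names, and the use of IllegalStateException as a stand-in for a transient Kafka exception, are all hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RestartPolicyDemo {

    // Stand-in for the transient-exception check; the real handler would
    // match specific Kafka exceptions such as CommitFailedException.
    static boolean isTransient(Throwable t) {
        return t instanceof IllegalStateException;
    }

    // Runs a worker that dies with a "transient" error and counts how many
    // restarts the handler triggers (stands in for stop() + start()).
    public static int runOnce() {
        AtomicInteger restarts = new AtomicInteger();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated transient failure");
        });
        worker.setUncaughtExceptionHandler((thread, error) -> {
            if (isTransient(error)) {
                restarts.incrementAndGet(); // the real code called stop()/start() here
            }
        });
        worker.start();
        try {
            worker.join(); // the handler runs on the dying thread before join() returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return restarts.get();
    }

    public static void main(String[] args) {
        System.out.println("restarts triggered: " + runOnce());
    }
}
```

Registering the handler per thread (rather than via a default handler) mirrors how Kafka Streams exposes {{setUncaughtExceptionHandler}} on the {{KafkaStreams}} instance.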
[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
[ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4455: -- Attachment: RocksDBException_IO-error_stacktrace.txt > Commit during rebalance does not close RocksDB which later causes: > org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available > > > Key: KAFKA-4455 > URL: https://issues.apache.org/jira/browse/KAFKA-4455 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.0 > Environment: Kafka Streams were running on CentOS - I have observed > this - after some time the locks were released even if the jvm/process wasn't > restarted, so I guess CentOS has some lock cleaning policy. >Reporter: Davor Poldrugo > Labels: stacktrace > Attachments: RocksDBException_IO-error_stacktrace.txt > > > h2. Problem description > From time to time a rebalance in Kafka Streams causes the commit to throw > CommitFailedException. When this exception is thrown, the tasks and > processors are not closed. If some processor contains a state store > (RocksDB), the RocksDB is not closed, which leads to not relasead LOCK's on > OS level, and when the Kafka Streams app is trying to open tasks and their > respective processors and state stores the {{org.rocksdb.RocksDBException: IO > error: lock .../LOCK: No locks available}} is thrown. If the the jvm/process > is restarted the locks are released. > h2. Additional info > I have been running 3 Kafka Streams instances on separate machines with > {{num.stream.threads=1}} and each with it's own state directory. Other Kafka > Streams apps were running but they had separate directories for state stores. > h2. Stacktrace > [^RocksDBException_IO-error_stacktrace.txt] > h2. Suggested solution > To avoid restarting the jvm, modify Kafka Streams to close tasks, which will > lead to release of resources - in this case - filesystem LOCK files. > h2. 
Possible solution code > Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork > Commit: [BUGFIX: When commit fails during rebalance - release > resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3] > h2. Note > This could be related this issues: KAFKA-3708 and KAFKA-3938 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
Davor Poldrugo created KAFKA-4455:
Summary: Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
Key: KAFKA-4455
URL: https://issues.apache.org/jira/browse/KAFKA-4455
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.1.0
Environment: Kafka Streams were running on CentOS - I have observed this - after some time the locks were released even if the jvm/process wasn't restarted, so I guess CentOS has some lock cleaning policy.
Reporter: Davor Poldrugo

h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw CommitFailedException. When this exception is thrown, the tasks and processors are not closed. If some processor contains a state store (RocksDB), the RocksDB instance is not closed, which leaves unreleased LOCK files at the OS level; when the Kafka Streams app then tries to open tasks and their respective processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the locks are released.
h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with {{num.stream.threads=1}}, each with its own state directory. Other Kafka Streams apps were running, but they had separate directories for state stores.
h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt]
h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks on this failure, which will release resources - in this case, filesystem LOCK files.
h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
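The failure mode in this report - a LOCK file that stays held because RocksDB was never closed, so reopening the store fails - can be reproduced in miniature with plain {{java.nio}} file locks, no RocksDB required. A sketch under that analogy; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockDemo {

    // Acquires a lock on a temp file and, while still holding it, tries to
    // lock the same file again - mimicking a Streams task that reopens a
    // state store whose previous LOCK was never released.
    public static boolean secondLockSucceeds() {
        try {
            Path lockFile = Files.createTempFile("rocksdb-style", ".LOCK");
            try (FileChannel first = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                FileLock held = first.lock(); // the "leaked" lock
                try (FileChannel second = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                    // Within a single JVM this throws rather than returning null.
                    FileLock again = second.tryLock();
                    return again != null;
                } catch (OverlappingFileLockException e) {
                    return false; // second acquisition refused while the lock is held
                } finally {
                    held.release(); // the fix proposed above: always release on failure
                }
            } finally {
                Files.deleteIfExists(lockFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second lock acquired: " + secondLockSucceeds());
    }
}
```

The same principle applies across processes: until the holder releases (or dies and the OS reclaims the lock, as observed on CentOS above), every reopen attempt fails.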
[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159 ] Davor Poldrugo edited comment on KAFKA-4273 at 10/24/16 2:36 PM: - Is this possibly resolved in 0.10.1.0 (KAFKA-3740) with streams config: *rocksdb.config.setter* ? https://kafka.apache.org/0101/documentation.html#streamsconfigs Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_ Interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java was (Author: dpoldrugo): Is this possibly resolved in 0.10.1.0 with streams config: *rocksdb.config.setter* ? https://kafka.apache.org/0101/documentation.html#streamsconfigs Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_ Interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Assignee: Guozhang Wang > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. 
Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. > A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. > Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. 
> RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159 ] Davor Poldrugo edited comment on KAFKA-4273 at 10/24/16 2:34 PM: - Is this possibly resolved in 0.10.1.0 with streams config: *rocksdb.config.setter* ? https://kafka.apache.org/0101/documentation.html#streamsconfigs Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_ Interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java was (Author: dpoldrugo): Is this possibly resolved in 0.10.1.0 with streams config: *rocksdb.config.setter* ? http://kafka.apache.org/documentation#streamsconfigs Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_ Interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Assignee: Guozhang Wang > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. 
Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. > A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. > Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. 
> RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159 ] Davor Poldrugo commented on KAFKA-4273: --- Is this possibly resolved in 0.10.1.0 with streams config: *rocksdb.config.setter* ? http://kafka.apache.org/documentation#streamsconfigs Description of the config: _A Rocks DB config setter class that implements the RocksDBConfigSetter interface_ Interface is here: https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Assignee: Guozhang Wang > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. > A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. 
When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. > Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. > RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
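The TTL semantics requested in this ticket can be illustrated with a small standalone model: a store whose entries expire lazily on read once they are older than the configured TTL. This is a sketch of the requested behavior only, not of RocksDB's actual TtlDB; all names are hypothetical, and the clock is simulated to keep the example deterministic:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a state store with TTL-based lazy expiry.
public class TtlStore<K, V> {

    private static final class Entry<V> {
        final V value;
        final long insertedAt;
        Entry(V value, long insertedAt) {
            this.value = value;
            this.insertedAt = insertedAt;
        }
    }

    private final Map<K, Entry<V>> map = new HashMap<>();
    private final long ttlMillis;
    private long now; // simulated clock, starts at 0

    public TtlStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void advanceClock(long millis) {
        now += millis;
    }

    public void put(K key, V value) {
        map.put(key, new Entry<>(value, now));
    }

    // Entries older than the TTL are dropped on read (lazy expiry),
    // roughly analogous to how RocksDB's TTL mode discards old entries.
    public V get(K key) {
        Entry<V> e = map.get(key);
        if (e == null) {
            return null;
        }
        if (now - e.insertedAt > ttlMillis) {
            map.remove(key);
            return null;
        }
        return e.value;
    }

    public static void main(String[] args) {
        TtlStore<String, String> store = new TtlStore<>(48L * 60 * 60 * 1000); // 48h, as in the ticket
        store.put("price:1", "100");
        store.advanceClock(49L * 60 * 60 * 1000); // 49 hours later
        System.out.println("after TTL: " + store.get("price:1")); // prints "after TTL: null"
    }
}
```

Note that RocksDB's real TTL support (TtlDB) reclaims expired entries during compaction, so stale reads are possible there; a DSL-level TTL would need to pick a similar consistency trade-off.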
[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4273:
--
Description:
Hi! I'm using the Streams DSL (0.10.0.1), which as far as I know can only use RocksDB for local state - it's not configurable. In my use case my data has a TTL / retention period of 48 hours. After that, the data can be discarded. I join two topics, "messages" and "prices", using a windowed inner join. The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog
Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or somehow periodically clean out the older data. A "hack" that I use to keep my disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions it wasn't processing before the stop - which leads to deletion of the RocksDB state stores for those partitions (state.cleanup.delay.ms). In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non-compacted topic. This effectively amounts to "hacked TTL support" in the Kafka Streams DSL.
Questions:
* Do you think it would be reasonable to add support in the DSL API to define a TTL for the local store?
* Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and this flag should also be used for the RocksDB TTL.
Of course, in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores. I saw there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015.
* Which also leads to another question: maybe some intermediate topics / state stores need a different TTL, so it's not as simple as that. But after KAFKA-3870, it will be easier.
RocksDB supports TTL:
* https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
* https://github.com/facebook/rocksdb/wiki/Time-to-Live
* https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
A somewhat similar issue: KAFKA-4212

was:
Hi! I'm using the Streams DSL (0.10.0.1), which as far as I know can only use RocksDB for local state - it's not configurable. In my use case my data has a TTL / retention period of 48 hours. After that, the data can be discarded. I join two topics, "messages" and "prices", using a windowed inner join. The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog
Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or somehow periodically clean out the older data. A "hack" that I use to keep my disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions it wasn't processing before the stop - which leads to deletion of the RocksDB state stores for those partitions (state.cleanup.delay.ms).
In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non compacted topic. And this effectively leads to a "hacked TTL support" in Kafka Streams DSL. Questions: * Do you think would be reasonable to add support in the DSL api to define TTL for local store? * Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could also this be added to the DSL api? Maybe only this could be added, and this flag should also be used for the RocksDB TTL. Of course in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores. I saw there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015. * Which also leads to another question, maybe some intermediate topics / state stores need different TTL, so a it's not as simple as that. But after KAFKA-3870, it will be easier. RocksDB supports TTL: * http
[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4273:
--
Description:
Hi! I'm using the Streams DSL (0.10.0.1), which as far as I know can only use RocksDB for local state - it's not configurable. In my use case my data has a TTL / retention period of 48 hours. After that, the data can be discarded. I join two topics, "messages" and "prices", using a windowed inner join. The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog
Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or somehow periodically clean out the older data. A "hack" that I use to keep my disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions it wasn't processing before the stop - which leads to deletion of the RocksDB state stores for those partitions (state.cleanup.delay.ms). In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non-compacted topic. This effectively amounts to "hacked TTL support" in the Kafka Streams DSL.
Questions:
* Do you think it would be reasonable to add support in the DSL API to define a TTL for the local store?
* Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and this flag should also be used for the RocksDB TTL.
Of course in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores. I saw there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015. * Which also leads to another question, maybe some intermediate topics / state stores need different TTL, so a it's not as simple as that. But after KAFKA-3870, it will be easier. RocksDB supports TTL: * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 * https://github.com/facebook/rocksdb/wiki/Time-to-Live * https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java A somehow similar issue: KAFKA-4212 was: Hi! I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state as far as I know - it's not configurable. In my use case my data has TTL / retnetion period. It's 48 hours. After that - data can be discarded. I join two topics: "messages" and "prices" using windowed inner join. The two intermediate Kafka topics for this join are named: * messages-prices-join-this-changelog * messages-prices-join-other-changelog Since these topics are created as compacted by Kafka Streams, and I don't wan't to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any options to define TTL, or somehow periodically clean the older data. I A "hack" that I use to keep my disk usage low - I have schedulled a job to periodically stop Kafka Streams instances - one at the time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions that wasn't processing before the stop - which leads to deletion of the RocksDB state store for those partitions (state.cleanup.delay.ms). 
In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non compacted topic. And this effectively leads to a "hacked TTL support" in Kafka Streams DSL. Questions: * Do you think would be reasonable to add support in the DSL api to define TTL for local store? * Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could also this be added to the DSL api? Maybe only this could be added, and this flag should also be used for the RocksDB TTL. Of course in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores. * Which also leads to another question, maybe some intermediate topics / state stores need different TTL, so a it's not as simple as that. RocksDB supports TTL: * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Davor Poldrugo updated KAFKA-4273:
--
Description:
Hi! I'm using the Streams DSL (0.10.0.1), which as far as I know can only use RocksDB for local state - it's not configurable. In my use case my data has a TTL / retention period of 48 hours. After that, the data can be discarded. I join two topics, "messages" and "prices", using a windowed inner join. The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog
Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or somehow periodically clean out the older data. A "hack" that I use to keep my disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions it wasn't processing before the stop - which leads to deletion of the RocksDB state stores for those partitions (state.cleanup.delay.ms). In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non-compacted topic. This effectively amounts to "hacked TTL support" in the Kafka Streams DSL.
Questions:
* Do you think it would be reasonable to add support in the DSL API to define a TTL for the local store?
* Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and this flag should also be used for the RocksDB TTL.
Of course in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores.
* Which leads to yet another question: some intermediate topics / state stores may need different TTLs, so it's not quite as simple as that.

RocksDB supports TTL:
* https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
* https://github.com/facebook/rocksdb/wiki/Time-to-Live
* https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java

A somewhat similar issue: KAFKA-4212
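The "altered them to not use compaction" step above boils down to two standard Kafka topic-level configs: switching {{cleanup.policy}} from {{compact}} to {{delete}} and setting {{retention.ms}} to the 48-hour TTL. A minimal sketch of those override values (the topic names match the join changelogs mentioned above; applying them via the admin tooling is left out):

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class IntermediateTopicRetention {
    public static void main(String[] args) {
        // The 48-hour TTL from the use case, expressed as retention.ms.
        long retentionMs = TimeUnit.HOURS.toMillis(48);

        // Topic-level overrides that switch a changelog topic such as
        // messages-prices-join-this-changelog from compaction to
        // time-based retention. Both keys are standard Kafka topic configs.
        Properties overrides = new Properties();
        overrides.put("cleanup.policy", "delete");
        overrides.put("retention.ms", Long.toString(retentionMs));

        System.out.println(overrides.getProperty("retention.ms")); // 172800000
    }
}
```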
[jira] [Created] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
Davor Poldrugo created KAFKA-4273: - Summary: Streams DSL - Add TTL / retention period support for intermediate topics and state stores Key: KAFKA-4273 URL: https://issues.apache.org/jira/browse/KAFKA-4273 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Davor Poldrugo Assignee: Guozhang Wang

Hi! I'm using the Streams DSL (0.10.0.1), which as far as I know can only use RocksDB for local state - it's not configurable. In my use case the data has a TTL / retention period of 48 hours. After that, the data can be discarded.

I join two topics, "messages" and "prices", using a windowed inner join. The two intermediate Kafka topics for this join are named:
* messages-prices-join-this-changelog
* messages-prices-join-other-changelog

Since these topics are created as compacted by Kafka Streams, and I don't want to keep data forever, I have altered them to not use compaction. Right now my RocksDB state stores grow indefinitely, and I don't have any option to define a TTL or to periodically clean out the older data.

A "hack" that I use to keep disk usage low: I have scheduled a job to periodically stop Kafka Streams instances, one at a time. This triggers a rebalance, and partitions migrate to other instances. When the instance is started again, there's another rebalance, and sometimes this instance starts processing partitions that it wasn't processing before the stop - which leads to deletion of the RocksDB state stores for those partitions (state.cleanup.delay.ms). In the next rebalance the local store is recreated with a restore consumer - which reads data from - as previously mentioned - a non-compacted topic. This effectively amounts to "hacked TTL support" in the Kafka Streams DSL.

Questions:
* Do you think it would be reasonable to add support in the DSL API for defining a TTL for the local store?
* Which opens another question - there are use cases which don't need the intermediate topics to be created as "compact". Could this also be added to the DSL API? Maybe only this could be added, and the flag could also be used for the RocksDB TTL. Of course in this case another config would be mandatory - the retention period or TTL for the intermediate topics and the state stores.
* Which leads to yet another question: some intermediate topics / state stores may need different TTLs, so it's not quite as simple as that.

RocksDB supports TTL:
* https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
* https://github.com/facebook/rocksdb/wiki/Time-to-Live
* https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java -- This message was sent by Atlassian JIRA (v6.3.4#6332)
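The TtlDB class linked above takes its TTL as an int in seconds, after which entries become eligible for removal during RocksDB compaction. A small sketch of deriving that value from the 48-hour retention period; the {{TtlDB.open}} call is shown only as a comment (with a hypothetical path and options) since it needs the rocksdbjni dependency on the classpath:

```java
import java.util.concurrent.TimeUnit;

public class RocksDbTtl {
    public static void main(String[] args) {
        // RocksDB's TtlDB expects the TTL in seconds as an int.
        int ttlSeconds = (int) TimeUnit.HOURS.toSeconds(48);

        // With rocksdbjni available, a TTL-enabled store could then be
        // opened roughly like this (path and options are hypothetical):
        //   TtlDB db = TtlDB.open(new Options().setCreateIfMissing(true),
        //                         "/tmp/state/messages-prices-join",
        //                         ttlSeconds, /* readOnly */ false);
        System.out.println(ttlSeconds); // 172800
    }
}
```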