[jira] [Commented] (KAFKA-5055) Kafka Streams skipped-records-rate sensor producing nonzero values even when FailOnInvalidTimestamp is used as extractor

2017-04-29 Thread Davor Poldrugo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990086#comment-15990086
 ] 

Davor Poldrugo commented on KAFKA-5055:
---

Hi guys!
I think the problem is caused by a bug in the method 
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}:

{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    final int oldQueueSize = partitionGroup.numBuffered();
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    return newQueueSize - oldQueueSize;
}
{code}

This line is the problem:
{{final int oldQueueSize = partitionGroup.numBuffered();}}

Instead of getting {{oldQueueSize}} for the current TopicPartition, it gets the 
queue size for all the partitions, because {{partitionGroup.numBuffered()}} 
returns the buffered size across the whole partition group.

As a result, {{return newQueueSize - oldQueueSize}} usually returns a negative 
value, because the sum of all queue sizes across partitions is typically bigger 
than the queue size of this single partition.
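The mismatch can be sketched with plain integers. This is a hypothetical stand-in for the queue bookkeeping, not Kafka's actual PartitionGroup; all names are illustrative:

```java
// Minimal sketch of the queue-size accounting described above.
// Purely illustrative; this is not Kafka's PartitionGroup implementation.
public class QueueDeltaDemo {

    // Buggy variant: the "old" size is the total across ALL partitions.
    static int buggyDelta(int oldTotalAllPartitions, int oldThisPartition, int added) {
        int newThisPartition = oldThisPartition + added; // per-partition size after adding
        return newThisPartition - oldTotalAllPartitions; // per-partition minus group-wide: can go negative
    }

    // Fixed variant: the "old" size is taken for the same partition.
    static int fixedDelta(int oldThisPartition, int added) {
        int newThisPartition = oldThisPartition + added;
        return newThisPartition - oldThisPartition;      // always equals the number of records added
    }

    public static void main(String[] args) {
        // Partition p holds 2 buffered records, the whole group holds 10; we add 3 records to p.
        System.out.println(buggyDelta(10, 2, 3)); // -5, even though 3 records were added
        System.out.println(fixedDelta(2, 3));     // 3
    }
}
```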

This goes back to 
{{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}}:
{code:java}
for (TopicPartition partition : records.partitions()) {
    StreamTask task = activeTasksByPartition.get(partition);
    numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
{code}

There the negative value gets summed into {{numAddedRecords}}, decreasing it. 
This leads to a wrong sensor value in {{records.count() - numAddedRecords}}: 
{{numAddedRecords}} is smaller because of the bug, not because of invalid 
timestamps.
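Carrying hypothetical numbers through that expression shows how the sensor goes wrong. This is a walk-through sketch, not Kafka code; the numbers are made up:

```java
// Hypothetical walk-through of the sensor arithmetic in StreamThread#runLoop.
public class SkippedRecordsDemo {

    // What skippedRecordsSensor.record(...) receives as its value.
    static int skipped(int fetchedCount, int numAddedRecords) {
        return fetchedCount - numAddedRecords;
    }

    public static void main(String[] args) {
        int fetched = 3; // records.count() for this poll, all for one partition
        // Correct accounting: all 3 records were buffered, so nothing was skipped.
        System.out.println(skipped(fetched, 3));  // 0
        // Buggy accounting: if the partition held 2 buffered records while the whole
        // group held 10, addRecords() returned (2 + 3) - 10 = -5, so numAddedRecords = -5.
        System.out.println(skipped(fetched, -5)); // 8 phantom "skipped" records
    }
}
```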

Bugfix proposal:
In the method 
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}
change:
{{final int oldQueueSize = partitionGroup.numBuffered();}}
to
{{final int oldQueueSize = partitionGroup.numBuffered(partition);}}
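The fix relies on having both a group-wide and a per-partition size accessor. A toy model of the two overloads (hypothetical class and method names, not Kafka's PartitionGroup):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of the two numBuffered overloads the proposed fix relies on.
// Purely illustrative; not Kafka's PartitionGroup.
public class ToyPartitionGroup {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void add(String partition, String record) {
        queues.computeIfAbsent(partition, k -> new ArrayDeque<>()).add(record);
    }

    // Group-wide size: what the buggy line reads.
    int numBuffered() {
        return queues.values().stream().mapToInt(Queue::size).sum();
    }

    // Per-partition size: what the fixed line reads.
    int numBuffered(String partition) {
        Queue<String> q = queues.get(partition);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        ToyPartitionGroup group = new ToyPartitionGroup();
        group.add("topic-0", "a");
        group.add("topic-0", "b");
        group.add("topic-1", "c");
        System.out.println(group.numBuffered());          // 3 (all partitions)
        System.out.println(group.numBuffered("topic-0")); // 2 (just this partition)
    }
}
```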

If this is the cause of the bug, can I do a pull request?

> Kafka Streams skipped-records-rate sensor producing nonzero values even when 
> FailOnInvalidTimestamp is used as extractor
> 
>
> Key: KAFKA-5055
> URL: https://issues.apache.org/jira/browse/KAFKA-5055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Nikki Thean
>Assignee: Guozhang Wang
>
> According to the code and the documentation for this metric, the only reason 
> for a skipped record is an invalid timestamp, except that a) I am reading 
> from a topic that is populated solely by Kafka Connect and b) I am using 
> `FailOnInvalidTimestamp` as the timestamp extractor.
> Either I'm missing something in the documentation (i.e. another reason for 
> skipped records) or there is a bug in the code that calculates this metric.
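For context, the contract the reporter relies on can be sketched as follows. This is a simplified stand-in for a fail-fast timestamp extractor, not Kafka's actual FailOnInvalidTimestamp class: with this behavior, an invalid timestamp raises an exception rather than silently incrementing a skipped-records count.

```java
// Simplified stand-in for a fail-fast timestamp extractor: invalid (negative)
// timestamps raise an exception instead of letting the record be skipped.
// Illustrative only; this is not Kafka code.
public class FailFastExtractorDemo {

    static long extract(long recordTimestamp) {
        if (recordTimestamp < 0) {
            // Kafka Streams would abort processing here instead of skipping the record.
            throw new IllegalStateException("Invalid (negative) timestamp: " + recordTimestamp);
        }
        return recordTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(extract(1493459286000L)); // a valid timestamp passes through
        try {
            extract(-1L);
        } catch (IllegalStateException e) {
            System.out.println("fail-fast: " + e.getMessage());
        }
    }
}
```

Under this contract, a nonzero skipped-records-rate should be impossible, which is why the reporter suspects the metric calculation itself.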



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in tasks/processors

2017-01-09 Thread Davor Poldrugo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812326#comment-15812326
 ] 

Davor Poldrugo commented on KAFKA-4455:
---

This should be resolved by KAFKA-4561, so this issue can probably be closed.

> CommitFailedException during rebalance doesn't release resources in 
> tasks/processors
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If some processor contains a state store 
> (RocksDB), the RocksDB instance is not closed, which leaves unreleased LOCK 
> files at the OS level, and when the Kafka Streams app then tries to open 
> tasks and their respective processors and state stores, 
> {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks 
> available}} is thrown. If the jvm/process is restarted, the locks are 
> released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with 
> {{num.stream.threads=1}}, each with its own state directory. Other Kafka 
> Streams apps were running on the same machines, but they had separate 
> directories for state stores. In the attached logs you can see 
> {{StreamThread-1895}}. I'm not running 1895 StreamThreads; I have 
> implemented a Kafka Streams restart policy in my 
> {{UncaughtExceptionHandler}} which, on some transient exceptions, restarts 
> the {{org.apache.kafka.streams.KafkaStreams}} topology by calling 
> {{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
> {{org.apache.kafka.streams.KafkaStreams.start()}}. This causes the thread 
> names to have bigger numbers.
> h2. Stacktrace
> [^RocksDBException_IO-error_stacktrace.txt]
> h2. Suggested solution
> To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
> lead to release of resources - in this case - filesystem LOCK files.
> h2. Possible solution code
> Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
> Commit: [BUGFIX: When commit fails during rebalance - release 
> resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
> I have been running this fork in production for 3 days and the error doesn't 
> come up.
> h2. Note
> This could be related to these issues: KAFKA-3708 and KAFKA-3938
> Additional conversation can be found here: [stream shut down due to no locks 
> for state 
> store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]
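The suggested solution boils down to a standard close-on-failure pattern: release the state store in a {{finally}} block so its OS-level lock is freed even when the commit throws. A generic sketch with no Kafka types; all names are illustrative:

```java
// Generic sketch of the suggested fix: even when the commit fails, close the
// state store so its OS-level LOCK file is released. Names are illustrative.
public class CloseOnCommitFailureDemo {

    static class FakeStore {
        boolean open = true;
        void close() { open = false; } // closing the store would release its LOCK file
    }

    // Returns true if the commit succeeded; closes the store on both paths.
    static boolean commitAndClose(FakeStore store, boolean commitFails) {
        try {
            if (commitFails) {
                throw new RuntimeException("CommitFailedException (simulated)");
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            store.close(); // runs on both the success and the failure path
        }
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        commitAndClose(store, true);
        System.out.println(store.open); // false: closed despite the failed commit
    }
}
```

Without the {{finally}} step, the failure path leaves the store open, which matches the "No locks available" symptom described above.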



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in tasks/processors

2016-11-29 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Summary: CommitFailedException during rebalance doesn't release resources 
in tasks/processors  (was: CommitFailedException during rebalance doesn't 
release resources in processors)



[jira] [Updated] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in processors

2016-11-29 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Summary: CommitFailedException during rebalance doesn't release resources 
in processors  (was: Commit during rebalance does not close RocksDB which later 
causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks 
available)



[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw 
CommitFailedException. When this exception is thrown, the tasks and processors 
are not closed. If some processor contains a state store (RocksDB), the 
RocksDB instance is not closed, which leaves unreleased LOCK files at the OS 
level, and when the Kafka Streams app then tries to open tasks and their 
respective processors and state stores, {{org.rocksdb.RocksDBException: IO 
error: lock .../LOCK: No locks available}} is thrown. If the jvm/process is 
restarted, the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}}, each with its own state directory. Other Kafka 
Streams apps were running on the same machines, but they had separate 
directories for state stores. In the attached logs you can see 
{{StreamThread-1895}}. I'm not running 1895 StreamThreads; I have implemented 
a Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which, on 
some transient exceptions, restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}. This causes the thread 
names to have bigger numbers.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt]

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come up.

h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938
Additional conversation can be found here: [stream shut down due to no locks 
for state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]



[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running on the same machines but they had separate 
directories for state stores. In the attached logs you can see 
{{StreamThread-1895}}, I'm not running 1895 StreamThreads, I have implemented 
some Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which on 
some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released e

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938
Additinal conversation can be found here: [stream shut down due to no locks for 
state 
store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
lead to release of resources - in this case - filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
I have been running this fork in production for 3 days and the error doesn't 
come-up.

h2. Note
This could be related this issues: KAFKA-3708 and KAFKA-3938

  was:
h2. Problem description
>From time to time a rebalance in Kafka Streams causes the commit to throw 
>CommitFailedException. When this exception is thrown, the tasks and processors 
>are not closed. If some processor contains a state store (RocksDB), the 
>RocksDB is not closed, which leads to not relasead LOCK's on OS level, and 
>when the Kafka Streams app is trying to open tasks and their respective 
>processors and state stores the {{org.rocksdb.RocksDBException: IO error: lock 
>.../LOCK: No locks available}} is thrown. If the the jvm/process is restarted 
>the locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}} and each with it's own state directory. Other Kafka 
Streams apps were running but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}, I'm not running 1895 
StreamThreads, I have implemented some Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which on some transient exceptions restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology, by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
release their resources - in this case, the filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If so

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Description: 
h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw 
CommitFailedException. When this exception is thrown, the tasks and processors 
are not closed. If some processor contains a state store (RocksDB), the 
RocksDB instance is not closed, which leads to unreleased LOCKs at the OS 
level, and when the Kafka Streams app tries to open tasks and their respective 
processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock 
.../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the 
locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}}, each with its own state directory. Other Kafka 
Streams apps were running, but they had separate directories for state stores. 
In the attached logs you can see {{StreamThread-1895}}; I'm not running 1895 
StreamThreads. I have implemented a Kafka Streams restart policy in my 
{{UncaughtExceptionHandler}} which, on some transient exceptions, restarts the 
{{org.apache.kafka.streams.KafkaStreams}} topology by calling 
{{org.apache.kafka.streams.KafkaStreams.stop()}} and then 
{{org.apache.kafka.streams.KafkaStreams.start()}}.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
release their resources - in this case, the filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938

  was:
h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw 
CommitFailedException. When this exception is thrown, the tasks and processors 
are not closed. If some processor contains a state store (RocksDB), the 
RocksDB instance is not closed, which leads to unreleased LOCKs at the OS 
level, and when the Kafka Streams app tries to open tasks and their respective 
processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock 
.../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the 
locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}}, each with its own state directory. Other Kafka 
Streams apps were running, but they had separate directories for state stores.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
release their resources - in this case, the filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938


> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If some processor contains a state store 
> (RocksDB), the RocksDB instance is not closed, which leads to unreleased 
> LOCKs at the OS level, and when the Kafka Streams app tries to open tasks and 
> their respective processors and state stores, {{org.rocksdb.RocksDBException: 
> IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process 
> is restarted, the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with 
> 

[jira] [Updated] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4455:
--
Attachment: RocksDBException_IO-error_stacktrace.txt

> Commit during rebalance does not close RocksDB which later causes: 
> org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
> 
>
> Key: KAFKA-4455
> URL: https://issues.apache.org/jira/browse/KAFKA-4455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: Kafka Streams were running on CentOS - I have observed 
> this - after some time the locks were released even if the jvm/process wasn't 
> restarted, so I guess CentOS has some lock cleaning policy.
>Reporter: Davor Poldrugo
>  Labels: stacktrace
> Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw 
> CommitFailedException. When this exception is thrown, the tasks and 
> processors are not closed. If some processor contains a state store 
> (RocksDB), the RocksDB instance is not closed, which leads to unreleased 
> LOCKs at the OS level, and when the Kafka Streams app tries to open tasks and 
> their respective processors and state stores, {{org.rocksdb.RocksDBException: 
> IO error: lock .../LOCK: No locks available}} is thrown. If the jvm/process 
> is restarted, the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with 
> {{num.stream.threads=1}}, each with its own state directory. Other Kafka 
> Streams apps were running, but they had separate directories for state stores.
> h2. Stacktrace
> [^RocksDBException_IO-error_stacktrace.txt] 
> h2. Suggested solution
> To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
> release their resources - in this case, the filesystem LOCK files.
> h2. Possible solution code
> Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
> Commit: [BUGFIX: When commit fails during rebalance - release 
> resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
> h2. Note
> This could be related to these issues: KAFKA-3708 and KAFKA-3938



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4455) Commit during rebalance does not close RocksDB which later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available

2016-11-28 Thread Davor Poldrugo (JIRA)
Davor Poldrugo created KAFKA-4455:
-

 Summary: Commit during rebalance does not close RocksDB which 
later causes: org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks 
available
 Key: KAFKA-4455
 URL: https://issues.apache.org/jira/browse/KAFKA-4455
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
 Environment: Kafka Streams were running on CentOS - I have observed 
this - after some time the locks were released even if the jvm/process wasn't 
restarted, so I guess CentOS has some lock cleaning policy.
Reporter: Davor Poldrugo


h2. Problem description
From time to time a rebalance in Kafka Streams causes the commit to throw 
CommitFailedException. When this exception is thrown, the tasks and processors 
are not closed. If some processor contains a state store (RocksDB), the 
RocksDB instance is not closed, which leads to unreleased LOCKs at the OS 
level, and when the Kafka Streams app tries to open tasks and their respective 
processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock 
.../LOCK: No locks available}} is thrown. If the jvm/process is restarted, the 
locks are released.

h2. Additional info
I have been running 3 Kafka Streams instances on separate machines with 
{{num.stream.threads=1}}, each with its own state directory. Other Kafka 
Streams apps were running, but they had separate directories for state stores.

h2. Stacktrace
[^RocksDBException_IO-error_stacktrace.txt] 

h2. Suggested solution
To avoid restarting the jvm, modify Kafka Streams to close tasks, which will 
release their resources - in this case, the filesystem LOCK files.

h2. Possible solution code
Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
Commit: [BUGFIX: When commit fails during rebalance - release 
resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]

h2. Note
This could be related to these issues: KAFKA-3708 and KAFKA-3938



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-24 Thread Davor Poldrugo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159
 ] 

Davor Poldrugo edited comment on KAFKA-4273 at 10/24/16 2:36 PM:
-

Is this possibly resolved in 0.10.1.0 (KAFKA-3740) with streams config: 
*rocksdb.config.setter* ?
https://kafka.apache.org/0101/documentation.html#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the 
RocksDBConfigSetter interface_

Interface is here: 
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java


was (Author: dpoldrugo):
Is this possibly resolved in 0.10.1.0 with streams config: 
*rocksdb.config.setter* ?
https://kafka.apache.org/0101/documentation.html#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the 
RocksDBConfigSetter interface_

Interface is here: 
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Assignee: Guozhang Wang
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define a TTL or somehow periodically clean out the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to 
> define a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-24 Thread Davor Poldrugo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159
 ] 

Davor Poldrugo edited comment on KAFKA-4273 at 10/24/16 2:34 PM:
-

Is this possibly resolved in 0.10.1.0 with streams config: 
*rocksdb.config.setter* ?
https://kafka.apache.org/0101/documentation.html#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the 
RocksDBConfigSetter interface_

Interface is here: 
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java


was (Author: dpoldrugo):
Is this possibly resolved in 0.10.1.0 with streams config: 
*rocksdb.config.setter* ?
http://kafka.apache.org/documentation#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the 
RocksDBConfigSetter interface_

Interface is here: 
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Assignee: Guozhang Wang
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define a TTL or somehow periodically clean out the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to 
> define a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-24 Thread Davor Poldrugo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159
 ] 

Davor Poldrugo commented on KAFKA-4273:
---

Is this possibly resolved in 0.10.1.0 with streams config: 
*rocksdb.config.setter* ?
http://kafka.apache.org/documentation#streamsconfigs

Description of the config: _A Rocks DB config setter class that implements the 
RocksDBConfigSetter interface_

Interface is here: 
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
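For reference, a minimal config-setter sketch (it assumes compile-time dependencies on kafka-streams 0.10.1 and rocksdbjni, so it is not runnable standalone here; the class name and the {{CompactionStyle.UNIVERSAL}} choice are illustrative). Note the caveat: the setter only tunes {{org.rocksdb.Options}} for the stores - whether it can actually enforce a TTL is not obvious, since Kafka Streams still controls how the store is opened.

```java
import java.util.Map;

import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;

// Illustrative only: tunes RocksDB options for every store in the topology.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setCompactionStyle(CompactionStyle.UNIVERSAL);
    }
}
```

Registration goes through the streams config, e.g. {{props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class)}}.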

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Assignee: Guozhang Wang
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define a TTL or somehow periodically clean out the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL api to 
> define a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-10 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4273:
--
Description: 
Hi!
I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state as 
far as I know - it's not configurable.
In my use case my data has a TTL / retention period of 48 hours. After that, 
the data can be discarded.

I join two topics: "messages" and "prices" using windowed inner join.
The two intermediate Kafka topics for this join are named:
 * messages-prices-join-this-changelog
 * messages-prices-join-other-changelog

Since these topics are created as compacted by Kafka Streams, and I don't want 
to keep data forever, I have altered them to not use compaction. Right now my 
RocksDB state stores grow indefinitely, and I don't have any options to define 
a TTL, or somehow periodically clean out the older data.

A "hack" that I use to keep my disk usage low - I have scheduled a job to 
periodically stop Kafka Streams instances - one at a time. This triggers a 
rebalance, and partitions migrate to other instances. When the instance is 
started again, there's another rebalance, and sometimes this instance starts 
processing partitions it wasn't processing before the stop - which leads to 
deletion of the RocksDB state store for those partitions 
(state.cleanup.delay.ms). In the next rebalance the local store is recreated 
with a restore consumer - which reads data from - as previously mentioned - a 
non compacted topic. And this effectively leads to a "hacked TTL support" in 
Kafka Streams DSL.

Questions:
 * Do you think it would be reasonable to add support in the DSL api to define 
a TTL for the local store?
 * Which opens another question - there are use cases which don't need the 
intermediate topics to be created as "compact". Could this also be added to the 
DSL api? Maybe only this could be added, and this flag should also be used for 
the RocksDB TTL. Of course in this case another config would be mandatory - the 
retention period or TTL for the intermediate topics and the state stores. I saw 
there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015.
 * Which also leads to another question: maybe some intermediate topics / state 
stores need different TTLs, so it's not as simple as that. But after 
KAFKA-3870, it will be easier.

RocksDB supports TTL:
 * 
https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
 * https://github.com/facebook/rocksdb/wiki/Time-to-Live
 * 
https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java

A somehow similar issue: KAFKA-4212
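For comparison, RocksDB's own TTL support looks roughly like this - a sketch assuming the {{rocksdbjni}} dependency and a recent enough version where the handles are {{AutoCloseable}}; the path is made up, and the 48-hour TTL mirrors the use case above. Note that {{TtlDB}} expires entries lazily during compaction, so "after the TTL" means "eligible for deletion", not an exact deadline.

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

public class TtlStoreSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        final int ttlSeconds = 48 * 3600;  // 48h retention, as in the use case
        try (Options options = new Options().setCreateIfMissing(true);
             // open(Options, path, ttl, readOnly) - entries older than the TTL
             // become eligible for deletion during compaction
             TtlDB db = TtlDB.open(options, "/tmp/ttl-store", ttlSeconds, false)) {
            db.put("key".getBytes(), "value".getBytes());
        }
    }
}
```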



  was:
Hi!
I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state as 
far as I know - it's not configurable.
In my use case my data has a TTL / retention period of 48 hours. After that, 
the data can be discarded.

I join two topics: "messages" and "prices" using windowed inner join.
The two intermediate Kafka topics for this join are named:
 * messages-prices-join-this-changelog
 * messages-prices-join-other-changelog

Since these topics are created as compacted by Kafka Streams, and I don't want 
to keep data forever, I have altered them to not use compaction. Right now my 
RocksDB state stores grow indefinitely, and I don't have any options to define 
a TTL, or somehow periodically clean out the older data.

A "hack" that I use to keep my disk usage low - I have scheduled a job to 
periodically stop Kafka Streams instances - one at a time. This triggers a 
rebalance, and partitions migrate to other instances. When the instance is 
started again, there's another rebalance, and sometimes this instance starts 
processing partitions it wasn't processing before the stop - which leads to 
deletion of the RocksDB state store for those partitions 
(state.cleanup.delay.ms). In the next rebalance the local store is recreated 
with a restore consumer - which reads data from - as previously mentioned - a 
non compacted topic. And this effectively leads to a "hacked TTL support" in 
Kafka Streams DSL.

Questions:
 * Do you think it would be reasonable to add support in the DSL api to define 
a TTL for the local store?
 * Which opens another question - there are use cases which don't need the 
intermediate topics to be created as "compact". Could this also be added to the 
DSL api? Maybe only this could be added, and this flag should also be used for 
the RocksDB TTL. Of course in this case another config would be mandatory - the 
retention period or TTL for the intermediate topics and the state stores. I saw 
there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015.
 * Which also leads to another question: maybe some intermediate topics / state 
stores need different TTLs, so it's not as simple as that. But after 
KAFKA-3870, it will be easier.

RocksDB supports TTL:
 * 
http

[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-07 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4273:
--
Description: 
Hi!
I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state as 
far as I know - it's not configurable.
In my use case my data has a TTL / retention period of 48 hours. After that, 
the data can be discarded.

I join two topics: "messages" and "prices" using windowed inner join.
The two intermediate Kafka topics for this join are named:
 * messages-prices-join-this-changelog
 * messages-prices-join-other-changelog

Since these topics are created as compacted by Kafka Streams, and I don't want 
to keep data forever, I have altered them to not use compaction. Right now my 
RocksDB state stores grow indefinitely, and I don't have any options to define 
a TTL, or somehow periodically clean out the older data.

A "hack" that I use to keep my disk usage low - I have scheduled a job to 
periodically stop Kafka Streams instances - one at a time. This triggers a 
rebalance, and partitions migrate to other instances. When the instance is 
started again, there's another rebalance, and sometimes this instance starts 
processing partitions it wasn't processing before the stop - which leads to 
deletion of the RocksDB state store for those partitions 
(state.cleanup.delay.ms). In the next rebalance the local store is recreated 
with a restore consumer - which reads data from - as previously mentioned - a 
non compacted topic. And this effectively leads to a "hacked TTL support" in 
Kafka Streams DSL.

Questions:
 * Do you think it would be reasonable to add support in the DSL api to define 
a TTL for the local store?
 * Which opens another question - there are use cases which don't need the 
intermediate topics to be created as "compact". Could this also be added to the 
DSL api? Maybe only this could be added, and this flag should also be used for 
the RocksDB TTL. Of course in this case another config would be mandatory - the 
retention period or TTL for the intermediate topics and the state stores. I saw 
there is a new cleanup.policy - compact_and_delete - added with KAFKA-4015.
 * Which also leads to another question: maybe some intermediate topics / state 
stores need different TTLs, so it's not as simple as that. But after 
KAFKA-3870, it will be easier.

RocksDB supports TTL:
 * 
https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
 * https://github.com/facebook/rocksdb/wiki/Time-to-Live
 * 
https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java

A somehow similar issue: KAFKA-4212



  was:
Hi!
I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state as 
far as I know - it's not configurable.
In my use case my data has a TTL / retention period of 48 hours. After that, 
the data can be discarded.

I join two topics: "messages" and "prices" using windowed inner join.
The two intermediate Kafka topics for this join are named:
 * messages-prices-join-this-changelog
 * messages-prices-join-other-changelog

Since these topics are created as compacted by Kafka Streams, and I don't want 
to keep data forever, I have altered them to not use compaction. Right now my 
RocksDB state stores grow indefinitely, and I don't have any options to define 
a TTL, or somehow periodically clean out the older data.

A "hack" that I use to keep my disk usage low - I have scheduled a job to 
periodically stop Kafka Streams instances - one at a time. This triggers a 
rebalance, and partitions migrate to other instances. When the instance is 
started again, there's another rebalance, and sometimes this instance starts 
processing partitions it wasn't processing before the stop - which leads to 
deletion of the RocksDB state store for those partitions 
(state.cleanup.delay.ms). In the next rebalance the local store is recreated 
with a restore consumer - which reads data from - as previously mentioned - a 
non compacted topic. And this effectively leads to a "hacked TTL support" in 
Kafka Streams DSL.

Questions:
 * Do you think would be reasonable to add support in the DSL api to define TTL 
for local store?
 * Which opens another question - there are use cases which don't need the 
intermediate topics to be created as "compact". Could also this be added to the 
DSL api? Maybe only this could be added, and this flag should also be used for 
the RocksDB TTL. Of course in this case another config would be mandatory - the 
retention period or TTL for the intermediate topics and the state stores.
 * Which also leads to another question, maybe some intermediate topics / state 
stores need different TTL, so a it's not as simple as that.

RocksDB supports TTL:
 * 
https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

[jira] [Updated] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-07 Thread Davor Poldrugo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davor Poldrugo updated KAFKA-4273:
--

[jira] [Created] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2016-10-07 Thread Davor Poldrugo (JIRA)
Davor Poldrugo created KAFKA-4273:
-

 Summary: Streams DSL - Add TTL / retention period support for 
intermediate topics and state stores
 Key: KAFKA-4273
 URL: https://issues.apache.org/jira/browse/KAFKA-4273
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Davor Poldrugo
Assignee: Guozhang Wang







--
This message was sent by Atlassian JIRA
(v6.3.4#6332)