[jira] [Issue Comment Deleted] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2017-03-22 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated KAFKA-4392:
--
Comment: was deleted

(was: I am still seeing this error in 0.10.2.0 during rebalances.  Reopen or 
create a new issue?

WARN  2017-03-22 19:06:14,423 [StreamThread-20][StreamThread.java:1184] : Could 
not create task 3_346. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_346] Failed to lock the 
state directory: /data/kafka_streams/some_job/3_346
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

)

> Failed to lock the state directory due to an unexpected exception
> -
>
> Key: KAFKA-4392
> URL: https://issues.apache.org/jira/browse/KAFKA-4392
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Ara Ebrahimi
>Assignee: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> This happened on streaming startup, on a clean installation, no existing 
> folder. Here I was starting 4 instances of our streaming app on 4 machines 
> and one threw this exception. Seems to me there’s a race condition somewhere 
> when instances discover others, or something like that.
> 2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
> directory due to an unexpected exception
> java.nio.file.NoSuchFileException: 
> /data/1/kafka-streams/myapp-streams/7_21/.lock
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
>   at java.nio.channels.FileChannel.open(FileChannel.java:287)
>   at java.nio.channels.FileChannel.open(FileChannel.java:335)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
> directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 Nov  2 

[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2017-03-22 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15937388#comment-15937388
 ] 

Elias Levy commented on KAFKA-4392:
---

I am still seeing this error in 0.10.2.0 during rebalances.  Reopen or create a 
new issue?

WARN  2017-03-22 19:06:14,423 [StreamThread-20][StreamThread.java:1184] : Could 
not create task 3_346. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_346] Failed to lock the 
state directory: /data/kafka_streams/some_job/3_346
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)



> Failed to lock the state directory due to an unexpected exception
> -
>
> Key: KAFKA-4392
> URL: https://issues.apache.org/jira/browse/KAFKA-4392
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Ara Ebrahimi
>Assignee: Guozhang Wang
> Fix For: 0.10.2.0
>
>
> This happened on streaming startup, on a clean installation, no existing 
> folder. Here I was starting 4 instances of our streaming app on 4 machines 
> and one threw this exception. Seems to me there’s a race condition somewhere 
> when instances discover others, or something like that.
> 2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
> directory due to an unexpected exception
> java.nio.file.NoSuchFileException: 
> /data/1/kafka-streams/myapp-streams/7_21/.lock
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
>   at java.nio.channels.FileChannel.open(FileChannel.java:287)
>   at java.nio.channels.FileChannel.open(FileChannel.java:335)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
> directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 


[jira] [Updated] (KAFKA-4919) Document that stores must not be closed when Processors are closed

2017-03-22 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated KAFKA-4919:
--
Summary: Document that stores must not be closed when Processors are closed 
 (was: Streams job fails with InvalidStateStoreException: Store is currently 
closed)

> Document that stores must not be closed when Processors are closed
> --
>
> Key: KAFKA-4919
> URL: https://issues.apache.org/jira/browse/KAFKA-4919
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a 
> large number of topics with many partitions and that uses many threads.  I 
> upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
> seemingly after a rebalance.
> {quote}
> WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
> Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and 
> backtraces.  I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
> in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
> topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
> partition=225, offset=266411
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> someStore-201701060400 is currently closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>   ... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] 
> : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
> stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
> beginning of consumer rebalance.
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
> stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
> stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
> stream-thread [StreamThread-10] Updating suspended tasks to contain active 
> tasks [[1_225, 0_445, 0_30]]
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
> stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 
> 0_30]]
> INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] 
> : Setting newly assigned partitions [some_tpoic-225] for group some_job
> INFO  2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : 
> stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at 
> the end of consumer rebalance.
> INFO  2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task 
> [1_225] Initializing processor nodes of the topology
> Something happens.  What ???
> INFO  2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : 
> stream-thread [StreamThread-10] Closing a task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : 
> stream-thread 

[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed

2017-03-22 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936714#comment-15936714
 ] 

Elias Levy commented on KAFKA-4919:
---

Thanks for the information.  It is something that should probably be mentioned 
in the release notes.

> Streams job fails with InvalidStateStoreException: Store is currently closed
> 
>
> Key: KAFKA-4919
> URL: https://issues.apache.org/jira/browse/KAFKA-4919
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a 
> large number of topics with many partitions and that uses many threads.  I 
> upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
> seemingly after a rebalance.
> {quote}
> WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
> Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and 
> backtraces.  I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
> in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
> topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
> partition=225, offset=266411
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> someStore-201701060400 is currently closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>   ... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] 
> : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
> stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
> beginning of consumer rebalance.
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
> stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
> stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
> stream-thread [StreamThread-10] Updating suspended tasks to contain active 
> tasks [[1_225, 0_445, 0_30]]
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
> stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 
> 0_30]]
> INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] 
> : Setting newly assigned partitions [some_tpoic-225] for group some_job
> INFO  2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : 
> stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at 
> the end of consumer rebalance.
> INFO  2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task 
> [1_225] Initializing processor nodes of the topology
> Something happens.  What ???
> INFO  2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : 
> stream-thread [StreamThread-10] Closing a task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : 
> stream-thread 

[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed

2017-03-21 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935711#comment-15935711
 ] 

Elias Levy commented on KAFKA-4919:
---

I believe I found the issue.  I am closing the store when my {{Processor}} is 
closed.  The {{Processor}} is closed during rebalance via 
{{onPartitionsRevoked}} -> {{suspendTasksAndState}} -> 
{{closeAllTasksTopologies}} -> {{performOnAllTasks}} -> {{closeTopology}} -> 
{{ProcessorNode.close}}.

It was my understanding that it was proper to close state stores used by a 
{{Processor}} when the {{Processor}} is closed. The examples in the low-level 
API documentation follow this pattern (See 
[https://kafka.apache.org/0102/documentation/streams#streams_processor] and 
[https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java]).

Has this changed in 0.10.2.0?  Or is this a new issue only affecting the 
refactored segmented stores?
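
For illustration, a minimal sketch of the pattern at issue, assuming a simple 
key-value store named "someStore" (the name and types are placeholders, not from 
this ticket). Per the eventual retitling of this issue (see the 2017-03-22 update 
above), the store should be left open in {{close()}} and its lifecycle left to 
the framework:

{code}
// Sketch only: store name and types are placeholders.
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SomeProcessor implements Processor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("someStore");
    }

    @Override
    public void process(final String key, final String value) {
        store.put(key, value);
    }

    @Override
    public void punctuate(final long timestamp) { }

    @Override
    public void close() {
        // Do NOT call store.close() here: the store is reused across rebalances,
        // and closing it leads to the InvalidStateStoreException discussed above.
    }
}
{code}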

> Streams job fails with InvalidStateStoreException: Store is currently closed
> 
>
> Key: KAFKA-4919
> URL: https://issues.apache.org/jira/browse/KAFKA-4919
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a 
> large number of topics with many partitions and that uses many threads.  I 
> upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
> seemingly after a rebalance.
> {quote}
> WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
> Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and 
> backtraces.  I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
> in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
> topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
> partition=225, offset=266411
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> someStore-201701060400 is currently closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>   ... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] 
> : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
> stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
> beginning of consumer rebalance.
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
> stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
> stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
> stream-thread [StreamThread-10] Updating suspended tasks to contain active 
> tasks [[1_225, 0_445, 0_30]]
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
> stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 
> 0_30]]
> INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] 
> : Setting newly assigned partitions [some_tpoic-225] for group some_job
> INFO  

[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed

2017-03-21 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15935252#comment-15935252
 ] 

Elias Levy commented on KAFKA-4919:
---

The issue appears to be that a {{RocksDBStore}} segment of a 
{{RocksDBSegmentedBytesStore}} is closed during a call to {{put}} when the 
store is reused after a consumer rebalance.  I've reviewed the code, but I 
can't see how the segment could be closed after the rebalance.  If it is a 
newly created segment, it will be open.  If it is closed, then it should have 
been destroyed and removed from the segments list in {{Segments.cleanup}}.

Thoughts?

> Streams job fails with InvalidStateStoreException: Store is currently closed
> 
>
> Key: KAFKA-4919
> URL: https://issues.apache.org/jira/browse/KAFKA-4919
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a 
> large number of topics with many partitions and that uses many threads.  I 
> upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
> seemingly after a rebalance.
> {quote}
> WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
> Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and 
> backtraces.  I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
> in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
> topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
> partition=225, offset=266411
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> someStore-201701060400 is currently closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>   ... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] 
> : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
> stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
> beginning of consumer rebalance.
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
> stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
> stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
> stream-thread [StreamThread-10] Updating suspended tasks to contain active 
> tasks [[1_225, 0_445, 0_30]]
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
> stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 
> 0_30]]
> INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] 
> : Setting newly assigned partitions [some_tpoic-225] for group some_job
> INFO  2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : 
> stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at 
> the end of consumer rebalance.
> INFO  2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task 
> [1_225] Initializing processor nodes of the topology
> Something happens.  What ???
> INFO  2017-03-19 18:03:20,135 

[jira] [Updated] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed

2017-03-19 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated KAFKA-4919:
--
Summary: Streams job fails with InvalidStateStoreException: Store is 
currently closed  (was: Streams job fails with StreamsExceptio)

> Streams job fails with InvalidStateStoreException: Store is currently closed
> 
>
> Key: KAFKA-4919
> URL: https://issues.apache.org/jira/browse/KAFKA-4919
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a 
> large number of topics with many partitions and that uses many threads.  I 
> upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
> seemingly after a rebalance.
> {quote}
> WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
> Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and 
> backtraces.  I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
> in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
> topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
> partition=225, offset=266411
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> someStore-201701060400 is currently closed
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
>   at 
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
>   ... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] 
> : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
> stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
> beginning of consumer rebalance.
> INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
> stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
> stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
> stream-thread [StreamThread-10] Updating suspended tasks to contain active 
> tasks [[1_225, 0_445, 0_30]]
> INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
> stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 
> 0_30]]
> INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] 
> : Setting newly assigned partitions [some_tpoic-225] for group some_job
> INFO  2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : 
> stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at 
> the end of consumer rebalance.
> INFO  2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task 
> [1_225] Initializing processor nodes of the topology
> Something happens.  What ???
> INFO  2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : 
> stream-thread [StreamThread-10] Closing a task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : 
> stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : 
> stream-thread [StreamThread-10] 

[jira] [Created] (KAFKA-4919) Streams job fails with StreamsExceptio

2017-03-19 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4919:
-

 Summary: Streams job fails with StreamsExceptio
 Key: KAFKA-4919
 URL: https://issues.apache.org/jira/browse/KAFKA-4919
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Elias Levy


I have a streams job, that previously worked, that consumes and writes to a 
large number of topics with many partitions and that uses many threads.  I 
upgraded the job to 0.10.2.0.  The job now fails after a short time running, 
seemingly after a rebalance.

{quote}
WARN  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : 
Unexpected state transition from RUNNING to NOT_RUNNING
{quote}

The first observation is that Streams is no longer outputting exceptions and 
backtraces.  I had to add code to get this information.

The exception:

{quote}
Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught 
in process. taskId=1_225, processor=KSTREAM-SOURCE-03, 
topic=some_topic, partition=225, offset=266411
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, 
partition=225, offset=266411
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
someStore-201701060400 is currently closed
at 
org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
at 
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
at 
org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
... X more
{quote}

The error occurs for many partitions.

This was preceded by (for this partition):

{quote}
INFO  2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] : 
Revoking previously assigned partitions [some_topic-225] for group some_job
INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : 
stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the 
beginning of consumer rebalance.
INFO  2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : 
stream-thread [StreamThread-10] Closing a task's topology 1_225
INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : 
stream-thread [StreamThread-10] Flushing state stores of task 1_225
INFO  2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : 
stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : 
stream-thread [StreamThread-10] Updating suspended tasks to contain active 
tasks [[1_225, 0_445, 0_30]]
INFO  2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : 
stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 0_30]]

INFO  2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] : 
Setting newly assigned partitions [some_tpoic-225] for group some_job
INFO  2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : 
stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at 
the end of consumer rebalance.
INFO  2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task 
[1_225] Initializing processor nodes of the topology

Something happens.  What ???

INFO  2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : 
stream-thread [StreamThread-10] Closing a task 1_225
INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : 
stream-thread [StreamThread-10] Flushing state stores of task 1_225
INFO  2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : 
stream-thread [StreamThread-10] Closing the state manager of task 1_225
INFO  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:1019] : 
stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 0_30]]
INFO  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:1034] : 
stream-thread [StreamThread-10] Removing all standby tasks [[]]
INFO  2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:427] : 
stream-thread [StreamThread-10] Stream thread shutdown complete
{quote}




[jira] [Commented] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2017-03-13 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922826#comment-15922826
 ] 

Elias Levy commented on KAFKA-4887:
---

It is a bit complicated.  I am abusing the {{RocksDBWindowStore}} as a TTL 
cache (something I've covered in other JIRA tickets).  In this particular case 
the store is used to perform a join between two streams where, in theory, one of 
the streams is keyed by two values, while the other stream is keyed by only one 
of those values.  In practice both streams are keyed only by their shared value, 
so items from the stream that is logically keyed by two values may collide when 
they are inserted under the single shared key.  To get around that I turn on the 
feature to allow duplicates, and the join is performed by iterating over the 
duplicates and matching on the second key value.
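
A rough sketch of the lookup side of such a join, assuming a {{WindowStore}} 
with duplicates retained; the value type and window bounds below are invented 
for illustration, not taken from the ticket:

{code}
// Sketch only: SecondKeyedValue and the parameter values are assumptions.
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public final class DuplicateJoinLookup {

    // Hypothetical value type carrying the second key component.
    public interface SecondKeyedValue {
        String secondKey();
    }

    public static SecondKeyedValue lookup(final WindowStore<String, SecondKeyedValue> store,
                                          final String sharedKey,
                                          final String secondKey,
                                          final long fromMs,
                                          final long toMs) {
        // With duplicates retained, fetch() returns every value inserted under the
        // shared key within the time range; match the second key component by hand.
        try (final WindowStoreIterator<SecondKeyedValue> iter = store.fetch(sharedKey, fromMs, toMs)) {
            while (iter.hasNext()) {
                final SecondKeyedValue candidate = iter.next().value;
                if (secondKey.equals(candidate.secondKey())) {
                    return candidate;
                }
            }
        }
        return null;  // no join match in the window
    }
}
{code}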


> Enabling caching on a persistent window store breaks support for duplicate 
> insertion
> 
>
> Key: KAFKA-4887
> URL: https://issues.apache.org/jira/browse/KAFKA-4887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when 
> duplicate insertion support is enabled by passing {{true}} as the fourth 
> argument to {{windowed}} in the state store supplier.
> When the feature is enabled, {{RocksDBWindowStore}} correctly handles 
> duplicates by assigning a unique sequence number to each element on insertion 
> and using the number within the key.
> When caching is enabled by calling {{enableCaching}} on the supplier, 
> {{CachingWindowStore}} fails to do the same.  Thus, of multiple values 
> inserted with the same key, only the last one survives.





[jira] [Created] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2017-03-11 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4887:
-

 Summary: Enabling caching on a persistent window store breaks 
support for duplicate insertion
 Key: KAFKA-4887
 URL: https://issues.apache.org/jira/browse/KAFKA-4887
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Elias Levy


{{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when duplicate 
insertion support is enabled by passing {{true}} as the fourth argument to 
{{windowed}} in the state store supplier.

When the feature is enabled, {{RocksDBWindowStore}} correctly handles duplicates 
by assigning a unique sequence number to each element on insertion and using 
the number within the key.

When caching is enabled by calling {{enableCaching}} on the supplier, 
{{CachingWindowStore}} fails to do the same.  Thus, of multiple values 
inserted with the same key, only the last one survives.
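
For reference, roughly how such a supplier is put together with the 0.10.2 
{{Stores}} builder; the builder chain below is from memory, and the store name, 
serdes, and window parameters are placeholders, so treat it as a sketch rather 
than a verified snippet:

{code}
// Sketch only: names and numbers are placeholders; builder methods as recalled for 0.10.2.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public final class DuplicatesWindowStore {
    public static StateStoreSupplier supplier() {
        return Stores.create("join-window")          // placeholder store name
                .withKeys(Serdes.String())
                .withValues(Serdes.String())
                .persistent()
                .windowed(60_000L,                   // window size
                          3_600_000L,                // retention period
                          3,                         // number of segments
                          true)                      // fourth argument: retain duplicates
                // .enableCaching()                  // enabling the cache is what breaks duplicates
                .build();
    }
}
{code}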





[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-02-15 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868971#comment-15868971
 ] 

Elias Levy commented on KAFKA-2729:
---

Hit this again during testing with 0.10.0.1 on a 10-node broker cluster with a 
3-node ZK ensemble.  This should have priority Blocker instead of Major.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered; however, we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing; I was hoping you 
> could shed some light on this issue. Possibly it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.





[jira] [Created] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2017-02-08 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4748:
-

 Summary: Need a way to shutdown all workers in a Streams 
application at the same time
 Key: KAFKA-4748
 URL: https://issues.apache.org/jira/browse/KAFKA-4748
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Elias Levy


If you have a fleet of Streams workers for an application and attempt to shut 
them down simultaneously (e.g. via SIGTERM, Runtime.getRuntime().addShutdownHook(), 
and streams.close()), a large number of the workers fail to shut down.
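
The shutdown wiring being described looks roughly like this (a minimal sketch; 
the application id, bootstrap servers, and topology are placeholders):

{code}
// Sketch only: config values and topology are placeholders.
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public final class Worker {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "some_job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("some_topic");  // real topology elided

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        // SIGTERM runs the hook; when every worker in the fleet does this at once,
        // the close() calls race with the rebalances triggered by the first exits.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}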

The problem appears to be a race condition between the shutdown signal and the 
consumer rebalancing that is triggered by some of the workers exiting before 
others.  Workers that receive the signal later apparently fail to exit because 
they are caught in the rebalance.

Terminating workers in a rolling fashion is not advisable in some situations.  
A rolling shutdown will result in many unnecessary rebalances and may fail, 
as the application may have a large amount of local state that a smaller number 
of nodes may not be able to store.

It would appear that there is a need for a protocol change to allow the 
coordinator to signal a consumer group to shut down without triggering a 
rebalance.





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-24 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836739#comment-15836739
 ] 

Elias Levy commented on KAFKA-4144:
---

What Matthias said.  The issue has nothing to do with different applications.  
The problem is that the single configurable TimestampExtractor is used for all 
input topics.  That means you must dump the timestamp extraction logic for all 
topics, which may be quite different, into a single TimestampExtractor.

Imagine if you could only configure a single SerDe that would have to handle 
all topics, instead of being able to configure a SerDe per topic.
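
To make that concrete, the single configured extractor ends up as a per-topic 
case statement along these lines (topic names and payload types are invented; 
the single-argument {{extract}} signature is the 0.10.0.x one):

{code}
// Sketch only: topics and payload types are placeholders.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class AllTopicsTimestampExtractor implements TimestampExtractor {

    // Hypothetical payload types.
    public interface OrderEvent { long eventTimeMs(); }
    public interface ClickEvent { long occurredAtMs(); }

    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        // Every input topic's extraction logic has to be crammed into this one class.
        switch (record.topic()) {
            case "orders":
                return ((OrderEvent) record.value()).eventTimeMs();
            case "clicks":
                return ((ClickEvent) record.value()).occurredAtMs();
            default:
                return record.timestamp();  // fall back to the record timestamp
        }
    }
}
{code}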

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-22 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833913#comment-15833913
 ] 

Elias Levy commented on KAFKA-4144:
---

Jeyhun, sorry if I was not clear.  My comment about not being able to configure 
{{TimestampExtractor}} is only applicable to the current implementation, as it 
is instantiated by {{StreamTask}} and the current interface of 
{{TimestampExtractor}} provides no access to configuration data.  

So right now if you are consuming multiple distinct topics as different streams 
and/or tables, you have to create a case statement in your 
{{TimestampExtractor}} to handle all the topics with a single extractor, as 
that is all that you can configure right now via the {{timestamp.extractor}} 
configuration property.  And because {{TimestampExtractor}} is instantiated by 
{{StreamTask}} and the extractor does not have access to configuration data, 
you must hardcode the topic names in the extractor.  That means you can't 
change input topic names dynamically.  You have to recompile to change them.

Obviously these observations are no longer applicable if the application can 
instantiate its own {{TimestampExtractor}}s and pass them as an argument to 
{{TopologyBuilder.addSource}}, {{KStreamBuilder.stream}}, and 
{{KStreamBuilder.table}}.

As for Matthias' comments, I agree with #1 and #3.  Not quite sure what he 
means by #2.  Surely I could create a {{TimestampExtractor}}, passing its 
constructor a topic name read from a config file while defining the topology, 
just like I can create SerDes that support the Confluent schema registry, where 
the registry endpoint is read from the config, and use those while defining the 
topology.
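
A sketch of that last point, assuming applications could construct their own 
extractors: the topic names come from configuration at topology-definition time 
instead of being hardcoded (the class, payload type, and config source below are 
hypothetical):

{code}
// Sketch only: illustrates an application-constructed, config-driven extractor.
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class ConfiguredTopicsTimestampExtractor implements TimestampExtractor {

    // Hypothetical payload type with an embedded timestamp.
    public interface TimestampedPayload { long timestampMs(); }

    private final Set<String> payloadTimestampTopics;

    public ConfiguredTopicsTimestampExtractor(final Set<String> payloadTimestampTopics) {
        // e.g. topic names read from the application's own config file
        this.payloadTimestampTopics = payloadTimestampTopics;
    }

    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        if (payloadTimestampTopics.contains(record.topic())) {
            return ((TimestampedPayload) record.value()).timestampMs();
        }
        return record.timestamp();
    }
}
{code}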

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-01-22 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833649#comment-15833649
 ] 

Elias Levy commented on KAFKA-4144:
---

That's fine.  I am less concerned about the implementation details than about 
the developer's API experience.  E.g. having {{TopologyBuilder.addSource}}, 
{{KStreamBuilder.stream}}, and {{KStreamBuilder.table}} take a 
{{TimestampExtractor}}.

There is another problem with {{TimestampExtractor}}.  It is instantiated 
within StreamTask and the instances cannot be configured.  That means that the 
topics that {{TimestampExtractor}} supports are hardcoded into it. That makes 
it impossible to have a Streams application that uses a {{TimestampExtractor}} 
where the input topics can be configured at execution time.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.





[jira] [Created] (KAFKA-4683) Mismatch between Stream windowed store and broker log retention logic

2017-01-20 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4683:
-

 Summary: Mismatch between Stream windowed store and broker log 
retention logic
 Key: KAFKA-4683
 URL: https://issues.apache.org/jira/browse/KAFKA-4683
 Project: Kafka
  Issue Type: Bug
  Components: log, streams
Affects Versions: 0.10.1.1
Reporter: Elias Levy


The RocksDBWindowStore keeps key-value entries for a configurable retention 
period.  The leading edge of the time period kept is determined by the newest 
timestamp of an inserted KV.  The trailing edge is this leading edge minus the 
requested retention period.

If logging is enabled, changes to the store are written to a change log topic 
that is configured with a retention.ms value equal to the store retention 
period.  The leading edge of the time period kept by the log is the current 
time.  The trailing edge is the leading edge minus the requested retention 
period.

The difference in how the leading edge is determined can result in unexpected 
behavior.

If the stream application is processing data older than the retention period 
and storing it in a windowed store, the store will have data for the retention 
period looking back from the newest timestamp of the processed message.  But 
the messages written to the state changelog will almost immediately be deleted 
by the broker, as they will fall outside of the retention window as it computes 
it.  

If the application is stopped and restarted in this state, and if the local 
state has been lost for some reason, the application won't be able to recover 
the state from the broker, as the broker has deleted it.
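
To make the mismatch concrete, with purely illustrative numbers:

{code}
// store/changelog retention      = 7 days
// reprocessed record timestamp T = now - 30 days
//
// window store keeps    [T - 7d, T]       (leading edge = newest inserted timestamp)
//                       -> the record is retained locally
// changelog topic keeps [now - 7d, now]   (leading edge = current time)
//                       -> T < now - 7d, so the broker deletes it almost immediately
{code}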


In addition, I've noticed that there is a discrepancy in what timestamp is used 
between the store and the change log.  The store will use the timestamp passed 
as an argument to {{put}}, or if no timestamp is passed, fall back to 
{{context.timestamp}}.  But {{StoreChangeLogger.logChange}} does not take a 
timestamp.  Instead it always uses {{context.timestamp}} to write the change to 
the broker.  Thus it is possible for the state store and the change log to use 
different timestamps for the same KV.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4639) Kafka Streams metrics are undocumented

2017-01-16 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4639:
-

 Summary: Kafka Streams metrics are undocumented
 Key: KAFKA-4639
 URL: https://issues.apache.org/jira/browse/KAFKA-4639
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Elias Levy
Priority: Minor


The documentation is silent on the metrics collected by the Streams framework.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4608) RocksDBWindowStore.fetch() is inefficient for large ranges

2017-01-08 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4608:
-

 Summary: RocksDBWindowStore.fetch() is inefficient for large ranges
 Key: KAFKA-4608
 URL: https://issues.apache.org/jira/browse/KAFKA-4608
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Elias Levy


It is not unreasonable for a user to call {{RocksDBWindowStore.fetch}} to scan 
for a key across a large time range.  For instance, someone may call it with a 
{{timeFrom}} of zero or a {{timeTo}} of max long in an attempt to fetch keys 
matching across all time forwards or backwards.  

But if you do so, {{fetch}} will peg the CPU, as it attempts to iterate over 
every single segment id in the range. That is obviously very inefficient.  

{{fetch}} should trim the {{timeFrom}}/{{timeTo}} range based on the available 
time range in the {{segments}} hash map, so that it only iterates over the 
available time range.
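
A sketch of the proposed trimming (the segment math is simplified and the 
parameter names are made up; this is not the actual {{RocksDBWindowStore}} 
internals):

{code}
// Clamp the requested range to the range actually covered by live segments
// before iterating, so a timeFrom of 0 or a timeTo of Long.MAX_VALUE does not
// force a walk over every possible segment id.
static java.util.List<Long> segmentIdsToScan(final long timeFrom, final long timeTo,
                                             final long oldestSegmentStart,
                                             final long newestSegmentStart,
                                             final long segmentIntervalMs) {
    final long from = Math.max(timeFrom, oldestSegmentStart);
    final long to = Math.min(timeTo, newestSegmentStart);
    final java.util.List<Long> ids = new java.util.ArrayList<>();
    for (long segId = from / segmentIntervalMs; segId <= to / segmentIntervalMs; segId++) {
        ids.add(segId);   // only segments that can actually exist are visited
    }
    return ids;
}
{code}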



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2016-10-07 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556371#comment-15556371
 ] 

Elias Levy commented on KAFKA-4217:
---

That would work as well.

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2016-10-07 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1654#comment-1654
 ] 

Elias Levy commented on KAFKA-4217:
---

It would seem to be the same request: allow a transform that emits multiple 
values.

Like Fodor, I am using {{ProcessorContext.forward}} to emit multiple values 
from {{Transformer.transform}}. Unlike him, I return {{null}} from the method 
instead of returning dummy values that must be filtered.

At the very least, it should be documented that one can use 
{{ProcessorContext.forward}} to emit multiple values from 
{{Transformer.transform}}.  Ideally, {{Transformer.transform}} would be 
modified to allow returning multiple values, or a variant of {{Transformer}} 
would allow you to do so.
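
A minimal sketch of that pattern (0.10.x-era interfaces; the comma-splitting is 
just an illustrative stand-in for real logic):

{code}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Emits zero or more records per input by calling forward() and returning null.
public class SplitTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        for (final String part : value.split(",")) {
            context.forward(key, part);   // emit one record per element
        }
        return null;                      // nothing is emitted via the return value
    }

    @Override
    public KeyValue<String, String> punctuate(final long timestamp) {
        return null;
    }

    @Override
    public void close() { }
}
{code}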

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-28 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531353#comment-15531353
 ] 

Elias Levy commented on KAFKA-4212:
---

I am using the {{KStream.transform}} API, the {{Transformer}} interface, and 
making use of state stores that I create.  So very close to using the low level 
{{Processor}} API.  But even so, the logic is essentially the same as the high 
level join, using a sliding window, except that one of the streams has a 
composite key.

I don't see what hopping windows have to do with the use case.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-28 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531116#comment-15531116
 ] 

Elias Levy commented on KAFKA-4212:
---

But joins are not performed on hopping windows; they are performed on a single 
sliding window.  


> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-28 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531022#comment-15531022
 ] 

Elias Levy commented on KAFKA-4212:
---

Not sure I follow.  

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-28 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530995#comment-15530995
 ] 

Elias Levy commented on KAFKA-4212:
---

I would describe it as a TTL, not an LRU.  We want the records to expire based on 
the record timestamp, not when the record was last accessed.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-27 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527841#comment-15527841
 ] 

Elias Levy commented on KAFKA-4212:
---

Looks like using {{WindowStore}} is not quite so simple, as you can't actually 
iterate it in reverse order.  It can still be used, but you must iterate 
forward until the last status update and last client status query for each 
client.  All the more reason for a TTL-enabled {{KeyValueStore}}.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2016-09-27 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527301#comment-15527301
 ] 

Elias Levy commented on KAFKA-4144:
---

Indeed, that is how it is handled today.  But it's not a very clean way to do 
so.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-26 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524513#comment-15524513
 ] 

Elias Levy commented on KAFKA-4212:
---

More generally, the use case is: I've told a bunch of folks about some property 
of some type of object, and I would like to notify those folks every time that 
property changes for the specific objects they have asked about during some 
configurable time period since the last time they've asked me about the object.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-26 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524402#comment-15524402
 ] 

Elias Levy commented on KAFKA-4212:
---

The general use case is the joining of updates to two tables over a limited 
period of time.

Consider a hypothetical monitoring service that allows clients to query 
the status of nodes.  The application may wish to inform the clients whenever 
the status of a node that they have queried changes, but only if the client has 
queried the status during the past 24 hours and if the last status for a node 
is different from the last status the client received.

To do so the service can consume a stream of client node status queries with 
their responses and node status updates.  From the stream of client node status 
queries the service would maintain a cache of the last status for a node sent 
to a client such that entries expire after 24 hours.  From the node status 
updates the service would maintain a mapping of node to latest status.

When a client query is received, the service can check on the node status 
mapping to see if there is a newer status, and if there is, generate a 
notification.  When a node status update is received, the service can check the 
last status sent to clients in the cache and generate a notification with the 
new status to all clients that previously queried for a node's status.

As an optimization the mapping of nodes to latest status can also be a cache 
with a TTL, since you don't need to keep the statuses of nodes that haven't 
changed in more than 24 hours, as you'll never receive a delayed node status 
query to match it against.

Abstractly this is equivalent to a {{KTable}}-{{KTable}} inner join where 
entries in each {{KTable}} expire after some TTL, and where one table has a 
composite primary key (node id and client id on one {{KTable}} vs just node id 
on the other).

It could also be thought of as a windowed {{KTable}}-{{KTable}} join (although in 
such case records that fall outside the window would never be used and are just 
wasting space), or a windowed {{KStream}}-{{KStream}} join of table updates 
where only the latest updated values are used (i.e. discard updates in the 
window if there is a newer update).  Although, again, these would be joins 
where the primary keys are not identical as one is a composite.

Alas, Kafka Streams does not support windowed {{KTable}}-{{KTable}} joins, 
TTL'ed {{KeyValueStore}} s, or joins across {{KTable}} s and/or {{KStream}} s 
with different keys.

That said, the above service can be implemented by joining the client status 
query and client status updates streams using custom processors and by abusing 
{{WindowStore}}.  {{WindowStore}} can be used as a form of TTL'ed 
{{KeyValueStore}}, as it will drop old values that fall out of its window, and 
by iterating in reverse order and only using the latest value. And since it 
allows you to store multiple values for the same key (node id), you can record 
the node status you handed out to clients (node id key; client id, status, and 
timestamp as value) and then iterate over all of them for a given node id 
keeping only the latest one for each client id when a node status update comes 
in and you perform the join.
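
A rough sketch of that lookup from inside a custom processor ({{ClientStatus}} 
and the variable names are hypothetical; {{fetch}} as in the 0.10.x 
{{WindowStore}} API):

{code}
// windowStore: WindowStore<String, ClientStatus>, keyed by node id, where
// ClientStatus is a made-up value type holding (clientId, status, timestamp).
// On a node status update, replay the last 24h for this node and keep only
// the newest entry per client id, i.e. the last status each client was given.
final long ttlMs = 24L * 60 * 60 * 1000;
final long now = context.timestamp();
final java.util.Map<String, ClientStatus> latestPerClient = new java.util.HashMap<>();
final WindowStoreIterator<ClientStatus> iter = windowStore.fetch(nodeId, now - ttlMs, now);
while (iter.hasNext()) {                           // iteration runs oldest to newest
    final ClientStatus entry = iter.next().value;
    latestPerClient.put(entry.clientId, entry);    // later entries overwrite earlier ones
}
iter.close();
// latestPerClient now drives the "has the status changed?" notifications.
{code}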



> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-25 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521287#comment-15521287
 ] 

Elias Levy edited comment on KAFKA-4212 at 9/25/16 7:17 PM:


It should be noted that a variable-capacity memory-overflowing TTL caching 
store is semantically equivalent to a KTable that expires entries via a TTL.  
Such a KTable may be a viable alternative or at least a useful additional 
abstraction.


was (Author: elevy):
I should be noted that a variable-capacity memory-overflowing TTL caching store 
is semantically equivalent to a KTable that expires entries via a TTL.  Such a 
KTable may be a viable alternative or at least a useful additional abstraction.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-25 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521287#comment-15521287
 ] 

Elias Levy commented on KAFKA-4212:
---

It should be noted that a variable-capacity memory-overflowing TTL caching store 
is semantically equivalent to a KTable that expires entries via a TTL.  Such a 
KTable may be a viable alternative or at least a useful additional abstraction.

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4219) Permit setting of event time in stream processor

2016-09-25 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4219:
-

 Summary: Permit setting of event time in stream processor
 Key: KAFKA-4219
 URL: https://issues.apache.org/jira/browse/KAFKA-4219
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


Event time is assigned in stream sources via {{TimestampExtractor}}.  Once the 
event time has been assigned, it remains the same, regardless of any downstream 
processing in the topology.  This is insufficient for many processing jobs, 
particularly when the output of the job is written back into a Kafka topic, 
where the record's time is encoded outside of the record's value.

For instance:

* When performing windowed aggregations it may be desirable for the timestamp 
of the emitted record to be the lower or upper limit of the time window, rather 
than the timestamp of the last processed element, which may be anywhere within 
the time window (see the illustration after this list).

* When joining two streams, it is non-deterministic which of the two records' 
timestamps will be the timestamp of the emitted record.  It could be either one 
depending on the order in which the records are processed.  Even if this were 
deterministic, it may be desirable for the emitted timestamp to be altogether 
different from the timestamps of the joined records.  For instance, setting the 
timestamp to the current processing time may be desirable.

* In general, lower level processors may wish to set the timestamp of emitted 
records to an arbitrary value.
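
To illustrate the first point with concrete (made-up) numbers:

{code}
// A one-minute tumbling window [12:00:00, 12:01:00) receives records stamped
// 12:00:07, 12:00:42 and 12:00:13 (out of order).  The aggregate emitted
// downstream carries the timestamp of the last record processed (12:00:13),
// not the window start or end, and a processor cannot currently override it.
{code}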
 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected

2016-09-25 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521271#comment-15521271
 ] 

Elias Levy commented on KAFKA-4120:
---

{{o.a.k.common.utils.Bytes}} is not a documented class.
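
For reference, the reference-equality behaviour described in the issue below 
can be reproduced with plain JDK collections, no Kafka classes involved:

{code}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeyDemo {
    public static void main(final String[] args) {
        final Map<byte[], String> cache = new HashMap<>();
        final byte[] k1 = {1, 2, 3};
        final byte[] k2 = {1, 2, 3};                   // same contents, different reference
        cache.put(k1, "v1");
        System.out.println(cache.get(k2));             // null: byte[] uses identity equals/hashCode
        System.out.println(Arrays.equals(k1, k2));     // true: the contents are equal
    }
}
{code}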

> byte[] keys in RocksDB state stores do not work as expected
> ---
>
> Key: KAFKA-4120
> URL: https://issues.apache.org/jira/browse/KAFKA-4120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> We ran into an issue using a byte[] key in a RocksDB state store (with the 
> byte array serde.) Internally, the RocksDB store keeps a LRUCache that is 
> backed by a LinkedHashMap that sits between the callers and the actual db. 
> The problem is that while the underlying rocks db will persist byte arrays 
> with equal data as equivalent keys, the LinkedHashMap uses byte[] reference 
> equality from Object.equals/hashcode. So, this can result in multiple entries 
> in the cache for two different byte arrays that have the same contents and 
> are backed by the same key in the db, resulting in unexpected behavior. 
> One such behavior that manifests from this is if you store a value in the 
> state store with a specific key, if you re-read that key with the same byte 
> array you will get the new value, but if you re-read that key with a 
> different byte array with the same bytes, you will get a stale value until 
> the db is flushed. (This made it particularly tricky to track down what was 
> happening :))
> The workaround for us is to convert the keys from raw byte arrays to a 
> deserialized avro structure that provides proper hashcode/equals semantics 
> for the intermediate cache. In general this seems like good practice, so one 
> of the proposed solutions is to simply emit a warning or exception if a key 
> type with breaking semantics like this is provided.
> A few proposed solutions:
> - When the state store is defined on array keys, ensure that the cache map 
> does proper comparisons on array values not array references. This would fix 
> this problem, but seems a bit strange to special case. However, I have a hard 
> time of thinking of other examples where this behavior would burn users.
> - Change the LRU cache to deserialize and serialize all keys to bytes and use 
> a value based comparison for the map. This would be the most correct, as it 
> would ensure that both the rocks db and the cache have identical key spaces 
> and equality/hashing semantics. However, this is probably slow, and since the 
> general case of using avro record types as keys works fine, it will largely 
> be unnecessary overhead.
> - Don't change anything about the behavior, but trigger a warning in the log 
> or fail to start if a state store is defined on array keys (or possibly any 
> key type that fails to properly override Object.equals/hashcode.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4218) Enable access to key in {{ValueTransformer}}

2016-09-24 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4218:
-

 Summary: Enable access to key in {{ValueTransformer}}
 Key: KAFKA-4218
 URL: https://issues.apache.org/jira/browse/KAFKA-4218
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


While transforming values via {{KStream.transformValues}} and 
{{ValueTransformer}}, the key associated with the value may be needed, even if 
it is not changed.  For instance, it may be used to access stores.  

As of now, the key is not available within these methods and interfaces, 
leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
unnecessary creation of new {{KeyValue}} objects.
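
A minimal sketch of that workaround (illustrative types; {{store}} is assumed 
to be an already-initialised {{KeyValueStore}} of longs):

{code}
// Inside a Transformer<String, Long, KeyValue<String, Long>>: the key is only
// needed for a store lookup, yet transform() must still re-wrap it.
@Override
public KeyValue<String, Long> transform(final String key, final Long value) {
    final Long offset = store.get(key);               // key used for the lookup only
    final long adjusted = value + (offset == null ? 0L : offset);
    return KeyValue.pair(key, adjusted);              // unnecessary KeyValue allocation
}
{code}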





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4217) KStream.transform equivalent of flatMap

2016-09-24 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4217:
-

 Summary: KStream.transform equivalent of flatMap
 Key: KAFKA-4217
 URL: https://issues.apache.org/jira/browse/KAFKA-4217
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


{{KStream.transform}} gives you access to state stores while allowing you to 
return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
you should use if you want to access state stores and return zero or multiple 
{{KeyValue}}s.  Presumably you can use {{transform}}, always return {{null}}, 
and use {{ProcessorContext.forward}} to emit {{KeyValue}}s.

It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2016-09-23 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4212:
-

 Summary: Add a key-value store that is a TTL persistent cache
 Key: KAFKA-4212
 URL: https://issues.apache.org/jira/browse/KAFKA-4212
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


Some jobs need to maintain as state a large set of key-values for some period 
of time.  I.e. they need to maintain a TTL cache of values potentially larger 
than memory. 

Currently Kafka Streams provides non-windowed and windowed key-value stores.  
Neither is an exact fit to this use case.  

The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
required, but does not support expiration.  The TTL option of RocksDB is 
explicitly not used.

The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment 
dropping, but it stores multiple items per key, based on their timestamp.  But 
this store can be repurposed as a cache by fetching the items in reverse 
chronological order and returning the first item found.

KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here we 
desire a variable-capacity memory-overflowing TTL caching store.

Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be 
useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-14 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15491553#comment-15491553
 ] 

Elias Levy commented on KAFKA-4153:
---

I've updated the PR to reverse the before & after semantics as you've pointed 
out.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485826#comment-15485826
 ] 

Elias Levy commented on KAFKA-4153:
---

Why would the other value be undefined? Default it to zero.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485700#comment-15485700
 ] 

Elias Levy commented on KAFKA-4153:
---

You could do that, but it would be non-obvious to someone reading the code what 
the semantics of that window should be.

I would prefer seeing {{before}} and {{after}} have static alternatives, so you 
could write {{JoinWindows.before(100).after(50)}} or similar.

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485561#comment-15485561
 ] 

Elias Levy commented on KAFKA-4153:
---

As a side note, the API for {{JoinWindows}} has become mangled since 0.10.0.1.  
Now if you want to instantiate an asymmetric {{JoinWindows}} you must first call 
{{JoinWindows.of(0)}} before you can call {{before}} or {{after}} to set the 
actual window boundaries.  The class should provide alternative static methods 
for both {{before}} and {{after}} to instantiate the class without having to 
first call {{of}}.
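
For comparison, what this looks like today versus the suggested static entry 
points (the second form is hypothetical, not an existing API):

{code}
// Today: a dummy of(0) call is needed before setting the real bounds.
JoinWindows current = JoinWindows.of(0).before(100).after(50);

// Suggested: static before()/after() factories (hypothetical).
// JoinWindows proposed = JoinWindows.before(100).after(50);
{code}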

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated KAFKA-4153:
--
Flags: Patch

> Incorrect KStream-KStream join behavior with asymmetric time window
> ---
>
> Key: KAFKA-4153
> URL: https://issues.apache.org/jira/browse/KAFKA-4153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> Using Kafka 0.10.0.1, if joining records in two streams separated by some 
> time, but only when records from one stream are newer than records from the 
> other, i.e. doing:
> {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}
> One would expect that the following would be equivalent:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}
> Alas, this is not the case.  Instead, this generates the same output as 
> the first example:
> {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}
> The problem is that the 
> [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
>  implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
> {{after}} values when it creates the {{KStreamKStreamJoin}} for the other 
> stream, even though it calls {{reverseJoiner}} to reverse the joiner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window

2016-09-12 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4153:
-

 Summary: Incorrect KStream-KStream join behavior with asymmetric 
time window
 Key: KAFKA-4153
 URL: https://issues.apache.org/jira/browse/KAFKA-4153
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


Using Kafka 0.10.0.1, if joining records in two streams separated by some time, 
but only when records from one stream are newer than records from the other, 
i.e. doing:

{{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}}

One would expect that the following would be equivalent:

{{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}}

Alas, this is not the case.  Instead, this generates the same output as 
the first example:

{{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}}

The problem is that the 
[{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697]
 implementation in {{KStreamImpl}} fails to reverse the {{before}} and 
{{after}} values when it creates the {{KStreamKStreamJoin}} for the other stream, 
even though it calls {{reverseJoiner}} to reverse the joiner.
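
To spell out the expected equivalence (illustrative timestamps):

{code}
// With a record in stream1 at time t1 and a record in stream2 at time t2:
//   stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(A))  matches when 0 <= t2 - t1 <= A
//   stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(A)) should match the same pairs,
//                                                                     i.e. 0 <= t2 - t1 <= A as well,
// which is exactly what the un-reversed before/after values in DefaultJoin break.
{code}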



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-4146) Kafka Stream ignores Serde exceptions leading to silently broken apps

2016-09-09 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy resolved KAFKA-4146.
---
Resolution: Invalid

> Kafka Stream ignores Serde exceptions leading to silently broken apps
> -
>
> Key: KAFKA-4146
> URL: https://issues.apache.org/jira/browse/KAFKA-4146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> It appears that Kafka Streams silently ignores Serde exceptions, leading to 
> apps that are silently broken.
> E.g. if you make use of {{KStream.through("topic")}} and the default Serde is 
> inappropriate for the type, the app will silently drop the data on the floor 
> without even the courtesy of printing a single error message.
> At the very least an initial error message should be generated, with the 
> option to generate messages for each such failure or a sampling of such 
> failures.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4146) Kafka Stream ignores Serde exceptions leading to silently broken apps

2016-09-09 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4146:
-

 Summary: Kafka Stream ignores Serde exceptions leading to silently 
broken apps
 Key: KAFKA-4146
 URL: https://issues.apache.org/jira/browse/KAFKA-4146
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


It appears that Kafka Streams silently ignores Serde exceptions, leading to apps 
that are silently broken.

E.g. if you make use of {{KStream.through("topic")}} and the default Serde is 
inappropriate for the type, the app will silently drop the data on the floor 
without even the courtesy of printing a single error message.

At the very least an initial error message should be generated, with the option 
to generate messages for each such failure or a sampling of such failures. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4144) Allow per stream/table timestamp extractor

2016-09-08 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4144:
-

 Summary: Allow per stream/table timestamp extractor
 Key: KAFKA-4144
 URL: https://issues.apache.org/jira/browse/KAFKA-4144
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.10.0.1
Reporter: Elias Levy
Assignee: Guozhang Wang


At the moment the timestamp extractor is configured via a StreamConfig value to 
KafkaStreams.  That means you can only have a single timestamp extractor per 
app, even though you may be joining multiple streams/tables that require 
different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
KStreamBuilder.stream/table, just like you can specify key and value serdes 
that override the StreamConfig defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2016-07-06 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-3932:
-

 Summary: Consumer fails to consume in a round robin fashion
 Key: KAFKA-3932
 URL: https://issues.apache.org/jira/browse/KAFKA-3932
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.0
Reporter: Elias Levy


The Java consumer fails to consume messages in a round robin fashion.  This can 
lead to unbalanced consumption.

In our use case we have a set of consumers that can take a significant amount of 
time consuming messages off a topic.  For this reason, we are using the 
pause/poll/resume pattern to ensure the consumer session does not time out.  The 
topic that is being consumed has been preloaded with messages.  That means there 
is a significant message lag when the consumer is first started.  To limit how 
many messages are consumed at a time, the consumer has been configured with 
max.poll.records=1.

The first observation is that the client receives a large batch of 
messages for the first partition it decides to consume from and will consume 
all those messages before moving on, rather than returning a message from a 
different partition for each call to poll.

We solved this issue by configuring max.partition.fetch.bytes to be small 
enough that only a single message will be returned by the broker on each fetch, 
although this would not be feasible if message size were highly variable.

The behavior of the consumer after this change is to largely consume from a 
small number of partitions, usually just two, iterating between them, until it 
exhausts them, before moving to another partition.   This behavior is 
problematic if the messages have some rough time semantics and need to be 
processed roughly in time order across all partitions.

It would be useful if the consumer has a pluggable API that allowed custom 
logic to select which partition to consume from next, thus enabling the 
creation of a round robin partition consumer.
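
A rough sketch of the pause/poll/resume pattern described above 
({{Collection}}-based 0.10 consumer API; imports elided, and {{executor}}, 
{{process}} and the topic name are assumptions for illustration):

{code}
consumer.subscribe(Collections.singletonList("work-topic"));
while (running) {
    final ConsumerRecords<String, String> records = consumer.poll(100);
    if (!records.isEmpty()) {
        consumer.pause(consumer.assignment());          // stop fetching while we work
        final Future<?> work = executor.submit(() -> process(records));
        while (!work.isDone()) {
            // poll() returns no records while paused but keeps the group session alive
            consumer.poll(100);
        }
        consumer.resume(consumer.assignment());         // start fetching again
    }
}
{code}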










--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2359) New consumer - partitions auto assigned only on poll

2016-06-23 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346678#comment-15346678
 ] 

Elias Levy commented on KAFKA-2359:
---

I wonder if calling pause() before calling poll() and seekToBeginning() or 
seekToEnd() will solve your problem.  The pause should stop the fetching of 
records, while the poll() triggers the interaction with the coordinator to 
perform the partition assignment before calling seekToBeginning and seekToEnd.

> New consumer - partitions auto assigned only on poll
> 
>
> Key: KAFKA-2359
> URL: https://issues.apache.org/jira/browse/KAFKA-2359
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Stevo Slavic
>Priority: Minor
>
> In the new consumer I encountered unexpected behavior. After constructing 
> {{KafkaConsumer}} instance with configured consumer rebalance callback 
> handler, and subscribing to a topic with "consumer.subscribe(topic)", 
> retrieving subscriptions would return empty set and callback handler would 
> not get called (no partitions ever assigned or revoked), no matter how long 
> instance was up.
> Then I found by inspecting {{KafkaConsumer}} code that partition assignment 
> will only be triggered on first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
> coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including same {{ensurePartitionAssignment}} 
> fragment in {{KafkaConsumer.subscriptions}} accessor as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2016-02-04 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133716#comment-15133716
 ] 

Elias Levy edited comment on KAFKA-2729 at 2/5/16 5:55 AM:
---

Had the same issue happen here while testing a 5 node Kafka cluster with a 3 
node ZK ensemble on Kubernetes on AWS.  After running for a while broker 2 
started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip 
updating ISR" error message for al the partitions it leads.  For those 
partition it is the only in sync replica.  That has led to the Samza jobs I was 
running to stop.

I should note that I am running 0.9.0.0.


was (Author: elevy):
Had the same issue happen here while testing a 5 node Kafka cluster with a 3 
node ZK ensemble on Kubernetes on AWS.  After running for a while broker 2 
started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip 
updating ISR" error message for al the partitions it leads.  For those 
partition it is the only in sync replica.  That has led to the Samza jobs I was 
running to stop.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2016-02-04 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133716#comment-15133716
 ] 

Elias Levy commented on KAFKA-2729:
---

Had the same issue happen here while testing a 5 node Kafka cluster with a 3 
node ZK ensemble on Kubernetes on AWS.  After running for a while broker 2 
started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip 
updating ISR" error message for al the partitions it leads.  For those 
partition it is the only in sync replica.  That has led to the Samza jobs I was 
running to stop.

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)