[jira] [Issue Comment Deleted] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception
[ https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy updated KAFKA-4392:
------------------------------
    Comment: was deleted

(was: I am still seeing this error in 0.10.2.0 during rebalances. Reopen or create a new issue?

WARN 2017-03-22 19:06:14,423 [StreamThread-20][StreamThread.java:1184] : Could not create task 3_346. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_346] Failed to lock the state directory: /data/kafka_streams/some_job/3_346
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
)

> Failed to lock the state directory due to an unexpected exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-4392
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4392
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Ara Ebrahimi
>            Assignee: Guozhang Wang
>             Fix For: 0.10.2.0
>
> This happened on streaming startup, on a clean installation, no existing
> folder. Here I was starting 4 instances of our streaming app on 4 machines
> and one threw this exception. Seems to me there’s a race condition somewhere
> when instances discover others, or something like that.
>
> 2016-11-02 15:43:47 INFO StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state directory due to an unexpected exception
> java.nio.file.NoSuchFileException: /data/1/kafka-streams/myapp-streams/7_21/.lock
> 	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> 	at sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
> 	at java.nio.channels.FileChannel.open(FileChannel.java:287)
> 	at java.nio.channels.FileChannel.open(FileChannel.java:335)
> 	at org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
> 	at org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
> 	at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 Nov  2
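The NoSuchFileException in the report is thrown from `FileChannel.open` on the task directory's `.lock` file while `StateDirectory.cleanRemovedTasks` runs, and the subsequent `ls` shows the directory is already gone. A minimal, self-contained sketch of that failure mode using only `java.nio` (the `lockTaskDir` helper and the paths are illustrative, not Kafka's actual code):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockRaceSketch {
    // Roughly mirrors the pattern in the stack trace: open a FileChannel on
    // <taskDir>/.lock and tryLock it. If another thread or instance deletes
    // the task directory first, FileChannel.open throws NoSuchFileException
    // because the parent directory of the .lock file no longer exists.
    static FileLock lockTaskDir(Path taskDir) throws IOException {
        Path lockFile = taskDir.resolve(".lock");
        FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        return channel.tryLock(); // null when another process holds the lock
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("kafka-streams-sketch");
        Path taskDir = Files.createDirectories(stateDir.resolve("7_21"));

        FileLock lock = lockTaskDir(taskDir); // succeeds while the dir exists
        System.out.println("locked: " + (lock != null));
        lock.release();
        lock.channel().close();

        // Simulate a concurrent cleanup removing the task directory:
        Files.delete(taskDir.resolve(".lock"));
        Files.delete(taskDir);
        try {
            lockTaskDir(taskDir);
        } catch (NoSuchFileException e) {
            System.out.println("NoSuchFileException: " + e.getFile());
        }
    }
}
```

This only demonstrates the filesystem race itself, not Kafka's locking logic; the fix in the real code path has to tolerate the directory disappearing between the existence check and the open.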
[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception
[ https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937388#comment-15937388 ]

Elias Levy commented on KAFKA-4392:
-----------------------------------

I am still seeing this error in 0.10.2.0 during rebalances. Reopen or create a new issue?

WARN 2017-03-22 19:06:14,423 [StreamThread-20][StreamThread.java:1184] : Could not create task 3_346. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_346] Failed to lock the state directory: /data/kafka_streams/some_job/3_346
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

> Failed to lock the state directory due to an unexpected exception
> -----------------------------------------------------------------
>
>                 Key: KAFKA-4392
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4392
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Ara Ebrahimi
>            Assignee: Guozhang Wang
>             Fix For: 0.10.2.0
[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception
[ https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15937389#comment-15937389 ]

Elias Levy commented on KAFKA-4392:
-----------------------------------

I am still seeing this error in 0.10.2.0 during rebalances. Reopen or create a new issue?
[jira] [Updated] (KAFKA-4919) Document that stores must not be closed when Processors are closed
[ https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy updated KAFKA-4919:
------------------------------
    Summary: Document that stores must not be closed when Processors are closed  (was: Streams job fails with InvalidStateStoreException: Store is currently closed)

> Document that stores must not be closed when Processors are closed
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4919
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Elias Levy
>
> I have a streams job, that previously worked, that consumes and writes to a
> large number of topics with many partitions and that uses many threads. I
> upgraded the job to 0.10.2.0. The job now fails after a short time running,
> seemingly after a rebalance.
> {quote}
> WARN 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : Unexpected state transition from RUNNING to NOT_RUNNING
> {quote}
> The first observation is that Streams is no longer outputting exceptions and
> backtraces. I had to add code to get this information.
> The exception:
> {quote}
> Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, partition=225, offset=266411
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, partition=225, offset=266411
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store someStore-201701060400 is currently closed
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205)
> 	at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221)
> 	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74)
> 	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
> 	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
> 	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
> 	... X more
> {quote}
> The error occurs for many partitions.
> This was preceded by (for this partition):
> {quote}
> INFO 2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] : Revoking previously assigned partitions [some_topic-225] for group some_job
> INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the beginning of consumer rebalance.
> INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : stream-thread [StreamThread-10] Closing a task's topology 1_225
> INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : stream-thread [StreamThread-10] Committing consumer offsets of task 1_225
> INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : stream-thread [StreamThread-10] Updating suspended tasks to contain active tasks [[1_225, 0_445, 0_30]]
> INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 0_30]]
> INFO 2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] : Setting newly assigned partitions [some_topic-225] for group some_job
> INFO 2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at the end of consumer rebalance.
> INFO 2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task [1_225] Initializing processor nodes of the topology
> Something happens. What ???
> INFO 2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : stream-thread [StreamThread-10] Closing a task 1_225
> INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : stream-thread [StreamThread-10] Flushing state stores of task 1_225
> INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : stream-thread
[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed
[ https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936714#comment-15936714 ]

Elias Levy commented on KAFKA-4919:
-----------------------------------

Thanks for the information. It is something that should probably be mentioned in the release notes.

> Streams job fails with InvalidStateStoreException: Store is currently closed
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4919
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Elias Levy
[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed
[ https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935711#comment-15935711 ]

Elias Levy commented on KAFKA-4919:
-----------------------------------

I believe I found the issue. I am closing the store when my {{Processor}} is closed. The {{Processor}} is closed during rebalance via {{onPartitionsRevoked}} -> {{suspendTasksAndState}} -> {{closeAllTasksTopologies}} -> {{performOnAllTasks}} -> {{closeTopology}} -> {{ProcessorNode.close}}.

It was my understanding that it was proper to close state stores used by a {{Processor}} when the {{Processor}} is closed. The examples in the low-level API documentation follow this pattern (see [https://kafka.apache.org/0102/documentation/streams#streams_processor] and [https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java]). Has this changed in 0.10.2.0? Or is this a new issue only affecting the refactored segmented stores?

> Streams job fails with InvalidStateStoreException: Store is currently closed
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4919
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Elias Levy
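The retitled issue boils down to ownership: stores registered with the Streams runtime are closed by the runtime itself, so user code that also closes them in {{Processor.close()}} leaves the store unusable when the task is resumed after a rebalance. The sketch below illustrates the mechanism without the Kafka Streams dependency; the {{Store}} class is a hypothetical stand-in that only mimics the open/closed flag checked by {{RocksDBStore.validateStoreOpen}}, and {{IllegalStateException}} stands in for {{InvalidStateStoreException}}:

```java
public class StoreLifecycleSketch {
    // Hypothetical stand-in for a library-owned state store (NOT the real
    // Kafka Streams API): tracks only whether it is open.
    static class Store {
        private boolean open = true;

        void put(String key, String value) {
            if (!open) {
                // Analogue of InvalidStateStoreException from the report.
                throw new IllegalStateException("Store is currently closed");
            }
        }

        void close()     { open = false; }
        boolean isOpen() { return open; }
    }

    // Anti-pattern from the comment above: user code closes the
    // library-owned store when the Processor is closed.
    static void processorCloseWrong(Store store) {
        store.close();
    }

    // Pattern implied by KAFKA-4919: release only processor-owned
    // resources; the runtime closes its stores itself.
    static void processorCloseRight(Store store) {
        // intentionally nothing: the library owns the store's lifecycle
    }

    public static void main(String[] args) {
        Store store = new Store();
        processorCloseWrong(store);   // rebalance: topology is closed
        // rebalance completes; the runtime hands back the same store...
        try {
            store.put("k", "v");      // ...and every put now fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints: Store is currently closed
        }
    }
}
```

With {{processorCloseRight}} the store survives the simulated rebalance and {{put}} keeps working, which matches the eventual resolution: document that Processors must not close runtime-managed stores.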
[jira] [Commented] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed
[ https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935252#comment-15935252 ]

Elias Levy commented on KAFKA-4919:
-----------------------------------

The issue appears to be that a {{RocksDBStore}} segment of a {{RocksDBSegmentedBytesStore}} is closed during a call to {{put}} when the store is reused after a consumer rebalance.

I've reviewed the code, but I can't see how the segment could be closed after the rebalance. If it is a newly created segment, it will be open. If it is closed, then it should have been destroyed and removed from the segments list in {{Segments.cleanup}}. Thoughts?

> Streams job fails with InvalidStateStoreException: Store is currently closed
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-4919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4919
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Elias Levy
[jira] [Updated] (KAFKA-4919) Streams job fails with InvalidStateStoreException: Store is currently closed
[ https://issues.apache.org/jira/browse/KAFKA-4919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated KAFKA-4919: -- Summary: Streams job fails with InvalidStateStoreException: Store is currently closed (was: Streams job fails with StreamsExceptio) > Streams job fails with InvalidStateStoreException: Store is currently closed > > > Key: KAFKA-4919 > URL: https://issues.apache.org/jira/browse/KAFKA-4919 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Elias Levy > > I have a streams job that previously worked, which consumes from and writes to a > large number of topics with many partitions and uses many threads. I > upgraded the job to 0.10.2.0. The job now fails after running for a short time, > seemingly after a rebalance. > {quote} > WARN 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : > Unexpected state transition from RUNNING to NOT_RUNNING > {quote} > The first observation is that Streams is no longer outputting exceptions and > backtraces. I had to add code to get this information. > The exception: > {quote} > Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught > in process. taskId=1_225, processor=KSTREAM-SOURCE-03, > topic=some_topic, partition=225, offset=266411 > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. 
taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, > partition=225, offset=266411 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store > someStore-201701060400 is currently closed > at > org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205) > at > org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221) > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) > at > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) > ... X more > {quote} > The error occurs for many partitions. > This was preceded by (for this partition): > {quote} > INFO 2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] > : Revoking previously assigned partitions [some_topic-225] for group some_job > INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : > stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the > beginning of consumer rebalance. 
> INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : > stream-thread [StreamThread-10] Closing a task's topology 1_225 > INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : > stream-thread [StreamThread-10] Flushing state stores of task 1_225 > INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : > stream-thread [StreamThread-10] Committing consumer offsets of task 1_225 > INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : > stream-thread [StreamThread-10] Updating suspended tasks to contain active > tasks [[1_225, 0_445, 0_30]] > INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : > stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, > 0_30]] > INFO 2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] > : Setting newly assigned partitions [some_tpoic-225] for group some_job > INFO 2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : > stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at > the end of consumer rebalance. > INFO 2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task > [1_225] Initializing processor nodes of the topology > Something happens. What ??? > INFO 2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : > stream-thread [StreamThread-10] Closing a task 1_225 > INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : > stream-thread [StreamThread-10] Flushing state stores of task 1_225 > INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : > stream-thread [StreamThread-10]
[jira] [Created] (KAFKA-4919) Streams job fails with StreamsExceptio
Elias Levy created KAFKA-4919: - Summary: Streams job fails with StreamsExceptio Key: KAFKA-4919 URL: https://issues.apache.org/jira/browse/KAFKA-4919 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.2.0 Reporter: Elias Levy I have a streams job that previously worked, which consumes from and writes to a large number of topics with many partitions and uses many threads. I upgraded the job to 0.10.2.0. The job now fails after running for a short time, seemingly after a rebalance. {quote} WARN 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:160] : Unexpected state transition from RUNNING to NOT_RUNNING {quote} The first observation is that Streams is no longer outputting exceptions and backtraces. I had to add code to get this information. The exception: {quote} Exception: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, partition=225, offset=266411 org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_225, processor=KSTREAM-SOURCE-03, topic=some_topic, partition=225, offset=266411 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: Store someStore-201701060400 is currently closed at org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:205) at org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:221) at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:74) at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54) at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101) at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109) ... X more {quote} The error occurs for many partitions. This was preceded by (for this partition): {quote} INFO 2017-03-19 18:03:16,403 [StreamThread-10][ConsumerCoordinator.java:393] : Revoking previously assigned partitions [some_topic-225] for group some_job INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:254] : stream-thread [StreamThread-10] partitions [[some_topic-225]] revoked at the beginning of consumer rebalance. 
INFO 2017-03-19 18:03:16,403 [StreamThread-10][StreamThread.java:1056] : stream-thread [StreamThread-10] Closing a task's topology 1_225 INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:544] : stream-thread [StreamThread-10] Flushing state stores of task 1_225 INFO 2017-03-19 18:03:17,887 [StreamThread-10][StreamThread.java:534] : stream-thread [StreamThread-10] Committing consumer offsets of task 1_225 INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1012] : stream-thread [StreamThread-10] Updating suspended tasks to contain active tasks [[1_225, 0_445, 0_30]] INFO 2017-03-19 18:03:17,891 [StreamThread-10][StreamThread.java:1019] : stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 0_30]] INFO 2017-03-19 18:03:19,925 [StreamThread-10][ConsumerCoordinator.java:252] : Setting newly assigned partitions [some_tpoic-225] for group some_job INFO 2017-03-19 18:03:19,927 [StreamThread-10][StreamThread.java:228] : stream-thread [StreamThread-10] New partitions [[some_topic-225]] assigned at the end of consumer rebalance. INFO 2017-03-19 18:03:19,929 [StreamThread-10][StreamTask.java:333] : task [1_225] Initializing processor nodes of the topology Something happens. What ??? 
INFO 2017-03-19 18:03:20,135 [StreamThread-10][StreamThread.java:1045] : stream-thread [StreamThread-10] Closing a task 1_225 INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:544] : stream-thread [StreamThread-10] Flushing state stores of task 1_225 INFO 2017-03-19 18:03:20,355 [StreamThread-10][StreamThread.java:523] : stream-thread [StreamThread-10] Closing the state manager of task 1_225 INFO 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:1019] : stream-thread [StreamThread-10] Removing all active tasks [[1_225, 0_445, 0_30]] INFO 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:1034] : stream-thread [StreamThread-10] Removing all standby tasks [[]] INFO 2017-03-19 18:03:20,432 [StreamThread-10][StreamThread.java:427] : stream-thread [StreamThread-10] Stream thread shutdown complete {quote} --
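The report's note that it "had to add code to get this information" suggests installing an uncaught exception handler. A minimal, self-contained sketch using a JVM-level handler (the class name and demo thread are hypothetical; a real Streams job would more likely use {{KafkaStreams.setUncaughtExceptionHandler}}):

```java
// Hypothetical sketch: capture exceptions that would otherwise die silently
// with a stream thread, by installing a default uncaught exception handler.
class ExceptionLoggingSketch {
    static volatile String lastError;  // captured for demonstration

    static void install() {
        Thread.setDefaultUncaughtExceptionHandler(
            (thread, throwable) -> lastError = thread.getName() + ": " + throwable);
    }

    public static void main(String[] args) {
        install();
        // Simulate a stream thread dying with an uncaught exception.
        Thread t = new Thread(() -> { throw new IllegalStateException("boom"); },
                              "StreamThread-10");
        t.start();
        try { t.join(); } catch (InterruptedException e) { }
        System.out.println(lastError);  // the handler saw the failure
    }
}
```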
[jira] [Commented] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion
[ https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15922826#comment-15922826 ] Elias Levy commented on KAFKA-4887: --- It is a bit complicated. I am abusing the {{RocksDBWindowStore}} as a TTL cache (something I've covered in other JIRA tickets). In this particular case the store is used to perform a join between two streams where in theory one of the streams is keyed by two values, while the other stream is keyed by only one of those values. In practice both streams are keyed only by their shared value, so items from the stream that is in theory keyed by two values are inserted under a single key, and collisions may occur. To get around that I turn on the feature to allow duplicates, and the join is performed by iterating over the duplicates and matching the second key value. > Enabling caching on a persistent window store breaks support for duplicate > insertion > > > Key: KAFKA-4887 > URL: https://issues.apache.org/jira/browse/KAFKA-4887 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Elias Levy > > {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when > duplicate insertion support is enabled by passing {{true}} as the fourth > argument to {{windowed}} in the state store supplier. > When the feature is enabled, {{RocksDBWindowStore}} correctly handles > duplicates by assigning a unique sequence number to each element on insertion > and using the number within the key. > When caching is enabled by calling {{enableCaching}} on the supplier, > {{CachingWindowStore}} fails to do the same. Thus, of multiple values > inserted with the same key, only the last one survives. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion
Elias Levy created KAFKA-4887: - Summary: Enabling caching on a persistent window store breaks support for duplicate insertion Key: KAFKA-4887 URL: https://issues.apache.org/jira/browse/KAFKA-4887 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.2.0 Reporter: Elias Levy {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when duplicate insertion support is enabled by passing {{true}} as the fourth argument to {{windowed}} in the state store supplier. When the feature is enabled, {{RocksDBWindowStore}} correctly handles duplicates by assigning a unique sequence number to each element on insertion and using the number within the key. When caching is enabled by calling {{enableCaching}} on the supplier, {{CachingWindowStore}} fails to do the same. Thus, of multiple values inserted with the same key, only the last one survives. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
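A hypothetical model of the two insertion behaviors the report contrasts (class and method names are illustrative, not Kafka's): with a per-insert sequence number appended to the composite key, both duplicates survive; keyed by the record key alone, the last value overwrites the rest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of the duplicate-retention scheme described above.
class DuplicateSketch {
    // Window store with duplicates enabled: each put gets a unique sequence
    // number in the composite key, so no put overwrites another.
    static List<String> withSeqnum(List<String[]> puts) {
        Map<String, String> store = new LinkedHashMap<>();
        int seq = 0;
        for (String[] kv : puts) {
            store.put(kv[0] + "@" + (seq++), kv[1]); // composite key: key@seqnum
        }
        return new ArrayList<>(store.values());
    }

    // Caching layer keyed by the record key only: same key, last value wins.
    static List<String> withoutSeqnum(List<String[]> puts) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (String[] kv : puts) {
            cache.put(kv[0], kv[1]);
        }
        return new ArrayList<>(cache.values());
    }

    public static void main(String[] args) {
        List<String[]> puts = List.of(new String[]{"k", "v1"}, new String[]{"k", "v2"});
        System.out.println(withSeqnum(puts));    // both values survive
        System.out.println(withoutSeqnum(puts)); // only the last survives
    }
}
```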
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15868971#comment-15868971 ] Elias Levy commented on KAFKA-2729: --- Hit this again during testing with 0.10.0.1 on a 10 node broker cluster with a 3 node ZK ensemble. This should have priority Blocker instead of Major. > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Danil Serdyuchenko > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of underreplicated partitions. The zookeeper > cluster recovered; however, we continued to see a large number of > underreplicated partitions. Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > This appeared for all of the topics on the affected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing; I was hoping you > could shed some light on this issue. Possibly it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time
Elias Levy created KAFKA-4748: - Summary: Need a way to shutdown all workers in a Streams application at the same time Key: KAFKA-4748 URL: https://issues.apache.org/jira/browse/KAFKA-4748 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.1.1 Reporter: Elias Levy If you have a fleet of Stream workers for an application and attempt to shut them down simultaneously (e.g. via SIGTERM and Runtime.getRuntime().addShutdownHook() and streams.close())), a large number of the workers fail to shutdown. The problem appears to be a race condition between the shutdown signal and the consumer rebalancing that is triggered by some of the workers existing before others. Apparently, workers that receive the signal later fail to exit apparently as they are caught in the rebalance. Terminating workers in a rolling fashion is not advisable in some situations. The rolling shutdown will result in many unnecessary rebalances and may fail, as the application may have large amount of local state that a smaller number of nodes may not be able to store. It would appear that there is a need for a protocol change to allow the coordinator to signal a consumer group to shutdown without leading to rebalancing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
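The shutdown wiring described above might look like this sketch, where {{Worker}} is a hypothetical stand-in for {{KafkaStreams}}; it shows only the hook pattern and does not reproduce the rebalance race.

```java
// Illustrative sketch of closing a Streams-like worker from a JVM shutdown
// hook so SIGTERM triggers a clean exit. Worker is a hypothetical placeholder
// for KafkaStreams; close() stands in for KafkaStreams#close().
class ShutdownSketch {
    static class Worker {
        volatile boolean running = true;
        void close() { running = false; }   // stand-in for KafkaStreams#close()
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        // On SIGTERM the JVM runs registered shutdown hooks before exiting.
        Runtime.getRuntime().addShutdownHook(new Thread(worker::close));
        // ... the main loop would run until worker.running becomes false ...
    }
}
```

When many such processes receive SIGTERM at slightly different times, each close() can trigger a rebalance that delays the remaining closers, which is the race the report describes.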
[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836739#comment-15836739 ] Elias Levy commented on KAFKA-4144: --- What Matthias said. The issue has nothing to do with different applications. The problem is that the single configurable TimestampExtractor is used for all input topics. That means you must dump the timestamp extraction logic for all topics, which may be quite different, into a single TimestampExtractor. Imagine if you could only configure a single SerDe that would have to handle all topics, instead of being able to configure a SerDe per topic. > Allow per stream/table timestamp extractor > -- > > Key: KAFKA-4144 > URL: https://issues.apache.org/jira/browse/KAFKA-4144 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Jeyhun Karimov > Labels: api > > At the moment the timestamp extractor is configured via a StreamConfig value > to KafkaStreams. That means you can only have a single timestamp extractor > per app, even though you may be joining multiple streams/tables that require > different timestamp extraction methods. > You should be able to specify a timestamp extractor via > KStreamBuilder.stream/table, just like you can specify key and value serdes > that override the StreamConfig defaults. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833913#comment-15833913 ] Elias Levy commented on KAFKA-4144: --- Jeyhun, sorry if I was not clear. My comment about not being able to configure {{TimestampExtractor}} is only applicable to the current implementation, as it is instantiated by {{StreamTask}} and the current interface of {{TimestampExtractor}} provides no access to configuration data. So right now if you are consuming multiple distinct topics as different streams and/or tables, you have to create a case statement in your {{TimestampExtractor}} to handle all the topics with a single extractor, as that is all that you can configure right now via the {{timestamp.extractor}} configuration property. And because {{TimestampExtractor}} is instantiated by {{StreamTask}} and the extractor does not have access to configuration data, you must hardcode the topic names in the extractor. That means you can't change input topic names dynamically. You have to recompile to change them. Obviously these observations are no longer applicable if the application can instantiate its own {{TimestampExtractor}}s and pass them as an argument to {{TopologyBuilder.addSource}}, {{KStreamBuilder.source}}, and {{KStreamBuilder.table}}. As for Matthias' comments, I agree with #1 and #3. Not quite sure what he means by #2. Surely I could create a {{TimestampExtractor}} passing the constructor a topic name read from a config file while defining the topology, just like I can create SerDes that support the Confluent schema registry, where the registry endpoint is read from the config, and use those while defining the topology. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
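The single-extractor workaround described in the comment could be sketched as follows; the class shape, method signature, and topic names are all hypothetical, simplified stand-ins for the real {{TimestampExtractor}} interface.

```java
// Hypothetical sketch of one TimestampExtractor-like function handling all
// input topics via a case statement, with the topic names hardcoded.
class DispatchingExtractorSketch {
    static long extract(String topic, long embeddedMillis, long wallClockMillis) {
        switch (topic) {                  // topic names must be hardcoded here,
            case "orders":                // which is the limitation noted above
                return embeddedMillis;    // payload carries its own timestamp
            case "metrics":
                return wallClockMillis;   // fall back to ingestion time
            default:
                throw new IllegalArgumentException("unhandled topic: " + topic);
        }
    }
}
```

Changing an input topic name means editing the switch and recompiling, which is exactly the limitation the comment calls out.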
[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15833649#comment-15833649 ] Elias Levy commented on KAFKA-4144: --- That's fine. I am less concerned about the implementation details than about the developer's API experience. E.g. having {{TopologyBuilder.addSource}}, {{KStreamBuilder.source}}, and {{KStreamBuilder.table}} take a {{TimestampExtractor}}. There is another problem with {{TimestampExtractor}}. It is instantiated within StreamTask and the instances cannot be configured. That means that the topics that {{TimestampExtractor}} supports are hardcoded into it. That makes it impossible to have a Streams application that uses {{TimestampExtractor}} where the input topics can be configured at execution time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4683) Mismatch between Stream windowed store and broker log retention logic
Elias Levy created KAFKA-4683: - Summary: Mismatch between Stream windowed store and broker log retention logic Key: KAFKA-4683 URL: https://issues.apache.org/jira/browse/KAFKA-4683 Project: Kafka Issue Type: Bug Components: log, streams Affects Versions: 0.10.1.1 Reporter: Elias Levy The RocksDBWindowStore keeps key-value entries for a configurable retention period. The leading edge of the time period kept is determined by the newest timestamp of an inserted KV. The trailing edge is this leading edge minus the requested retention period. If logging is enabled, changes to the store are written to a change log topic that is configured with a retention.ms value equal to the store retention period. The leading edge of the time period kept by the log is the current time. The trailing edge is the leading edge minus the requested retention period. The difference in how the leading edge is determined can result in unexpected behavior. If the stream application is processing data older than the retention period and storing it in a windowed store, the store will have data for the retention period looking back from the newest timestamp of the processed messages. But the messages written to the state changelog will almost immediately be deleted by the broker, as they will fall outside of the retention window as the broker computes it. If the application is stopped and restarted in this state, and if the local state has been lost for some reason, the application won't be able to recover the state from the broker, as the broker has deleted it. In addition, I've noticed that there is a discrepancy in which timestamp is used between the store and the change log. The store will use the timestamp passed as an argument to {{put}}, or if no timestamp is passed, fall back to {{context.timestamp}}. But {{StoreChangeLogger.logChange}} does not take a timestamp. Instead it always uses {{context.timestamp}} to write the change to the broker. 
Thus it is possible for the state store and the change log to use different timestamps for the same KV. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
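The mismatch reduces to simple arithmetic over the two trailing edges. A sketch with illustrative values (names and numbers are not from Kafka's code):

```java
// Sketch of the two trailing-edge computations the report contrasts.
class RetentionSketch {
    // Store: the trailing edge trails the newest *observed record* timestamp.
    static long storeTrailingEdge(long newestRecordTs, long retentionMs) {
        return newestRecordTs - retentionMs;
    }

    // Broker: the trailing edge trails the *current wall clock*.
    static long brokerTrailingEdge(long nowMs, long retentionMs) {
        return nowMs - retentionMs;
    }

    public static void main(String[] args) {
        long retention = 60_000L;     // illustrative 60 s retention
        long now = 1_000_000L;        // current wall clock
        long oldData = 500_000L;      // reprocessing data older than retention
        // The store still keeps [440000, 500000], but the broker has already
        // deleted everything before 940000 from the changelog topic.
        System.out.println(storeTrailingEdge(oldData, retention));  // 440000
        System.out.println(brokerTrailingEdge(now, retention));     // 940000
    }
}
```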
[jira] [Created] (KAFKA-4639) Kafka Streams metrics are undocumented
Elias Levy created KAFKA-4639: - Summary: Kafka Streams metrics are undocumented Key: KAFKA-4639 URL: https://issues.apache.org/jira/browse/KAFKA-4639 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.1.1 Reporter: Elias Levy Priority: Minor The documentation is silent on the metrics collected by the Streams framework. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4608) RocksDBWindowStore.fetch() is inefficient for large ranges
Elias Levy created KAFKA-4608: - Summary: RocksDBWindowStore.fetch() is inefficient for large ranges Key: KAFKA-4608 URL: https://issues.apache.org/jira/browse/KAFKA-4608 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.1.1 Reporter: Elias Levy It is not unreasonable for a user to call {{RocksDBWindowStore.fetch}} to scan for a key across a large time range. For instance, someone may call it with a {{timeFrom}} of zero or a {{timeTo}} of max long in an attempt to fetch keys matching across all time forwards or backwards. But if you do so, {{fetch}} will peg the CPU, as it attempts to iterate over every single segment id in the range. That is obviously very inefficient. {{fetch}} should trim the {{timeFrom}}/{{timeTo}} range based on the available time range in the {{segments}} hash map, so that it only iterates over the available time range. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
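The proposed trimming could be sketched as follows, assuming segments are tracked in an ordered map keyed by segment id (the names and the 60-second segment interval are illustrative, not the actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the proposed fix: clamp the requested time range to the segments
// that actually exist before iterating over segment ids.
class SegmentRangeSketch {
    static final long SEGMENT_INTERVAL_MS = 60_000L;  // illustrative

    static long segmentId(long timestampMs) {
        return timestampMs / SEGMENT_INTERVAL_MS;
    }

    // Only visit ids covered by existing segments, instead of every id
    // between segmentId(timeFrom) and segmentId(timeTo).
    static List<Long> segmentIdsToScan(TreeMap<Long, Object> segments,
                                       long timeFrom, long timeTo) {
        List<Long> ids = new ArrayList<>();
        if (segments.isEmpty()) return ids;
        long from = Math.max(segmentId(timeFrom), segments.firstKey());
        long to = Math.min(segmentId(timeTo), segments.lastKey());
        for (long id = from; id <= to; id++) ids.add(id);
        return ids;
    }
}
```

With this clamp, a fetch with timeFrom of zero or timeTo of max long touches only the handful of live segments rather than billions of candidate ids.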
[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap
[ https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15556371#comment-15556371 ] Elias Levy commented on KAFKA-4217: --- That would work as well. > KStream.transform equivalent of flatMap > --- > > Key: KAFKA-4217 > URL: https://issues.apache.org/jira/browse/KAFKA-4217 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Guozhang Wang > > {{KStream.transform}} gives you access to state stores while allowing you to > return zero or one transformed {{KeyValue}}. Alas, it is unclear what method > you should use if you want to access state stores and return zero or multiple > {{KeyValue}}. Presumably you can use {{transform}}, always return {{null}}, > and use {{ProcessorContext.forward}} to emit {{KeyValues}}. > It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or > allow store access from other {{KStream}} methods, such as {{flatMap}} itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap
[ https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1654#comment-1654 ] Elias Levy commented on KAFKA-4217: --- It would seem to be the same request: allow a transform that emits multiple values. Like Fodor, I am using {{ProcessorContext.forward}} to emit multiple values from {{Transformer.transform}}. Unlike him, I return {{null}} from the method instead of returning dummy values that must be filtered. At the very least, it should be documented that one can use {{ProcessorContext.forward}} to emit multiple values from {{Transformer.transform}}. Ideally, {{Transformer.transform}} would be modified to allow returning multiple values, or a variant of {{Transformer}} would allow you to do so. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
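The workaround described in the comment, sketched with simplified stand-ins for {{Transformer}} and {{ProcessorContext}} (the {{Ctx}} interface and the comma-splitting example are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of emitting multiple records through a forward() callback while
// returning null from transform(), as the comment describes.
class FlatTransformSketch {
    interface Ctx {                       // stand-in for ProcessorContext
        void forward(String key, String value);
    }

    static String transform(String key, String value, Ctx ctx) {
        for (String part : value.split(",")) {
            ctx.forward(key, part);       // emit zero or more records explicitly
        }
        return null;                      // nothing is emitted via the return value
    }

    static List<String> run(String key, String value) {
        List<String> out = new ArrayList<>();
        transform(key, value, (k, v) -> out.add(k + "=" + v));
        return out;
    }
}
```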
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531353#comment-15531353 ] Elias Levy commented on KAFKA-4212: --- I am using the {{KStream.transform}} API, the {{Transformer}} interface, and making use of state stores that I create. So it is very close to using the low-level {{Processor}} API. But even so, the logic is essentially the same as the high-level join, using a sliding window, except that one of the streams has a composite key. I don't see what hopping windows have to do with the use case. > Add a key-value store that is a TTL persistent cache > > > Key: KAFKA-4212 > URL: https://issues.apache.org/jira/browse/KAFKA-4212 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Guozhang Wang > > Some jobs need to maintain as state a large set of key-values for some > period of time. I.e. they need to maintain a TTL cache of values potentially > larger than memory. > Currently Kafka Streams provides non-windowed and windowed key-value stores. > Neither is an exact fit for this use case. > The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as > required, but does not support expiration. The TTL option of RocksDB is > explicitly not used. > The {{RocksDBWindowStore}}, a {{WindowStore}}, can expire items via segment > dropping, but it stores multiple items per key, based on their timestamp. > But this store can be repurposed as a cache by fetching the items in reverse > chronological order and returning the first item found. > KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here > we desire a variable-capacity memory-overflowing TTL caching store. > Although {{RocksDBWindowStore}} can be repurposed as a cache, it would be > useful to have an official and proper TTL cache API and implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531116#comment-15531116 ] Elias Levy commented on KAFKA-4212: --- But joins are not performed on hopping windows; they are performed on a single sliding window. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15531022#comment-15531022 ] Elias Levy commented on KAFKA-4212: --- Not sure I follow. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15530995#comment-15530995 ] Elias Levy commented on KAFKA-4212: --- I would describe it as a TTL cache, not an LRU cache. We want the records to expire based on the record timestamp, not on when the record was last accessed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527841#comment-15527841 ] Elias Levy commented on KAFKA-4212: --- Looks like using {{WindowStore}} is not quite so simple, as you can't actually iterate it in reverse order. It can still be used, but you must iterate forward to find the last status update and last client status query for each client. All the more reason for a TTL-enabled {{KeyValueStore}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
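The forward-iteration workaround can be sketched in plain Java (outside Kafka Streams; the {{Entry}} record and all names here are hypothetical stand-ins for the store's values): a {{WindowStore}} fetch returns entries in chronological order, so a single forward pass that overwrites earlier entries leaves the latest value per client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the forward-scan workaround: entries arrive in
// chronological order, so the latest value per client is found by
// scanning forward and letting later entries overwrite earlier ones.
public class ForwardScan {
    // Hypothetical stand-in for the values stored in the window store.
    public record Entry(String clientId, String status, long timestamp) {}

    // Keep only the last (newest) status seen for each client.
    public static Map<String, Entry> latestPerClient(List<Entry> chronological) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : chronological) {
            latest.put(e.clientId(), e); // later entries overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
            new Entry("client-a", "UP", 100L),
            new Entry("client-b", "UP", 150L),
            new Entry("client-a", "DOWN", 200L));
        Map<String, Entry> latest = latestPerClient(entries);
        System.out.println(latest.get("client-a").status()); // DOWN
        System.out.println(latest.get("client-b").status()); // UP
    }
}
```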
[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor
[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527301#comment-15527301 ] Elias Levy commented on KAFKA-4144: --- Indeed, that is how it is handled today. But it's not a very clean way to do so. > Allow per stream/table timestamp extractor > -- > > Key: KAFKA-4144 > URL: https://issues.apache.org/jira/browse/KAFKA-4144 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Guozhang Wang > > At the moment the timestamp extractor is configured via a StreamConfig value > to KafkaStreams. That means you can only have a single timestamp extractor > per app, even though you may be joining multiple streams/tables that require > different timestamp extraction methods. > You should be able to specify a timestamp extractor via > KStreamBuilder.stream/table, just like you can specify key and value serdes > that override the StreamConfig defaults. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524513#comment-15524513 ] Elias Levy commented on KAFKA-4212: --- More generally, the use case is: I've told a bunch of folks about some property of some type of object, and I would like to notify those folks every time that property changes for the specific objects they have asked about, during some configurable time period since the last time they asked me about the object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15524402#comment-15524402 ] Elias Levy commented on KAFKA-4212: --- The general use case is the joining of updates to two tables over a limited period of time. Consider a hypothetical monitoring service that allows clients to query the status of nodes. The application may wish to inform the clients whenever the status of a node that they have queried changes, but only if the client has queried the status during the past 24 hours and if the last status for a node is different from the last status the client received. To do so, the service can consume a stream of client node status queries with their responses and a stream of node status updates. From the stream of client node status queries the service would maintain a cache of the last status for a node sent to a client, such that entries expire after 24 hours. From the node status updates the service would maintain a mapping of node to latest status. When a client query is received, the service can check the node status mapping to see if there is a newer status, and if there is, generate a notification. When a node status update is received, the service can check the last status sent to clients in the cache and generate a notification with the new status to all clients that previously queried for the node's status. As an optimization, the mapping of nodes to latest status can also be a cache with a TTL, since you don't need to keep the statuses of nodes that haven't changed in more than 24 hours, as you'll never receive a delayed node status query to match them against. Abstractly this is equivalent to a {{KTable}}-{{KTable}} inner join where entries in each {{KTable}} expire after some TTL, and where one table has a composite primary key (node id and client id on one {{KTable}} vs just node id on the other). 
It could also be thought of as a windowed {{KTable}}-{{KTable}} join (although in that case records that fall outside the window would never be used and would just waste space), or a windowed {{KStream}}-{{KStream}} join of table updates where only the latest updated values are used (i.e. discard updates in the window if there is a newer update). Although, again, these would be joins where the primary keys are not identical, as one is a composite. Alas, Kafka Streams does not support windowed {{KTable}}-{{KTable}} joins, TTL'ed {{KeyValueStore}}s, or joins across {{KTable}}s and/or {{KStream}}s with different keys. That said, the above service can be implemented by joining the client status query and node status update streams using custom processors and by abusing {{WindowStore}}. {{WindowStore}} can be used as a form of TTL'ed {{KeyValueStore}}, as it will drop old values that fall out of its window, by iterating in reverse order and only using the latest value. And since it allows you to store multiple values for the same key (node id), you can record the node status you handed out to clients (node id as key; client id, status, and timestamp as value) and then, when a node status update comes in and you perform the join, iterate over all values for a given node id, keeping only the latest one for each client id. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
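The TTL'ed {{KeyValueStore}} semantics wished for here can be sketched in a few lines of plain Java (an illustration of the desired behavior, not a proposed Kafka API; all names are made up): entries expire a fixed TTL after their record timestamp, i.e. event time, rather than after last access as an LRU would.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the desired TTL key-value cache semantics:
// an entry expires ttlMs after its record timestamp (event time),
// not after it was last read. Not a proposed Kafka Streams API.
public class TtlCache<K, V> {
    private record Timestamped<T>(T value, long timestamp) {}

    private final Map<K, Timestamped<V>> entries = new HashMap<>();
    private final long ttlMs;

    public TtlCache(long ttlMs) { this.ttlMs = ttlMs; }

    public void put(K key, V value, long recordTimestamp) {
        entries.put(key, new Timestamped<>(value, recordTimestamp));
    }

    // Returns null if the key is absent or expired relative to the
    // stream's current (event) time; expired entries are dropped.
    public V get(K key, long currentStreamTime) {
        Timestamped<V> t = entries.get(key);
        if (t == null || currentStreamTime - t.timestamp() > ttlMs) {
            entries.remove(key);
            return null;
        }
        return t.value();
    }
}
```

A real implementation would back this with RocksDB and expire entries in bulk via segment dropping, as {{RocksDBWindowsStore}} does, rather than checking on each read.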
[jira] [Comment Edited] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521287#comment-15521287 ] Elias Levy edited comment on KAFKA-4212 at 9/25/16 7:17 PM: It should be noted that a variable-capacity memory-overflowing TTL caching store is semantically equivalent to a KTable that expires entries via a TTL. Such a KTable may be a viable alternative or at least a useful additional abstraction. was (Author: elevy): I should be noted that a variable-capacity memory-overflowing TTL caching store is semantically equivalent to a KTable that expires entries via a TTL. Such a KTable may be a viable alternative or at least a useful additional abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521287#comment-15521287 ] Elias Levy commented on KAFKA-4212: --- It should be noted that a variable-capacity memory-overflowing TTL caching store is semantically equivalent to a KTable that expires entries via a TTL. Such a KTable may be a viable alternative or at least a useful additional abstraction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4219) Permit setting of event time in stream processor
Elias Levy created KAFKA-4219: - Summary: Permit setting of event time in stream processor Key: KAFKA-4219 URL: https://issues.apache.org/jira/browse/KAFKA-4219 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang Event time is assigned in stream sources via {{TimestampExtractor}}. Once the event time has been assigned, it remains the same, regardless of any downstream processing in the topology. This is insufficient for many processing jobs, particularly when the output of the job is written back into a Kafka topic, where the record's time is encoded outside of the record's value. For instance: * When performing windowed aggregations, it may be desirable for the timestamp of the emitted record to be the lower or upper limit of the time window, rather than the timestamp of the last processed element, which may be anywhere within the time window. * When joining two streams, it is non-deterministic which of the two records' timestamps will become the timestamp of the emitted record. It could be either one, depending on the order in which the records are processed. Even were this deterministic, it may be desirable for the emitted timestamp to be altogether different from the timestamps of the joined records. For instance, setting the timestamp to the current processing time may be desirable. * In general, lower level processors may wish to set the timestamp of emitted records to an arbitrary value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4120) byte[] keys in RocksDB state stores do not work as expected
[ https://issues.apache.org/jira/browse/KAFKA-4120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15521271#comment-15521271 ] Elias Levy commented on KAFKA-4120: --- {{o.a.k.common.utils.Bytes}} is not a documented class. > byte[] keys in RocksDB state stores do not work as expected > --- > > Key: KAFKA-4120 > URL: https://issues.apache.org/jira/browse/KAFKA-4120 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Greg Fodor >Assignee: Guozhang Wang > > We ran into an issue using a byte[] key in a RocksDB state store (with the > byte array serde). Internally, the RocksDB store keeps an LRUCache that is > backed by a LinkedHashMap and sits between the callers and the actual db. > The problem is that while the underlying rocks db will persist byte arrays > with equal data as equivalent keys, the LinkedHashMap uses byte[] reference > equality from Object.equals/hashcode. So this can result in multiple entries > in the cache for two different byte arrays that have the same contents and > are backed by the same key in the db, resulting in unexpected behavior. > One behavior that manifests from this: if you store a value in the > state store under a specific key and re-read that key with the same byte > array, you will get the new value, but if you re-read that key with a > different byte array containing the same bytes, you will get a stale value > until the db is flushed. (This made it particularly tricky to track down what > was happening :)) > The workaround for us is to convert the keys from raw byte arrays to a > deserialized avro structure that provides proper hashcode/equals semantics > for the intermediate cache. In general this seems like good practice, so one > of the proposed solutions is to simply emit a warning or exception if a key > type with breaking semantics like this is provided. 
> A few proposed solutions: > - When the state store is defined on array keys, ensure that the cache map > does proper comparisons on array values, not array references. This would fix > this problem, but seems a bit strange to special-case. However, I have a hard > time thinking of other examples where this behavior would burn users. > - Change the LRU cache to deserialize and serialize all keys to bytes and use > a value-based comparison for the map. This would be the most correct, as it > would ensure that both the rocks db and the cache have identical key spaces > and equality/hashing semantics. However, this is probably slow, and since the > general case of using avro record types as keys works fine, it will largely > be unnecessary overhead. > - Don't change anything about the behavior, but trigger a warning in the log > or fail to start if a state store is defined on array keys (or possibly any > key type that fails to properly override Object.equals/hashcode). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
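The reference-equality problem described in this report is easy to reproduce in plain Java, independent of RocksDB. The {{ByteKey}} wrapper below is a hypothetical stand-in for a value-based wrapper such as {{o.a.k.common.utils.Bytes}}:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Two byte[] with identical contents are distinct HashMap keys, because
// arrays inherit reference-based equals/hashCode from Object. A wrapper
// with value-based equality restores the expected cache behavior.
public class ByteArrayKeys {
    // Hypothetical stand-in for a Bytes-style wrapper; illustrative only.
    public record ByteKey(byte[] bytes) {
        @Override public boolean equals(Object o) {
            return o instanceof ByteKey k && Arrays.equals(bytes, k.bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
    }

    public static void main(String[] args) {
        byte[] k1 = {1, 2, 3};
        byte[] k2 = {1, 2, 3}; // same contents, different reference

        Map<byte[], String> raw = new HashMap<>();
        raw.put(k1, "value");
        System.out.println(raw.get(k2)); // null: reference equality misses

        Map<ByteKey, String> wrapped = new HashMap<>();
        wrapped.put(new ByteKey(k1), "value");
        System.out.println(wrapped.get(new ByteKey(k2))); // value
    }
}
```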
[jira] [Created] (KAFKA-4218) Enable access to key in {{ValueTransformer}}
Elias Levy created KAFKA-4218: - Summary: Enable access to key in {{ValueTransformer}} Key: KAFKA-4218 URL: https://issues.apache.org/jira/browse/KAFKA-4218 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang While transforming values via {{KStream.transformValues}} and {{ValueTransformer}}, the key associated with the value may be needed, even if it is not changed. For instance, it may be used to access stores. As of now, the key is not available within these methods and interfaces, leading to the use of {{KStream.transform}} and {{Transformer}}, and the unnecessary creation of new {{KeyValue}} objects. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4217) KStream.transform equivalent of flatMap
Elias Levy created KAFKA-4217: - Summary: KStream.transform equivalent of flatMap Key: KAFKA-4217 URL: https://issues.apache.org/jira/browse/KAFKA-4217 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang {{KStream.transform}} gives you access to state stores while allowing you to return zero or one transformed {{KeyValue}}s. Alas, it is unclear what method you should use if you want to access state stores and return zero or multiple {{KeyValue}}s. Presumably you can use {{transform}}, always return {{null}}, and use {{ProcessorContext.forward}} to emit {{KeyValue}}s. It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or to allow store access from other {{KStream}} methods, such as {{flatMap}} itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
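The forward-based workaround can be sketched in plain Java (the names, and the {{BiConsumer}} standing in for {{ProcessorContext.forward}}, are illustrative, not Kafka's API): the transform returns nothing and instead pushes zero or more records through a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java sketch of the flatMap-like transform workaround: the
// method returns nothing and emits zero or more key-value pairs
// through a forward() callback, mimicking ProcessorContext.forward.
public class FlatTransformSketch {
    public static void splitWords(String key, String line,
                                  BiConsumer<String, String> forward) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) forward.accept(key, word); // emit 0..n records
        }
        // No return value: all output went through forward().
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        splitWords("k", "hello streams", (k, v) -> out.add(k + ":" + v));
        System.out.println(out); // [k:hello, k:streams]
    }
}
```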
[jira] [Created] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
Elias Levy created KAFKA-4212: - Summary: Add a key-value store that is a TTL persistent cache Key: KAFKA-4212 URL: https://issues.apache.org/jira/browse/KAFKA-4212 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang Some jobs need to maintain as state a large set of key-values for some period of time, i.e. they need to maintain a TTL cache of values potentially larger than memory. Currently Kafka Streams provides non-windowed and windowed key-value stores. Neither is an exact fit for this use case. The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as required, but does not support expiration. The TTL option of RocksDB is explicitly not used. The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment dropping, but it stores multiple items per key, based on their timestamp. This store can be repurposed as a cache by fetching the items in reverse chronological order and returning the first item found. KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here we desire a variable-capacity memory-overflowing TTL caching store. Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be useful to have an official and proper TTL cache API and implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15491553#comment-15491553 ] Elias Levy commented on KAFKA-4153: --- I've updated the PR to reverse the before & after semantics as you've pointed out. > Incorrect KStream-KStream join behavior with asymmetric time window > --- > > Key: KAFKA-4153 > URL: https://issues.apache.org/jira/browse/KAFKA-4153 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Guozhang Wang > > Using Kafka 0.10.0.1, when joining records in two streams separated by some > time, but only when records from one stream are newer than records from the > other, i.e. doing: > {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}} > one would expect that the following would be equivalent: > {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}} > Alas, that is not the case. Instead, this generates the same output as > the first example: > {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}} > The problem is that the > [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697] > implementation in {{KStreamImpl}} fails to reverse the {{before}} and > {{after}} values when it creates the {{KStreamKStreamJoin}} for the other > stream, even though it calls {{reverseJoiner}} to reverse the joiner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485826#comment-15485826 ] Elias Levy commented on KAFKA-4153: --- Why would the other value be undefined? Default it to zero. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485700#comment-15485700 ] Elias Levy commented on KAFKA-4153: --- You could do that, but it would be non-obvious to someone reading the code what the semantics of that window should be. I would prefer seeing {{before}} and {{after}} have static alternatives, so you could write {{JoinWindows.before(100).after(50)}} or similar. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485561#comment-15485561 ] Elias Levy commented on KAFKA-4153: --- As a side note, the API for {{JoinWindows}} has become mangled since 0.10.0.1. Now if you want to instantiate an asymmetric {{JoinWindows}} you must first call {{JoinWindows.of(0)}} before you can call {{before}} or {{after}} to set the actual window boundaries. The class should provide alternative static methods for both {{before}} and {{after}} to instantiate the class without having to first call {{of}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
[ https://issues.apache.org/jira/browse/KAFKA-4153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated KAFKA-4153: -- Flags: Patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4153) Incorrect KStream-KStream join behavior with asymmetric time window
Elias Levy created KAFKA-4153: - Summary: Incorrect KStream-KStream join behavior with asymmetric time window Key: KAFKA-4153 URL: https://issues.apache.org/jira/browse/KAFKA-4153 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang Using Kafka 0.10.0.1, if joining records in two streams separated by some time, but only when records from one stream are newer than records from the other, i.e. doing: {{stream1.join(stream2, valueJoiner, JoinWindows.of("X").after(1))}} One would expect that the following would be equivalent: {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").before(1))}} Alas, this is not the case. Instead, this generates the same output as the first example: {{stream2.join(stream1, valueJoiner, JoinWindows.of("X").after(1))}} The problem is that the [{{DefaultJoin}}|https://github.com/apache/kafka/blob/caa9bd0fcd2fab4758791408e2b145532153910e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L692-L697] implementation in {{KStreamImpl}} fails to reverse the {{before}} and {{after}} values when it creates the {{KStreamKStreamJoin}} for the other stream, even though it calls {{reverseJoiner}} to reverse the joiner. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
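The asymmetry reported in KAFKA-4153 comes down to a single time-bound check. As a plain-Java sketch (this is not the actual Kafka Streams internals, and the class and method names here are illustrative only), reversing the join operands is equivalent to the original join only when the {{before}} and {{after}} bounds are swapped along with the joiner, which is the step the report says {{DefaultJoin}} misses:

```java
// Illustrative model of the KStream-KStream join window check,
// reduced to plain Java. Not the Kafka Streams implementation.
public class JoinWindowCheck {
    // A record from the other stream matches when its timestamp lies in
    // [thisTs - beforeMs, thisTs + afterMs].
    public static boolean inWindow(long thisTs, long otherTs, long beforeMs, long afterMs) {
        return otherTs >= thisTs - beforeMs && otherTs <= thisTs + afterMs;
    }

    // Reversing which stream drives the join is only equivalent when the
    // bounds are swapped too -- before becomes after and vice versa.
    public static boolean inReversedWindow(long otherTs, long thisTs, long beforeMs, long afterMs) {
        return inWindow(otherTs, thisTs, afterMs, beforeMs); // note the swap
    }
}
```

With the swap in place, {{stream2.join(stream1, ..., before(1))}} checks the same condition as {{stream1.join(stream2, ..., after(1))}}; without it, both orderings produce the first example's output, as the report describes.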
[jira] [Resolved] (KAFKA-4146) Kafka Stream ignores Serde exceptions leading to silently broken apps
[ https://issues.apache.org/jira/browse/KAFKA-4146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy resolved KAFKA-4146. --- Resolution: Invalid > Kafka Stream ignores Serde exceptions leading to silently broken apps > - > > Key: KAFKA-4146 > URL: https://issues.apache.org/jira/browse/KAFKA-4146 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Assignee: Guozhang Wang > > It appears that Kafka Streams silently ignores Serde exceptions, leading to > apps that are silently broken. > E.g. if you make use of {{Stream.through("topic")}} and the default Serde is > inappropriate for the type, the app will silently drop the data on the floor > without even the courtesy of printing a single error message. > At the very least an initial error message should be generated, with the > option to generate messages for each such failure or a sampling of such > failures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-4146) Kafka Stream ignores Serde exceptions leading to silently broken apps
Elias Levy created KAFKA-4146: - Summary: Kafka Stream ignores Serde exceptions leading to silently broken apps Key: KAFKA-4146 URL: https://issues.apache.org/jira/browse/KAFKA-4146 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang It appears that Kafka Streams silently ignores Serde exceptions, leading to apps that are silently broken. E.g. if you make use of {{Stream.through("topic")}} and the default Serde is inappropriate for the type, the app will silently drop the data on the floor without even the courtesy of printing a single error message. At the very least an initial error message should be generated, with the option to generate messages for each such failure or a sampling of such failures. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
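The behavior KAFKA-4146 asks for can be modeled outside Kafka Streams: wrap deserialization so the first failure is surfaced rather than swallowed, with later failures only counted. The {{LoggingDeserializer}} class below is a hypothetical sketch, not part of the Kafka Streams API:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Illustrative sketch only (not a Kafka Streams class): surface the first
// deserialization failure instead of silently dropping the record.
public class LoggingDeserializer<T> {
    private final Function<byte[], T> inner;
    private final AtomicLong failures = new AtomicLong();

    public LoggingDeserializer(Function<byte[], T> inner) {
        this.inner = inner;
    }

    public T deserialize(byte[] data) {
        try {
            return inner.apply(data);
        } catch (RuntimeException e) {
            // Print once; count every failure for later sampling/reporting.
            if (failures.incrementAndGet() == 1) {
                System.err.println("deserialization failed (further failures counted): " + e);
            }
            return null; // record is still dropped, but no longer silently
        }
    }

    public long failureCount() {
        return failures.get();
    }
}
```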
[jira] [Created] (KAFKA-4144) Allow per stream/table timestamp extractor
Elias Levy created KAFKA-4144: - Summary: Allow per stream/table timestamp extractor Key: KAFKA-4144 URL: https://issues.apache.org/jira/browse/KAFKA-4144 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 0.10.0.1 Reporter: Elias Levy Assignee: Guozhang Wang At the moment the timestamp extractor is configured via a StreamConfig value passed to KafkaStreams. That means you can only have a single timestamp extractor per app, even though you may be joining multiple streams/tables that require different timestamp extraction methods. You should be able to specify a timestamp extractor via KStreamBuilder.stream/table, just like you can specify key and value serdes that override the StreamConfig defaults. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
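The override pattern KAFKA-4144 proposes, per-source extractors falling back to an app-wide default, can be sketched in plain Java. The {{ExtractorRegistry}} name and shape are assumptions for illustration, not the proposed Kafka Streams API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch of per-topic timestamp extractors with a default
// fallback -- analogous to how per-source serdes override StreamConfig.
public class ExtractorRegistry<R> {
    private final ToLongFunction<R> defaultExtractor;
    private final Map<String, ToLongFunction<R>> perTopic = new HashMap<>();

    public ExtractorRegistry(ToLongFunction<R> defaultExtractor) {
        this.defaultExtractor = defaultExtractor;
    }

    // Register a topic-specific extractor, overriding the default.
    public void register(String topic, ToLongFunction<R> extractor) {
        perTopic.put(topic, extractor);
    }

    // Resolve: topic-specific extractor if present, else the default.
    public long timestampFor(String topic, R record) {
        return perTopic.getOrDefault(topic, defaultExtractor).applyAsLong(record);
    }
}
```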
[jira] [Created] (KAFKA-3932) Consumer fails to consume in a round robin fashion
Elias Levy created KAFKA-3932: - Summary: Consumer fails to consume in a round robin fashion Key: KAFKA-3932 URL: https://issues.apache.org/jira/browse/KAFKA-3932 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.10.0.0 Reporter: Elias Levy The Java consumer fails to consume messages in a round robin fashion. This can lead to unbalanced consumption. In our use case we have a set of consumers that can take a significant amount of time consuming messages off a topic. For this reason, we are using the pause/poll/resume pattern to ensure the consumer session does not time out. The topic that is being consumed has been preloaded with messages. That means there is a significant message lag when the consumer is first started. To limit how many messages are consumed at a time, the consumer has been configured with max.poll.records=1. The first initial observation is that the client receives a large batch of messages for the first partition it decides to consume from and will consume all those messages before moving on, rather than returning a message from a different partition for each call to poll. We solved this issue by configuring max.partition.fetch.bytes to be small enough that only a single message will be returned by the broker on each fetch, although this would not be feasible if message size were highly variable. The behavior of the consumer after this change is to largely consume from a small number of partitions, usually just two, iterating between them until it exhausts them, before moving to another partition. This behavior is problematic if the messages have some rough time semantics and need to be processed in rough time order across all partitions. It would be useful if the consumer had a pluggable API that allowed custom logic to select which partition to consume from next, thus enabling the creation of a round robin partition consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
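The selection logic KAFKA-3932 asks for can be modeled as a simple rotation over partition ids. This is an illustrative sketch, not a Kafka API: a real consumer would pause all assigned partitions and resume only the one at the head of the rotation before each poll(), so that no single partition can starve the others.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;

// Hypothetical round-robin partition picker: each call returns the next
// partition to fetch from and rotates it to the back of the queue.
public class RoundRobinPicker {
    private final Deque<Integer> order = new ArrayDeque<>();

    public RoundRobinPicker(Collection<Integer> partitions) {
        order.addAll(partitions);
    }

    public int next() {
        int p = order.removeFirst();
        order.addLast(p); // rotate so every partition gets a turn
        return p;
    }
}
```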
[jira] [Commented] (KAFKA-2359) New consumer - partitions auto assigned only on poll
[ https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346678#comment-15346678 ] Elias Levy commented on KAFKA-2359: --- I wonder if calling pause() before calling poll() and seekToBeginning() or seekToEnd() will solve your problem. The pause should stop the fetching of records, while the poll() triggers the interaction with the coordinator to perform the partition assignment before calling seekToBeginning and seekToEnd. > New consumer - partitions auto assigned only on poll > > > Key: KAFKA-2359 > URL: https://issues.apache.org/jira/browse/KAFKA-2359 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Stevo Slavic >Priority: Minor > > In the new consumer I encountered unexpected behavior. After constructing > {{KafkaConsumer}} instance with configured consumer rebalance callback > handler, and subscribing to a topic with "consumer.subscribe(topic)", > retrieving subscriptions would return empty set and callback handler would > not get called (no partitions ever assigned or revoked), no matter how long > instance was up. > Then I found by inspecting {{KafkaConsumer}} code that partition assignment > will only be triggered on first {{poll}}, since {{pollOnce}} has: > {noformat} > // ensure we have partitions assigned if we expect to > if (subscriptions.partitionsAutoAssigned()) > coordinator.ensurePartitionAssignment(); > {noformat} > I'm proposing to fix this by including same {{ensurePartitionAssignment}} > fragment in {{KafkaConsumer.subscriptions}} accessor as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133716#comment-15133716 ] Elias Levy edited comment on KAFKA-2729 at 2/5/16 5:55 AM: --- Had the same issue happen here while testing a 5 node Kafka cluster with a 3 node ZK ensemble on Kubernetes on AWS. After running for a while broker 2 started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip updating ISR" error message for all the partitions it leads. For those partitions it is the only in-sync replica. That has led to the Samza jobs I was running to stop. I should note that I am running 0.9.0.0. was (Author: elevy): Had the same issue happen here while testing a 5 node Kafka cluster with a 3 node ZK ensemble on Kubernetes on AWS. After running for a while broker 2 started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip updating ISR" error message for all the partitions it leads. For those partitions it is the only in-sync replica. That has led to the Samza jobs I was running to stop. > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Danil Serdyuchenko > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of undereplicated partitions. The zookeeper > cluster recovered, however we continued to see a large number of > undereplicated partitions. 
Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > For all of the topics on the affected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing, I was hoping you > could shed some light on this issue. Possibly if it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.
[ https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133716#comment-15133716 ] Elias Levy commented on KAFKA-2729: --- Had the same issue happen here while testing a 5 node Kafka cluster with a 3 node ZK ensemble on Kubernetes on AWS. After running for a while broker 2 started showing the "Cached zkVersion [29] not equal to that in zookeeper, skip updating ISR" error message for all the partitions it leads. For those partitions it is the only in-sync replica. That has led to the Samza jobs I was running to stop. > Cached zkVersion not equal to that in zookeeper, broker not recovering. > --- > > Key: KAFKA-2729 > URL: https://issues.apache.org/jira/browse/KAFKA-2729 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 >Reporter: Danil Serdyuchenko > > After a small network wobble where zookeeper nodes couldn't reach each other, > we started seeing a large number of undereplicated partitions. The zookeeper > cluster recovered, however we continued to see a large number of > undereplicated partitions. Two brokers in the kafka cluster were showing this > in the logs: > {code} > [2015-10-27 11:36:00,888] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for > partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 > (kafka.cluster.Partition) > [2015-10-27 11:36:00,891] INFO Partition > [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] > not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) > {code} > For all of the topics on the affected brokers. Both brokers only recovered > after a restart. Our own investigation yielded nothing, I was hoping you > could shed some light on this issue. Possibly if it's related to: > https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using > 0.8.2.1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)