Re: kafka streams 0.10.2 failing while restoring offsets
Well, let's say the previous thread was processing a large block and got bumped
out of the group while processing it. The new thread that gets this partition
may then see a different end offset when it starts the restore than when it
completes the restore. If that is the case, it should just skip that task for
that cycle and move on to the next task. When it comes back around, it can try
to restore the state again, and if by that time no other thread is processing
the partition, it can start processing. I see no reason to raise the exception
and kill the thread entirely.

Thanks
Sachin

On Mon, Mar 27, 2017 at 3:56 PM, Damian Guy wrote:

> Yes, but we don't know why it is still processing the data. We don't want
> to have multiple processes acting on the same tasks, hence the exception.
> What if for some reason the other task is processing a large backlog: how
> long do we wait before we give up?
>
> I think in this case the exception is the right thing to do.
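For illustration, Sachin's suggestion amounts to something like the sketch below. This is plain Java, not the actual Streams internals; `Changelog`, `restoreWithRetry`, and `MAX_ATTEMPTS` are invented names. On an end-offset mismatch, the task is skipped for the current cycle and retried later, with a bound on attempts to address Damian's "how long do we wait" concern:

```java
// Hypothetical sketch of "skip and retry instead of throwing" (not the
// actual Kafka Streams API; all names here are made up for illustration).
public class RetryingRestore {
    static final int MAX_ATTEMPTS = 3;

    /** Minimal stand-in for a changelog partition; only exposes its end offset. */
    interface Changelog {
        long endOffset();
    }

    /**
     * Returns true once a restore pass completes with the end offset
     * unchanged; otherwise skips the task for this cycle and retries,
     * giving up after MAX_ATTEMPTS cycles instead of killing the thread.
     */
    static boolean restoreWithRetry(Changelog log) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            long before = log.endOffset();   // snapshot, like seeking to end
            // ... replay changelog records [0, before) into the local store ...
            long after = log.endOffset();    // re-check after the replay
            if (before == after) {
                return true;                 // no concurrent writer: task ready
            }
            // End offset moved: a previous owner is probably still writing.
            // Skip this cycle and try again on the next one.
        }
        return false;                        // bounded wait, then give up
    }
}
```

Whether giving up after the bound should fail the task or just log a warning is exactly the policy question debated in this thread.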
Re: kafka streams 0.10.2 failing while restoring offsets
Yes, but we don't know why it is still processing the data. We don't want to
have multiple processes acting on the same tasks, hence the exception. What if
for some reason the other task is processing a large backlog: how long do we
wait before we give up?

I think in this case the exception is the right thing to do.

On Mon, 27 Mar 2017 at 09:24 Sachin Mittal wrote:

> Hi,
> These are the logs:
> https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0
>
> I think this may not always be the case, especially if the previous owner
> is on a different machine.
>
> Say it is processing and it takes more than the poll timeout to process or
> commit the offset. The group bumps this thread and assigns its task to a
> different thread, maybe on a different machine.
>
> All this while, this client may still be pushing changelog data while the
> other thread is restoring state from the same partition.
>
> So between the time the new thread starts and the time it seeks to the end
> of the changelog, the previous thread, which was still processing since it
> did not know it got bumped out, may have added more data to that changelog.
>
> Only when the previous thread tries to commit its offsets does it learn
> that it is no longer the owner of the partition, and only then does it
> issue a rejoin request.
>
> I think such a case should be handled within the streams application.
>
> Thanks
> Sachin
Re: kafka streams 0.10.2 failing while restoring offsets
Hi,
These are the logs:
https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0

I think this may not always be the case, especially if the previous owner is
on a different machine.

Say it is processing and it takes more than the poll timeout to process or
commit the offset. The group bumps this thread and assigns its task to a
different thread, maybe on a different machine.

All this while, this client may still be pushing changelog data while the
other thread is restoring state from the same partition.

So between the time the new thread starts and the time it seeks to the end of
the changelog, the previous thread, which was still processing since it did
not know it got bumped out, may have added more data to that changelog.

Only when the previous thread tries to commit its offsets does it learn that
it is no longer the owner of the partition, and only then does it issue a
rejoin request.

I think such a case should be handled within the streams application.

Thanks
Sachin

On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy wrote:

> Hi Sachin,
>
> This should not happen. The previous owner of the task should have stopped
> processing before the restoration begins. So if this is happening, then
> that signals a bug. Do you have more logs?
>
> Thanks,
> Damian
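The race described above can be modeled in a few lines of plain Java. This is a toy model, not real Kafka client code, and all names are invented: the bumped-out "zombie" owner only discovers it lost the partition at commit time, so it can keep appending to the changelog while the new owner restores from it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the restore race (no real Kafka client is involved).
public class ZombieWriterRace {
    private final List<byte[]> changelog = new ArrayList<>();

    long endOffset() { return changelog.size(); }

    void append(byte[] record) { changelog.add(record); }

    /**
     * New owner's restore: snapshot the end offset, replay up to it, then
     * re-check. zombieActivity stands in for the bumped-out previous owner,
     * which keeps writing until its next commit fails.
     */
    boolean restoreInvariantHolds(Runnable zombieActivity) {
        long oldEnd = endOffset();    // like "Fetched offset 647352" in the logs
        zombieActivity.run();         // zombie appends while we replay
        // ... replay records [0, oldEnd) into the local state store ...
        return endOffset() == oldEnd; // false reproduces the failed invariant
    }
}
```

When `zombieActivity` appends anything, the final check fails, which is the situation the 0.10.2 restore code turns into an IllegalStateException.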
Re: kafka streams 0.10.2 failing while restoring offsets
Hi Sachin,

This should not happen. The previous owner of the task should have stopped
processing before the restoration begins. So if this is happening, then that
signals a bug. Do you have more logs?

Thanks,
Damian

On Sat, 25 Mar 2017 at 08:20 Sachin Mittal wrote:

> Hi,
> So recently we fixed the deadlock issue and also built the streams jar by
> copying the RocksDB configs from trunk. So we don't get any deadlock issue
> now, and we also see that the wait time of the CPU cores stays around 5%
> (down from 50% earlier).
>
> However, we now get a new exception which is not handled by the streams
> application and causes the instance to shut down:
>
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
> What I can see from the logs is this:
>
> DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-2] creating new task 0_9
>
> So it creates the task at this time. To create the local state store from
> the changelog topic, it starts at:
>
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to partition(s): new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to latest offset.
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Handling ListOffsetResponse response for new-part-advice-key-table-changelog-9. Fetched offset 647352, timestamp -1
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition new-part-advice-key-table-changelog-9 to earliest offset.
>
> and the process is over at:
>
> DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for partitions [new-part-advice-key-table-changelog-9] to broker 192.168.73.199:9092 (id: 5 rack: null)
> DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]: org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics or patterns and assigned partitions
>
> And the exception is thrown at:
>
> ERROR 2017-03-25 02:10:21,232 [StreamThread-2]: org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group new-part-advice failed on partition assignment
> java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
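For reference, the failing check in the logs above boils down to a comparison like the following. This is a simplified paraphrase of what `ProcessorStateManager.restoreActiveState` does in 0.10.2, not the actual source (the real code also prefixes the task id, e.g. `task [0_9]`):

```java
// Simplified paraphrase of the 0.10.2 restore invariant: after replaying
// the changelog, compare the end offset captured before the restore with
// the current end offset and throw if another writer moved it meanwhile.
public class RestoreCheck {
    static void verifyEndOffsetUnchanged(String changelogPartition,
                                         long oldEndOffset,
                                         long currentEndOffset) {
        if (currentEndOffset != oldEndOffset) {
            // Another writer appended to the changelog mid-restore.
            throw new IllegalStateException(
                "Log end offset of " + changelogPartition
                    + " should not change while restoring: old end offset "
                    + oldEndOffset + ", current offset " + currentEndOffset);
        }
    }
}
```

With the values from the logs, `verifyEndOffsetUnchanged("new-part-advice-key-table-changelog-9", 647352, 647632)` throws with essentially the message seen in the stack trace, while equal offsets pass silently.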