Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Sachin Mittal
Well, let's say the previous thread was processing a large block and got
bumped out of the group mid-processing.
So the new thread that got this partition may see one end offset when it
starts the restore and a different one by the time it completes it.

If that is the case then it should just ignore that task for that cycle and
move on to the next task.
When it comes back to that task it can retry the restore, and if by that
time no other thread is processing the partition, it can start processing.

I see no reason to raise the exception and kill the thread entirely.
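
Roughly, the handling I have in mind looks like the sketch below. Every
name in it is hypothetical; this is not the actual streams internals, just
the shape of the behaviour I am suggesting:

import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: defer the task instead of throwing when the changelog end
// offset moves during restore.
class RestoringTaskLoop {

    // tasks to retry on a later run-loop iteration
    private final Queue<String> deferred = new ArrayDeque<>();

    // hypothetical view of a state store changelog partition
    interface Changelog {
        long endOffset();              // current log end offset
        void restoreUpTo(long offset); // replay records into the local store
    }

    boolean tryRestore(String taskId, Changelog changelog) {
        long endBefore = changelog.endOffset();
        changelog.restoreUpTo(endBefore);
        long endAfter = changelog.endOffset();
        if (endAfter != endBefore) {
            // the previous owner is probably still writing; skip this task
            // for this cycle and move on to the next task
            deferred.add(taskId);
            return false;
        }
        return true;
    }
}

On a later cycle the thread would pull the task ids back off the deferred
queue and call tryRestore again.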

Thanks
Sachin


On Mon, Mar 27, 2017 at 3:56 PM, Damian Guy  wrote:

> Yes, but we don't know why it is still processing the data. We don't want
> to have multiple processes acting on the same tasks, hence the exception.
> What if, for some reason, the other task is processing a large backlog?
> How long do we wait before we give up?
>
> I think in this case the exception is the right thing to do.
>
> On Mon, 27 Mar 2017 at 09:24 Sachin Mittal  wrote:
>
> Hi,
> These are the logs
> https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0
>
> I think this may not always be the case, especially if the previous owner
> is on a different machine.
>
> Say it is processing and takes longer than the poll timeout to process or
> commit the offset.
>
> The group bumps this thread and assigns its task to a different thread,
> possibly on a different machine.
>
> All this while, the old client may still be pushing changelog data while
> the other thread is restoring the state from the same partition.
>
> So between the time the new thread seeks to the end of the changelog and
> the time it completes the restore, the previous thread, which was still
> processing because it did not yet know it had been bumped out, may have
> added more data to that changelog.
>
> Only when the previous thread tries to commit the offset does it learn
> that it is no longer the owner of the partition, and then it issues a
> rejoin request.
>
> I think such a case should be handled within the streams application.
>
> Thanks
> Sachin
>
>
>
>
>
> On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > This should not happen. The previous owner of the task should have
> > stopped processing before the restoration begins. So if this is
> > happening, then that signals a bug. Do you have more logs?
> >
> > Thanks,
> > Damian
> >
> > On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
> >
> > > Hi,
> > > So we recently fixed the deadlock issue and also built the streams jar
> > > by copying the RocksDB configs from trunk.
> > > We no longer see any deadlock now, and the wait time of the CPU cores
> > > stays around 5% (down from 50% earlier).
> > >
> > > However, we now get a new exception which is not handled by the streams
> > > application and causes the instance to shut down.
> > >
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-2] Failed to rebalance
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> > > new-part-advice-key-table-changelog-9 should not change while restoring:
> > > old end offset 647352, current offset 647632
> > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
> > > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)

Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Damian Guy
Yes, but we don't know why it is still processing the data. We don't want
to have multiple processes acting on the same tasks, hence the exception.
What if, for some reason, the other task is processing a large backlog?
How long do we wait before we give up?

I think in this case the exception is the right thing to do.
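
For reference, the invariant behind the exception has roughly this shape.
This is a paraphrase based on the stack trace and the exception message,
not the literal ProcessorStateManager code:

import org.apache.kafka.common.TopicPartition;

// The changelog end offset must not move while an active task restores
// from it; movement means some other instance is still writing to it.
final class RestoreInvariant {
    static void check(String taskId, TopicPartition changelog,
                      long endBeforeRestore, long endAfterRestore) {
        if (endAfterRestore != endBeforeRestore) {
            throw new IllegalStateException(String.format(
                "task [%s] Log end offset of %s should not change while "
                    + "restoring: old end offset %d, current offset %d",
                taskId, changelog, endBeforeRestore, endAfterRestore));
        }
    }
}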

On Mon, 27 Mar 2017 at 09:24 Sachin Mittal  wrote:

Hi,
These are the logs
https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0

I think this may not always be the case, especially if the previous owner
is on a different machine.

Say it is processing and takes longer than the poll timeout to process or
commit the offset.

The group bumps this thread and assigns its task to a different thread,
possibly on a different machine.

All this while, the old client may still be pushing changelog data while
the other thread is restoring the state from the same partition.

So between the time the new thread seeks to the end of the changelog and
the time it completes the restore, the previous thread, which was still
processing because it did not yet know it had been bumped out, may have
added more data to that changelog.

Only when the previous thread tries to commit the offset does it learn
that it is no longer the owner of the partition, and then it issues a
rejoin request.

I think such a case should be handled within the streams application.

Thanks
Sachin





On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This should not happen. The previous owner of the task should have stopped
> processing before the restoration begins. So if this is happening, then
> that signals a bug. Do you have more logs?
>
> Thanks,
> Damian
>
> On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
>
> > Hi,
> > So we recently fixed the deadlock issue and also built the streams jar
> > by copying the RocksDB configs from trunk.
> > We no longer see any deadlock now, and the wait time of the CPU cores
> > stays around 5% (down from 50% earlier).
> >
> > However, we now get a new exception which is not handled by the streams
> > application and causes the instance to shut down.
> >
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-2] Failed to rebalance
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> > new-part-advice-key-table-changelog-9 should not change while restoring:
> > old end offset 647352, current offset 647632
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >
> > What I can see from the logs is this:
> > DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> > [StreamThread-2] creating new task 0_9
> > So it creates the task at this time.
> >
> > To create the local state store from the changelog topic, it starts at:
> >
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> > partition(s): new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> > partition new-part-advice-key-table-changelog-9

Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Sachin Mittal
Hi,
These are the logs
https://www.dropbox.com/s/2t4ysfdqbtmcusq/complete_84_85_87_log.zip?dl=0

I think this may not always be the case, especially if the previous owner
is on a different machine.

Say it is processing and takes longer than the poll timeout to process or
commit the offset.
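
The knobs that decide this are the consumer poll settings. The values
below are made up, just to illustrate what is involved:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative values only: how long a thread may go between poll() calls
// before the group evicts it, and how many records each poll() hands back.
Properties props = new Properties();
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);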

The group bumps this thread and assigns its task to a different thread,
possibly on a different machine.

All this while, the old client may still be pushing changelog data while
the other thread is restoring the state from the same partition.

So between the time the new thread seeks to the end of the changelog and
the time it completes the restore, the previous thread, which was still
processing because it did not yet know it had been bumped out, may have
added more data to that changelog.

Only when the previous thread tries to commit the offset does it learn
that it is no longer the owner of the partition, and then it issues a
rejoin request.
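
This hypothetical loop shows the timing; it is not the actual streams
code, only the consumer pattern involved:

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch: the old owner only learns it lost its partitions
// when a commit finally fails.
class OldOwnerLoop {
    void run(Consumer<byte[], byte[]> consumer) {
        while (true) {
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
                // processing here may also append to the changelog topic
            }
            try {
                consumer.commitSync();
            } catch (CommitFailedException e) {
                // the group has already rebalanced us out; everything we
                // appended to the changelog meanwhile raced with the new
                // owner's restore; the next poll() triggers a rejoin
                break;
            }
        }
    }
}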

I think such a case should be handled within the streams application.

Thanks
Sachin





On Mon, Mar 27, 2017 at 1:25 PM, Damian Guy  wrote:

> Hi Sachin,
>
> This should not happen. The previous owner of the task should have stopped
> processing before the restoration begins. So if this is happening, then
> that signals a bug. Do you have more logs?
>
> Thanks,
> Damian
>
> On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:
>
> > Hi,
> > So we recently fixed the deadlock issue and also built the streams jar
> > by copying the RocksDB configs from trunk.
> > We no longer see any deadlock now, and the wait time of the CPU cores
> > stays around 5% (down from 50% earlier).
> >
> > However, we now get a new exception which is not handled by the streams
> > application and causes the instance to shut down.
> >
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-2] Failed to rebalance
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> > new-part-advice-key-table-changelog-9 should not change while restoring:
> > old end offset 647352, current offset 647632
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> > ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >
> > What I can see from the logs is this:
> > DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> > org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> > [StreamThread-2] creating new task 0_9
> > So it creates the task at this time.
> >
> > To create the local state store from the changelog topic, it starts at:
> >
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> > partition(s): new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> > partition new-part-advice-key-table-changelog-9
> > DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset
> > for partition new-part-advice-key-table-changelog-9 to latest offset.
> > DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> > org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> > ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> > Fetched offset 647352, timestamp -1

Re: kafka streams 0.10.2 failing while restoring offsets

2017-03-27 Thread Damian Guy
Hi Sachin,

This should not happen. The previous owner of the task should have stopped
processing before the restoration begins. So if this is happening, then
that signals a bug. Do you have more logs?

Thanks,
Damian

On Sat, 25 Mar 2017 at 08:20 Sachin Mittal  wrote:

> Hi,
> So we recently fixed the deadlock issue and also built the streams jar
> by copying the RocksDB configs from trunk.
> We no longer see any deadlock now, and the wait time of the CPU cores
> stays around 5% (down from 50% earlier).
>
> However, we now get a new exception which is not handled by the streams
> application and causes the instance to shut down.
>
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:622)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> Caused by: java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:240)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:76)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>
> What I can see from the logs is this:
> DEBUG 2017-03-25 02:07:24,499 [StreamThread-2]:
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-2] creating new task 0_9
> So it creates the task at this time.
>
> To create the local state store from the changelog topic, it starts at:
>
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to
> partition(s): new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to end of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,550 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to latest offset.
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Handling
> ListOffsetResponse response for new-part-advice-key-table-changelog-9.
> Fetched offset 647352, timestamp -1
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to beginning of
> partition new-part-advice-key-table-changelog-9
> DEBUG 2017-03-25 02:07:24,552 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for
> partition new-part-advice-key-table-changelog-9 to earliest offset.
>
> and the restore process finishes at:
> DEBUG 2017-03-25 02:10:21,225 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.Fetcher - Sending fetch for
> partitions [new-part-advice-key-table-changelog-9] to broker
> 192.168.73.199:9092 (id: 5 rack: null)
> DEBUG 2017-03-25 02:10:21,230 [StreamThread-2]:
> org.apache.kafka.clients.consumer.KafkaConsumer - Unsubscribed all topics
> or patterns and assigned partitions
>
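
The sequence in these logs corresponds roughly to the sketch below. This is
a simplified illustration, not the actual streams restore code:

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Simplified restore sequence: pin the end offset, replay from the
// beginning, stop once the end offset is reached.
class RestoreSketch {
    void restore(KafkaConsumer<byte[], byte[]> restoreConsumer,
                 TopicPartition changelog) {
        restoreConsumer.assign(Collections.singletonList(changelog));
        restoreConsumer.seekToEnd(Collections.singletonList(changelog));
        long endOffset = restoreConsumer.position(changelog); // "Fetched offset 647352"
        restoreConsumer.seekToBeginning(Collections.singletonList(changelog));
        while (restoreConsumer.position(changelog) < endOffset) {
            for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100)) {
                // write record.key()/record.value() into the local RocksDB store
            }
        }
        restoreConsumer.unsubscribe(); // "Unsubscribed all topics or patterns"
    }
}

If the changelog keeps growing past the pinned endOffset while this loop
runs, that is exactly the race being discussed in this thread.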
> And the exception is thrown at:
> ERROR 2017-03-25 02:10:21,232 [StreamThread-2]:
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User
> provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition assignment
> java.lang.IllegalStateException: task [0_9] Log end offset of
> new-part-advice-key-table-changelog-9 should not change while restoring:
> old end offset 647352, current offset 647632