Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-10 Thread Sachin Mittal
Hi,
Understood.
We just need to figure out the cause of these frequent rebalances. Somehow it
seems to point to RocksDB, but I need to debug more.

The pressing issue now is to not kill the thread when there is a
CommitFailedException on partition revocation (we catch this at the
consumer coordinator level anyway).

We now know that there will be commit failures and partition reassignments
(they cannot be avoided). However, this should not cause the stream thread
to die.
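The behaviour being asked for can be sketched without any Kafka dependency. The model below is hypothetical: `ToleratingRevocationHandler` and `suspendAndCommit` are invented names, `CommitFailedException` is a local stand-in for the real `org.apache.kafka.clients.consumer.CommitFailedException`, and this is not how `StreamThread` in 0.10.2 is written. It only illustrates logging a commit failure during revocation instead of remembering it as fatal.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.kafka.clients.consumer.CommitFailedException so the
// sketch compiles without Kafka on the classpath (hypothetical, for illustration).
class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
        super(message);
    }
}

// Hypothetical revocation handler: a commit failure during revocation is logged
// and tolerated, while any other failure is still recorded as fatal.
class ToleratingRevocationHandler {
    private final Runnable suspendAndCommit;   // e.g. suspending tasks and committing offsets
    private RuntimeException fatalError;        // plays the role of rebalanceException
    final List<String> log = new ArrayList<>();

    ToleratingRevocationHandler(Runnable suspendAndCommit) {
        this.suspendAndCommit = suspendAndCommit;
    }

    void onPartitionsRevoked() {
        try {
            suspendAndCommit.run();
        } catch (CommitFailedException e) {
            // The group has already rebalanced without us, so the offsets could not
            // be committed; records since the last successful commit may be reprocessed.
            log.add("commit failed during revocation, continuing: " + e.getMessage());
        } catch (RuntimeException e) {
            fatalError = e;   // anything else is still treated as fatal
            throw e;
        }
    }

    // Checked at the top of each loop iteration, like runLoop checking rebalanceException.
    boolean shouldShutDown() {
        return fatalError != null;
    }

    public static void main(String[] args) {
        ToleratingRevocationHandler handler = new ToleratingRevocationHandler(() -> {
            throw new CommitFailedException("member id unknown");
        });
        handler.onPartitionsRevoked();
        System.out.println("shut down? " + handler.shouldShutDown());   // false
        System.out.println(handler.log);
    }
}
```

Other exceptions still stop the thread in this sketch, so a genuine failure while suspending tasks would not be hidden.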

Based on the logs, I see no reason why the stream thread should die in this case.

Please let us know your thoughts on this case.

Thanks
Sachin



On Fri, Feb 10, 2017 at 6:01 PM, Damian Guy  wrote:

> On 10 February 2017 at 11:18, Sachin Mittal  wrote:
>
> > The heartbeat exception while rebalancing is OK. However, I have a
> > different scenario that I wanted to understand.
> >
> > Please check line 42428 of
> > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
>
>
> If you look at line 42006, you will see that the group is rebalancing:
> Attempt to heartbeat failed for group new-part-advice since it is
> rebalancing.
>
> This is the likely cause of the message at line 42428, because that member
> has been kicked out of the group.
>
> >
> >
> > Attempt to heartbeat failed for group new-part-advice since member id is
> > not valid.
> >
> > Why do we get this exception?
> >
> > Also, on line 87485 of
> > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> > we get:
> > Offset commit for group new-part-advice failed: The coordinator is not
> > aware of this member.
> >
> > Both errors look the same, and both seem to fail because the member id is
> > unknown.
> >
> > What could be the reason for this?
> >
>
> Again, if you look a bit before that, at line 87248, you'll see that the
> group has rebalanced.
>
>
> >
> > Regarding "w.r.t. continuing after a CommitFailedException during
> > onPartitionsRevoked - will have to think through the scenarios":
> >
> > Please look at the log:
> > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> > Line 88714
> > stream-thread [StreamThread-1] Updating suspended tasks to contain active
> > tasks [[0_17, 0_19, 0_24, 0_31]]
> > stream-thread [StreamThread-1] Removing all active tasks [[0_17, 0_19,
> > 0_24, 0_31]]
> > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > new-part-advice failed on partition revocation
> > This gets handled and logged, and the stream thread resumes with new
> > partitions.
> >
> > Check line 90005
> > stream-thread [StreamThread-1] New partitions [[advice-stream-12,
> > advice-stream-15, advice-stream-1, advice-stream-21]] assigned at the end
> > of consumer rebalance.
> >
> > Now, at this stage, the stream thread should continue.
> > However, since we assign
> > rebalanceException = t;
> > in
> > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > it shuts down when it tries to execute runLoop again.
> >
> > Please check line 91074
> > stream-thread [StreamThread-1] Shutting down
> >
> > Please let me know if this case is valid and we should not shut down the
> > thread itself.
> >
> > I know this log is a mess, and we have made the necessary corrections.
> >
> > We ran the streams application again, now with 12 threads (3 machines) and
> > 12 partitions. It ran well for a few hours, but then 2 of the machines and
> > 2 threads of the third machine died.
> > This was a much better result than the scenario with 40 partitions and the
> > same 12 threads, where the application used to go into a perpetual
> > rebalance state within a couple of hours.
> > We are in the process of analysing those logs and will present our
> > findings.
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Feb 10, 2017 at 3:42 PM, Damian Guy wrote:
> >
> > > Hi Sachin,
> > >
> > > The CommitFailedExceptions are thrown because the group is rebalancing.
> > > You can see log messages like the one below happening before the commit
> > > failed exception:
> > >
> > > Attempt to heartbeat failed for group new-part-advice since it is
> > > rebalancing.
> > >
> > > It isn't clear from the logs why the rebalancing is happening. The
> > > broker logs would have been helpful. Timestamps on the Kafka Streams
> > > logs would also be useful.
> > >
> > > w.r.t. continuing after a CommitFailedException during
> > > onPartitionsRevoked - will have to think through the scenarios.
> > >
> > > On Fri, 10 Feb 2017 at 04:35 Sachin Mittal  wrote:
> > >
> > > > Hi,
> > > > I could get the streams client logs; the server logs were deleted
> > > > because time had elapsed and they got rolled over.
> > > > See if you can figure out something from these. These are not the
> > > > best logs.
> > > >
> 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-10 Thread Damian Guy
On 10 February 2017 at 11:18, Sachin Mittal  wrote:

> The heartbeat exception while rebalancing is OK. However, I have a
> different scenario that I wanted to understand.
>
> Please check line 42428 of
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log


If you look at line 42006, you will see that the group is rebalancing:
Attempt to heartbeat failed for group new-part-advice since it is
rebalancing.

This is the likely cause of the message at line 42428, because that member
has been kicked out of the group.
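A toy model of why the two messages look alike: once a rebalance completes without a member, the coordinator no longer knows that member's id, so both its heartbeats and its offset commits are rejected. This is a hypothetical simplification (`ToyGroupCoordinator` and its methods are invented, and it ignores generation ids and timeouts), not the broker's coordinator code, although `REBALANCE_IN_PROGRESS` and `UNKNOWN_MEMBER_ID` mirror real Kafka error names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified group coordinator (ignores generation ids,
// timeouts and partition assignment) to show where the two errors come from.
class ToyGroupCoordinator {
    enum Result { OK, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID }

    private final Set<String> members = new HashSet<>();
    private final Set<String> rejoined = new HashSet<>();
    private boolean rebalancing = false;

    void join(String memberId) {
        if (rebalancing) {
            rejoined.add(memberId);
        } else {
            members.add(memberId);
        }
    }

    // A rebalance starts: every member has to rejoin before it completes.
    void startRebalance() {
        rebalancing = true;
        rejoined.clear();
    }

    // The rebalance completes: only members that rejoined in time stay in the group.
    void completeRebalance() {
        members.clear();
        members.addAll(rejoined);
        rebalancing = false;
    }

    Result heartbeat(String memberId) {
        if (!members.contains(memberId)) return Result.UNKNOWN_MEMBER_ID; // "member id is not valid"
        if (rebalancing) return Result.REBALANCE_IN_PROGRESS;             // "since it is rebalancing"
        return Result.OK;
    }

    Result commitOffsets(String memberId) {
        // "The coordinator is not aware of this member"
        return members.contains(memberId) ? Result.OK : Result.UNKNOWN_MEMBER_ID;
    }

    public static void main(String[] args) {
        ToyGroupCoordinator group = new ToyGroupCoordinator();
        group.join("thread-1");
        group.join("thread-4");
        group.startRebalance();
        System.out.println(group.heartbeat("thread-1"));      // REBALANCE_IN_PROGRESS
        group.join("thread-4");                               // only thread-4 rejoins in time
        group.completeRebalance();
        System.out.println(group.heartbeat("thread-1"));      // UNKNOWN_MEMBER_ID
        System.out.println(group.commitOffsets("thread-1"));  // UNKNOWN_MEMBER_ID
    }
}
```

In the real consumer, a member that receives `UNKNOWN_MEMBER_ID` resets its member id and rejoins, which is consistent with the empty `memberId=,` in the JoinGroup request quoted later in this thread.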

>
>
> Attempt to heartbeat failed for group new-part-advice since member id is
> not valid.
>
> Why do we get this exception?
>
> Also, on line 87485 of
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> we get:
> Offset commit for group new-part-advice failed: The coordinator is not
> aware of this member.
>
> Both errors look the same, and both seem to fail because the member id is
> unknown.
>
> What could be the reason for this?
>

Again, if you look a bit before that, at line 87248, you'll see that the
group has rebalanced.
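Reading backwards from a failure to the most recent rebalance message, as done above, is easy to script. The sketch below assumes one log event per line in the client log file; the search strings are copied from the messages quoted in this thread, while `RebalanceLocator` and its output format are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pairs each failure message with the closest earlier
// "since it is rebalancing" message in a client log.
class RebalanceLocator {
    static final String REBALANCING = "since it is rebalancing";
    static final String[] FAILURES = {
        "since member id is not valid",
        "The coordinator is not aware of this member",
        "failed on partition revocation"
    };

    // Each result is {failureLine, lastRebalanceLine}, both 1-based;
    // lastRebalanceLine is 0 when no rebalance message precedes the failure.
    static List<int[]> locate(List<String> lines) {
        List<int[]> result = new ArrayList<>();
        int lastRebalance = 0;
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            if (line.contains(REBALANCING)) {
                lastRebalance = i + 1;
                continue;
            }
            for (String failure : FAILURES) {
                if (line.contains(failure)) {
                    result.add(new int[] {i + 1, lastRebalance});
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java RebalanceLocator <streams-client.log>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        for (int[] hit : locate(lines)) {
            System.out.println("failure at line " + hit[0]
                    + ", last rebalance message at line " + hit[1]);
        }
    }
}
```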


>
> Regarding "w.r.t. continuing after a CommitFailedException during
> onPartitionsRevoked - will have to think through the scenarios":
>
> Please look at the log:
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> Line 88714
> stream-thread [StreamThread-1] Updating suspended tasks to contain active
> tasks [[0_17, 0_19, 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all active tasks [[0_17, 0_19,
> 0_24, 0_31]]
> stream-thread [StreamThread-1] Removing all standby tasks [[]]
> User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> new-part-advice failed on partition revocation
> This gets handled and logged, and the stream thread resumes with new
> partitions.
>
> Check line 90005
> stream-thread [StreamThread-1] New partitions [[advice-stream-12,
> advice-stream-15, advice-stream-1, advice-stream-21]] assigned at the end
> of consumer rebalance.
>
> Now, at this stage, the stream thread should continue.
> However, since we assign
> rebalanceException = t;
> in
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> it shuts down when it tries to execute runLoop again.
>
> Please check line 91074
> stream-thread [StreamThread-1] Shutting down
>
> Please let me know if this case is valid and we should not shut down the
> thread itself.
>
> I know this log is a mess, and we have made the necessary corrections.
>
> We ran the streams application again, now with 12 threads (3 machines) and
> 12 partitions. It ran well for a few hours, but then 2 of the machines and 2
> threads of the third machine died.
> This was a much better result than the scenario with 40 partitions and the
> same 12 threads, where the application used to go into a perpetual
> rebalance state within a couple of hours.
> We are in the process of analysing those logs and will present our findings.
>
> Thanks
> Sachin
>
>
> On Fri, Feb 10, 2017 at 3:42 PM, Damian Guy  wrote:
>
> > Hi Sachin,
> >
> > The CommitFailedExceptions are thrown because the group is rebalancing.
> > You can see log messages like the one below happening before the commit
> > failed exception:
> >
> > Attempt to heartbeat failed for group new-part-advice since it is
> > rebalancing.
> >
> > It isn't clear from the logs why the rebalancing is happening. The broker
> > logs would have been helpful. Timestamps on the Kafka Streams logs would
> > also be useful.
> >
> > w.r.t. continuing after a CommitFailedException during
> > onPartitionsRevoked - will have to think through the scenarios.
> >
> > On Fri, 10 Feb 2017 at 04:35 Sachin Mittal  wrote:
> >
> > > Hi,
> > > I could get the streams client logs; the server logs were deleted
> > > because time had elapsed and they got rolled over.
> > > See if you can figure out something from these. These are not the best
> > > logs.
> > >
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
> > > In the above log, pay attention to StreamThread-1, which fails due to a
> > > CommitFailedException.
> > > This mostly seems to be caused by OffsetCommitResponseHandler due to an
> > > unknown member id.
> > >
> > >
> > > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> > > In the above log, pay attention to StreamThread-4, which fails due to a
> > > CommitFailedException during sendOffsetCommitRequest.
> > > This mostly seems to be caused by "Attempt to heartbeat failed for group
> > > new-part-advice since member id is not valid."
> > >
> > > Also, please let us know why we shut down the thread if we get a
> > > CommitFailedException. Ideally, we catch this exception in
> > > onPartitionsRevoked and then the thread should continue processing the
> > > new partitions assigned.
> > >
> > > Please let 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-10 Thread Sachin Mittal
The heartbeat exception while rebalancing is OK. However, I have a
different scenario that I wanted to understand.

Please check line 42428 of
https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log

Attempt to heartbeat failed for group new-part-advice since member id is
not valid.

Why do we get this exception?

Also, on line 87485 of
https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
we get:
Offset commit for group new-part-advice failed: The coordinator is not
aware of this member.

Both errors look the same, and both seem to fail because the member id is
unknown.

What could be the reason for this?

Regarding "w.r.t. continuing after a CommitFailedException during
onPartitionsRevoked - will have to think through the scenarios":

Please look at the log:
https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
Line 88714
stream-thread [StreamThread-1] Updating suspended tasks to contain active
tasks [[0_17, 0_19, 0_24, 0_31]]
stream-thread [StreamThread-1] Removing all active tasks [[0_17, 0_19,
0_24, 0_31]]
stream-thread [StreamThread-1] Removing all standby tasks [[]]
User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
new-part-advice failed on partition revocation
This gets handled and logged, and the stream thread resumes with new
partitions.

Check line 90005
stream-thread [StreamThread-1] New partitions [[advice-stream-12,
advice-stream-15, advice-stream-1, advice-stream-21]] assigned at the end
of consumer rebalance.

Now, at this stage, the stream thread should continue.
However, since we assign
rebalanceException = t;
in
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
it shuts down when it tries to execute runLoop again.

Please check line 91074
stream-thread [StreamThread-1] Shutting down

Please let me know if this case is valid and we should not shut down the
thread itself.
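For discussion, here is a stripped-down, hypothetical model of the sequence described above: the revocation callback stores the error before rethrowing it, the coordinator logs and swallows the rethrown error and finishes the assignment, and the loop then finds the stored error on its next pass. The names are invented and `IllegalStateException` stands in for `StreamsException`; this is not the actual `StreamThread` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "remember the error, fail later" pattern discussed above.
class DeferredRebalanceErrorModel {
    private RuntimeException rebalanceException;   // set inside the listener
    final List<String> events = new ArrayList<>();

    // Listener callback: record the failure, then rethrow it.
    void onPartitionsRevoked(Runnable suspendAndCommit) {
        try {
            suspendAndCommit.run();
        } catch (RuntimeException t) {
            rebalanceException = t;
            throw t;
        }
    }

    // Coordinator side: a failing user listener is only logged; the rebalance carries on.
    void rebalance(Runnable suspendAndCommit) {
        try {
            onPartitionsRevoked(suspendAndCommit);
        } catch (RuntimeException e) {
            events.add("listener failed on partition revocation");
        }
        events.add("new partitions assigned");
    }

    // One loop iteration: poll (which may rebalance), then check the stored error.
    void runOnce(Runnable suspendAndCommit) {
        rebalance(suspendAndCommit);
        if (rebalanceException != null) {
            events.add("shutting down");
            // Stand-in for StreamsException("... Failed to rebalance", rebalanceException)
            throw new IllegalStateException("Failed to rebalance", rebalanceException);
        }
        events.add("processing");
    }

    public static void main(String[] args) {
        DeferredRebalanceErrorModel thread = new DeferredRebalanceErrorModel();
        try {
            thread.runOnce(() -> {
                throw new RuntimeException("CommitFailedException (simulated)");
            });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " after " + thread.events);
        }
    }
}
```

The model reproduces the order seen in the logs: revocation fails and is handled, new partitions are assigned, and only then does the thread shut down.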

I know this log is a mess, and we have made the necessary corrections.

We ran the streams application again, now with 12 threads (3 machines) and
12 partitions. It ran well for a few hours, but then 2 of the machines and 2
threads of the third machine died.
This was a much better result than the scenario with 40 partitions and the
same 12 threads, where the application used to go into a perpetual
rebalance state within a couple of hours.
We are in the process of analysing those logs and will present our findings.

Thanks
Sachin


On Fri, Feb 10, 2017 at 3:42 PM, Damian Guy  wrote:

> Hi Sachin,
>
> The CommitFailedExceptions are thrown because the group is rebalancing. You
> can see log messages like the one below happening before the commit failed
> exception:
>
> Attempt to heartbeat failed for group new-part-advice since it is
> rebalancing.
>
> It isn't clear from the logs why the rebalancing is happening. The broker
> logs would have been helpful. Timestamps on the Kafka Streams logs would
> also be useful.
>
> w.r.t. continuing after a CommitFailedException during
> onPartitionsRevoked - will have to think through the scenarios.
>
> On Fri, 10 Feb 2017 at 04:35 Sachin Mittal  wrote:
>
> > Hi,
> > I could get the streams client logs; the server logs were deleted because
> > time had elapsed and they got rolled over.
> > See if you can figure out something from these. These are not the best
> > logs.
> >
> >
> > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
> > In the above log, pay attention to StreamThread-1, which fails due to a
> > CommitFailedException.
> > This mostly seems to be caused by OffsetCommitResponseHandler due to an
> > unknown member id.
> >
> >
> > https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> > In the above log, pay attention to StreamThread-4, which fails due to a
> > CommitFailedException during sendOffsetCommitRequest.
> > This mostly seems to be caused by "Attempt to heartbeat failed for group
> > new-part-advice since member id is not valid."
> >
> > Also, please let us know why we shut down the thread if we get a
> > CommitFailedException. Ideally, we catch this exception in
> > onPartitionsRevoked and then the thread should continue processing the
> > new partitions assigned.
> >
> > Please let me know if you need any more information from me.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy  wrote:
> >
> > > Might be easiest to just send all the logs if possible.
> > >
> > > On Thu, 9 Feb 2017 at 08:10 Sachin Mittal  wrote:
> > >
> > > > I will try to get the logs soon.
> > > > One quick question: I have three brokers which run in a cluster with
> > > > default logging.
> > > >
> > > > Which log4j logs would be of interest on the broker side, and which
> > > > broker's logs do I need to send, or should I send logs from all three?
> > > >
> > > > My topic is partitioned and replicated 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-10 Thread Damian Guy
Hi Sachin,

The CommitFailedExceptions are thrown because the group is rebalancing. You
can see log messages like the one below happening before the commit failed
exception:

Attempt to heartbeat failed for group new-part-advice since it is
rebalancing.

It isn't clear from the logs why the rebalancing is happening. The broker
logs would have been helpful. Timestamps on the Kafka Streams logs would
also be useful.

w.r.t. continuing after a CommitFailedException during
onPartitionsRevoked - will have to think through the scenarios.
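One scenario worth thinking through, sketched with made-up offsets: when the commit in `onPartitionsRevoked` fails, the committed offset for that partition does not move, so whichever member owns the partition next starts from the old committed offset and processes the uncommitted records again. `ReprocessingWindow` is a hypothetical helper, and it assumes Kafka's convention that a committed offset is the next offset to be read.

```java
// Hypothetical helper: how many records get processed again when the last
// commit before revocation fails.
class ReprocessingWindow {
    // committedOffset: next offset to read according to the last successful commit.
    // position: next offset this member would have read when the commit failed.
    // Records in [committedOffset, position) were processed but not committed.
    static long recordsToReprocess(long committedOffset, long position) {
        return Math.max(0L, position - committedOffset);
    }

    public static void main(String[] args) {
        long committed = 1_000L;  // made-up value
        long position = 1_250L;   // made-up value
        System.out.println(recordsToReprocess(committed, position)
                + " records may be processed again by the next owner");
    }
}
```

With stateful operators under at-least-once processing, those repeated records may also be applied to state stores a second time, which is one reason continuing silently after the failure is not automatically safe.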

On Fri, 10 Feb 2017 at 04:35 Sachin Mittal  wrote:

> Hi,
> I could get the streams client logs; the server logs were deleted because
> time had elapsed and they got rolled over.
> See if you can figure out something from these. These are not the best
> logs.
>
>
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
> In the above log, pay attention to StreamThread-1, which fails due to a
> CommitFailedException.
> This mostly seems to be caused by OffsetCommitResponseHandler due to an
> unknown member id.
>
>
> https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
> In the above log, pay attention to StreamThread-4, which fails due to a
> CommitFailedException during sendOffsetCommitRequest.
> This mostly seems to be caused by "Attempt to heartbeat failed for group
> new-part-advice since member id is not valid."
>
> Also, please let us know why we shut down the thread if we get a
> CommitFailedException. Ideally, we catch this exception in
> onPartitionsRevoked and then the thread should continue processing the
> new partitions assigned.
>
> Please let me know if you need any more information from me.
>
> Thanks
> Sachin
>
>
> On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy  wrote:
>
> > Might be easiest to just send all the logs if possible.
> >
> > On Thu, 9 Feb 2017 at 08:10 Sachin Mittal  wrote:
> >
> > > I will try to get the logs soon.
> > > One quick question: I have three brokers which run in a cluster with
> > > default logging.
> > >
> > > Which log4j logs would be of interest on the broker side, and which
> > > broker's logs do I need to send, or should I send logs from all three?
> > >
> > > My topic is partitioned and replicated on all three, so the kafka-logs
> > > dir contains the same topic logs.
> > >
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy wrote:
> > >
> > > > Sachin,
> > > >
> > > > Can you provide the full logs from the broker and the streams app? It
> > > > is hard to understand what is going on with only snippets of
> > > > information. It seems like the rebalance is taking too long, but I
> > > > can't tell from this.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal wrote:
> > > >
> > > > > Hi,
> > > > > In continuation of the CommitFailedException, what we observe is
> > > > > that when this happens the first time,
> > > > >
> > > > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > > > This calls suspendTasksAndState(), which again tries to commit
> > > > > offsets, and then the same exception is thrown again.
> > > > > This gets handled in ConsumerCoordinator, which logs it and then
> > > > > reassigns new partitions.
> > > > >
> > > > > But the stream thread, before rethrowing the exception, calls
> > > > > rebalanceException = t;
> > > > >
> > > > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > > > >
> > > > > So now when runLoop executes, it gets this exception and the stream
> > > > > thread exits.
> > > > >
> > > > > Here are the logs
> > > > >
> > > > >  -   Exception happened
> > > > >
> > > > > Unsubscribed all topics or patterns and assigned partitions
> > > > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > > > active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3,
> > > > > 0_20, 0_8]]
> > > > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > > > User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> > > > > group new-part-advice failed on partition revocation
> > > > >  -   Revocation failed and exception handled
> > > > >
> > > > > (Re-)joining group new-part-advice
> > > > > stream-thread [StreamThread-1] found [advice-stream] topics
> > > > > possibly matching regex
> > > > > stream-thread [StreamThread-1] updating builder with
> > > > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]}
> > > > > topic(s) with possible matching regex subscription(s)
> > > > > Sending JoinGroup ((type: JoinGroupRequest,
> > > > > groupId=new-part-advice,
> > > > > sessionTimeout=1, rebalanceTimeout=30, 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-09 Thread Sachin Mittal
Hi,
I could get the streams client logs; the server logs were deleted because
time had elapsed and they got rolled over.
See if you can figure out something from these. These are not the best
logs.

https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-1.log
In the above log, pay attention to StreamThread-1, which fails due to a
CommitFailedException.
This mostly seems to be caused by OffsetCommitResponseHandler due to an
unknown member id.

https://dl.dropboxusercontent.com/u/46450177/TestKafkaAdvice.StreamThread-4.log
In the above log, pay attention to StreamThread-4, which fails due to a
CommitFailedException during sendOffsetCommitRequest.
This mostly seems to be caused by "Attempt to heartbeat failed for group
new-part-advice since member id is not valid."

Also, please let us know why we shut down the thread if we get a
CommitFailedException. Ideally, we catch this exception in
onPartitionsRevoked and then the thread should continue processing the
new partitions assigned.

Please let me know if you need any more information from me.

Thanks
Sachin


On Thu, Feb 9, 2017 at 2:14 PM, Damian Guy  wrote:

> Might be easiest to just send all the logs if possible.
>
> On Thu, 9 Feb 2017 at 08:10 Sachin Mittal  wrote:
>
> > I will try to get the logs soon.
> > One quick question: I have three brokers which run in a cluster with
> > default logging.
> >
> > Which log4j logs would be of interest on the broker side, and which
> > broker's logs do I need to send, or should I send logs from all three?
> >
> > My topic is partitioned and replicated on all three, so the kafka-logs
> > dir contains the same topic logs.
> >
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy  wrote:
> >
> > > Sachin,
> > >
> > > Can you provide the full logs from the broker and the streams app? It
> > > is hard to understand what is going on with only snippets of
> > > information. It seems like the rebalance is taking too long, but I
> > > can't tell from this.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal  wrote:
> > >
> > > > Hi,
> > > > In continuation of the CommitFailedException, what we observe is
> > > > that when this happens the first time,
> > > >
> > > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > > This calls suspendTasksAndState(), which again tries to commit
> > > > offsets, and then the same exception is thrown again.
> > > > This gets handled in ConsumerCoordinator, which logs it and then
> > > > reassigns new partitions.
> > > >
> > > > But the stream thread, before rethrowing the exception, calls
> > > > rebalanceException = t;
> > > >
> > > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > > >
> > > > So now when runLoop executes, it gets this exception and the stream
> > > > thread exits.
> > > >
> > > > Here are the logs
> > > >
> > > >  -   Exception happened
> > > >
> > > > Unsubscribed all topics or patterns and assigned partitions
> > > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > > active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3,
> > > > 0_20, 0_8]]
> > > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > > User provided listener
> > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > > new-part-advice failed on partition revocation
> > > >  -   Revocation failed and exception handled
> > > >
> > > > (Re-)joining group new-part-advice
> > > > stream-thread [StreamThread-1] found [advice-stream] topics possibly
> > > > matching regex
> > > > stream-thread [StreamThread-1] updating builder with
> > > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]}
> > > > topic(s) with possible matching regex subscription(s)
> > > > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
> > > > sessionTimeout=1, rebalanceTimeout=30, memberId=,
> > > > protocolType=consumer,
> > > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f
> > > > ))
> > > > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > > >
> > > > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the
> > > > end of consumer rebalance.
> > > >
> > > >  -   Same new partitions assigned on rebalance
> > > >
> > > > stream-thread [StreamThread-1] recycling old task 0_32
> > > > stream-thread [StreamThread-1] recycling old task 0_3
> > > > stream-thread [StreamThread-1] recycling old task 0_20
> > > > stream-thread [StreamThread-1] recycling old task 0_8
> > > >
> > > >  -   It then shuts down
> > > >
> > > 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-09 Thread Sachin Mittal
I am getting the logs, but could you please look at the line

rebalanceException = t;
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261

Why are we setting rebalanceException in the case of a CommitFailedException
on partition revocation?

What happens is that when runLoop runs again, it encounters this and the
thread shuts down. However, it has just been assigned new partitions and
should have continued.

Thanks
Sachin



On 9 Feb 2017 14:14, "Damian Guy"  wrote:

Might be easiest to just send all the logs if possible.

On Thu, 9 Feb 2017 at 08:10 Sachin Mittal  wrote:

> I will try to get the logs soon.
> One quick question: I have three brokers which run in a cluster with default
> logging.
>
> Which log4j logs would be of interest on the broker side, and which broker's
> logs do I need to send, or should I send logs from all three?
>
> My topic is partitioned and replicated on all three, so the kafka-logs dir
> contains the same topic logs.
>
>
> Thanks
> Sachin
>
>
> On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy  wrote:
>
> > Sachin,
> >
> > Can you provide the full logs from the broker and the streams app? It is
> > hard to understand what is going on with only snippets of information. It
> > seems like the rebalance is taking too long, but I can't tell from this.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal  wrote:
> >
> > > Hi,
> > > In continuation of the CommitFailedException, what we observe is that
> > > when this happens the first time,
> > >
> > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > This calls suspendTasksAndState(), which again tries to commit offsets,
> > > and then the same exception is thrown again.
> > > This gets handled in ConsumerCoordinator, which logs it and then
> > > reassigns new partitions.
> > >
> > > But the stream thread, before rethrowing the exception, calls
> > > rebalanceException = t;
> > >
> > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > >
> > > So now when runLoop executes, it gets this exception and the stream
> > > thread exits.
> > >
> > > Here are the logs
> > >
> > >  -   Exception happened
> > >
> > > Unsubscribed all topics or patterns and assigned partitions
> > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3,
> > > 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > User provided listener
> > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > new-part-advice failed on partition revocation
> > >  -   Revocation failed and exception handled
> > >
> > > (Re-)joining group new-part-advice
> > > stream-thread [StreamThread-1] found [advice-stream] topics possibly
> > > matching regex
> > > stream-thread [StreamThread-1] updating builder with
> > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
> > > with possible matching regex subscription(s)
> > > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
> > > sessionTimeout=1, rebalanceTimeout=30, memberId=,
> > > protocolType=consumer,
> > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f
> > > ))
> > > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > >
> > > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the
> > > end of consumer rebalance.
> > >
> > >  -   Same new partitions assigned on rebalance
> > >
> > > stream-thread [StreamThread-1] recycling old task 0_32
> > > stream-thread [StreamThread-1] recycling old task 0_3
> > > stream-thread [StreamThread-1] recycling old task 0_20
> > > stream-thread [StreamThread-1] recycling old task 0_8
> > >
> > >  -   It then shuts down
> > >
> > > stream-thread [StreamThread-1] Shutting down
> > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
> > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > >
> > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-1] Failed to rebalance
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-1] failed to 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-09 Thread Damian Guy
Might be easiest to just send all the logs if possible.

On Thu, 9 Feb 2017 at 08:10 Sachin Mittal  wrote:

> I will try to get the logs soon.
> One quick question: I have three brokers which run in a cluster with default
> logging.
>
> Which log4j logs would be of interest on the broker side, and which broker's
> logs do I need to send, or should I send logs from all three?
>
> My topic is partitioned and replicated on all three, so the kafka-logs dir
> contains the same topic logs.
>
>
> Thanks
> Sachin
>
>
> On Thu, Feb 9, 2017 at 1:32 PM, Damian Guy  wrote:
>
> > Sachin,
> >
> > Can you provide the full logs from the broker and the streams app? It is
> > hard to understand what is going on with only snippets of information. It
> > seems like the rebalance is taking too long, but I can't tell from this.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 9 Feb 2017 at 07:53 Sachin Mittal  wrote:
> >
> > > Hi,
> > > In continuation of the CommitFailedException, what we observe is that
> > > when this happens the first time,
> > >
> > > ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
> > > This calls suspendTasksAndState(), which again tries to commit offsets,
> > > and then the same exception is thrown again.
> > > This gets handled in ConsumerCoordinator, which logs it and then
> > > reassigns new partitions.
> > >
> > > But the stream thread, before rethrowing the exception, calls
> > > rebalanceException = t;
> > >
> > > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261
> > >
> > > So now when runLoop executes, it gets this exception and the stream
> > > thread exits.
> > >
> > > Here are the logs
> > >
> > >  -   Exception happened
> > >
> > > Unsubscribed all topics or patterns and assigned partitions
> > > stream-thread [StreamThread-1] Updating suspended tasks to contain
> > > active tasks [[0_32, 0_3, 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3,
> > > 0_20, 0_8]]
> > > stream-thread [StreamThread-1] Removing all standby tasks [[]]
> > > User provided listener
> > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > > new-part-advice failed on partition revocation
> > >  -   Revocation failed and exception handled
> > >
> > > (Re-)joining group new-part-advice
> > > stream-thread [StreamThread-1] found [advice-stream] topics possibly
> > > matching regex
> > > stream-thread [StreamThread-1] updating builder with
> > > SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
> > > with possible matching regex subscription(s)
> > > Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
> > > sessionTimeout=1, rebalanceTimeout=30, memberId=,
> > > protocolType=consumer,
> > > groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f
> > > ))
> > > to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)
> > >
> > > stream-thread [StreamThread-1] New partitions [[advice-stream-8,
> > > advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the
> > > end of consumer rebalance.
> > >
> > >  -   Same new partitions assigned on reblance
> > >
> > > stream-thread [StreamThread-1] recycling old task 0_32
> > > stream-thread [StreamThread-1] recycling old task 0_3
> > > stream-thread [StreamThread-1] recycling old task 0_20
> > > stream-thread [StreamThread-1] recycling old task 0_8
> > >
> > >  -   It then shuts down
> > >
> > > stream-thread [StreamThread-1] Shutting down
> > > stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
> > > active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]
> > >
> > > Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [StreamThread-1] Failed to rebalance
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:612)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:368)
> > > ~[kafka-streams-0.10.2.0.jar:na]
> > > Caused by: org.apache.kafka.streams.errors.StreamsException:
> > stream-thread
> > > [StreamThread-1] failed to suspend stream tasks
> > >
> > >
> > > So is there a way we can restart that stream thread again. Also in this
> > > case should we assign it rebalanceException, because this is
> > > CommitFailedException and new partitions are already assigned and same
> > > thread can continue processing new partitions.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal 
> > wrote:
> > >
> > > > Hi All,
> > > > I am trying out the 0.10.2.0 rc.
> > > > We have a source stream of 40 partitions.
> > > >
> > > 

Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-09 Thread Sachin Mittal
I will try to get the logs soon.
One quick question: I have three brokers running in a cluster with default
logging.

Which log4j logs would be of interest on the broker side, and from which
broker? Or do I need to send logs from all three?

My topic is partitioned and replicated across all three, so each kafka-logs
dir contains the same topic logs.
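One way to narrow down which broker matters, as a sketch: the group coordinator should be the broker leading the __consumer_offsets partition that the group id hashes to. The sketch assumes the default offsets.topic.num.partitions of 50, and assumes that the 0.10.x client logs the coordinator under the connection id Integer.MAX_VALUE - brokerId (as in the "to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)" line further down the thread). Both are assumptions to verify against your setup. CoordinatorLocator is a hypothetical helper, not a Kafka class.

```java
// Hypothetical helper for locating a consumer group's coordinator.
// Assumption: offsets.topic.num.partitions is at its default of 50.
final class CoordinatorLocator {

    // Same idea as Kafka's Utils.abs: map Integer.MIN_VALUE to 0 instead of
    // letting Math.abs overflow back to a negative number.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // __consumer_offsets partition holding this group's offsets; its leader
    // is the coordinator broker.
    static int offsetsPartitionFor(String groupId, int offsetsTopicPartitions) {
        return abs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    // Assumption: the client logs the coordinator as Integer.MAX_VALUE - brokerId,
    // so subtracting again recovers the broker id.
    static int brokerIdFromCoordinatorLogId(int loggedId) {
        return Integer.MAX_VALUE - loggedId;
    }

    public static void main(String[] args) {
        String group = "new-part-advice";
        System.out.println("__consumer_offsets partition: "
                + offsetsPartitionFor(group, 50));
        // id taken from the JoinGroup log line in this thread
        System.out.println("coordinator broker id: "
                + brokerIdFromCoordinatorLogId(2147483643));
    }
}
```

With the partition number in hand, describing the __consumer_offsets topic with kafka-topics.sh shows which broker leads it; that broker's log is the one that records members joining and being removed from the group.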


Thanks
Sachin



Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-09 Thread Damian Guy
Sachin,

Can you provide the full logs from the broker and the streams app? It is
hard to understand what is going on with only snippets of information. It
seems like the rebalance is taking too long, but I can't tell from this.

Thanks,
Damian


Re: Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-08 Thread Sachin Mittal
Hi,
In continuation of the CommitFailedException issue, what we observe when
this happens for the first time is:

ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
This calls suspendTasksAndState(), which again tries to commit offsets, and
the same exception is thrown again.
ConsumerCoordinator handles it, logs it, and then assigns the new
partitions.

But before rethrowing the exception, the stream thread sets
rebalanceException = t;
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261

So the next time runLoop executes, it finds this exception and the stream
thread exits.
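The sequence above can be modelled in a few lines. This is a simplified, hypothetical model (StreamThreadModel and StubCommitFailedException are stand-ins, not Kafka classes) that only reproduces the ordering described: the revocation callback records the failure before rethrowing, the coordinator swallows it and still completes the assignment, and the next run-loop check stops the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the consumer's CommitFailedException.
final class StubCommitFailedException extends RuntimeException {
    StubCommitFailedException() {
        super("commit failed: member is no longer part of the group");
    }
}

// Simplified model of the StreamThread / ConsumerCoordinator interplay.
final class StreamThreadModel {
    // Written by the revocation callback, read by the run loop.
    private Throwable rebalanceException;
    final List<String> events = new ArrayList<>();

    // Stand-in for suspendTasksAndState(): the offset commit fails.
    private void suspendTasksAndState() {
        throw new StubCommitFailedException();
    }

    // Stand-in for the listener's onPartitionsRevoked().
    private void onPartitionsRevoked() {
        try {
            suspendTasksAndState();
        } catch (RuntimeException t) {
            rebalanceException = t; // remembered for the run loop...
            throw t;                // ...and rethrown to the coordinator
        }
    }

    // Stand-in for the coordinator: it logs the listener failure and
    // still completes the rebalance with a new assignment.
    private void rebalance() {
        try {
            onPartitionsRevoked();
        } catch (RuntimeException e) {
            events.add("listener failed on partition revocation (logged)");
        }
        events.add("new partitions assigned");
    }

    // One run-loop iteration; returns false when the thread shuts down.
    boolean runOnce() {
        rebalance();
        if (rebalanceException != null) {
            events.add("shutting down");
            return false;
        }
        events.add("processing");
        return true;
    }

    public static void main(String[] args) {
        StreamThreadModel thread = new StreamThreadModel();
        boolean alive = thread.runOnce();
        System.out.println(thread.events + " alive=" + alive);
    }
}
```

The point the model makes is that the assignment succeeds and the thread dies anyway, purely because the recorded exception outlives the rebalance that already handled it.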

Here are the logs

 -   Exception happened

Unsubscribed all topics or patterns and assigned partitions
stream-thread [StreamThread-1] Updating suspended tasks to contain active
tasks [[0_32, 0_3, 0_20, 0_8]]
stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3, 0_20,
0_8]]
stream-thread [StreamThread-1] Removing all standby tasks [[]]
User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
new-part-advice failed on partition revocation
 -   Revocation failed and exception handled

(Re-)joining group new-part-advice
stream-thread [StreamThread-1] found [advice-stream] topics possibly
matching regex
stream-thread [StreamThread-1] updating builder with
SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
with possible matching regex subscription(s)
Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
sessionTimeout=1, rebalanceTimeout=30, memberId=,
protocolType=consumer,
groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f))
to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)

stream-thread [StreamThread-1] New partitions [[advice-stream-8,
advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the end
of consumer rebalance.

 -   Same new partitions assigned on rebalance

stream-thread [StreamThread-1] recycling old task 0_32
stream-thread [StreamThread-1] recycling old task 0_3
stream-thread [StreamThread-1] recycling old task 0_20
stream-thread [StreamThread-1] recycling old task 0_8

 -   It then shuts down

stream-thread [StreamThread-1] Shutting down
stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]

Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-1] Failed to rebalance
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
~[kafka-streams-0.10.2.0.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
~[kafka-streams-0.10.2.0.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-1] failed to suspend stream tasks


So is there a way we can restart that stream thread? Also, should we be
assigning it to rebalanceException at all in this case? This is a
CommitFailedException, the new partitions are already assigned, and the same
thread could continue processing them.
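A minimal sketch of the rule being proposed, assuming the decision is made where the exception is caught during revocation: walk the cause chain (Streams wraps the commit failure in "failed to suspend stream tasks") and only treat the failure as fatal if no commit failure is found. RevocationPolicy and StubCommitFailedException are hypothetical stand-ins, not Kafka classes, and this is not how 0.10.2 behaves.

```java
// Hypothetical stand-in for the consumer's CommitFailedException.
final class StubCommitFailedException extends RuntimeException {
}

// Sketch of the proposed rule: a commit failure during revocation is not fatal.
final class RevocationPolicy {
    // True if t or anything in its cause chain is a commit failure.
    // The walk is needed because the commit failure arrives wrapped.
    static boolean causedByCommitFailure(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof StubCommitFailedException) {
                return true;
            }
        }
        return false;
    }

    // Only failures that are not commit failures would be stored as
    // rebalanceException and stop the thread.
    static boolean isFatal(Throwable t) {
        return !causedByCommitFailure(t);
    }

    public static void main(String[] args) {
        RuntimeException wrapped = new RuntimeException(
                "failed to suspend stream tasks", new StubCommitFailedException());
        System.out.println("commit failure fatal? " + isFatal(wrapped));
        System.out.println("other failure fatal? "
                + isFatal(new IllegalStateException("state store error")));
    }
}
```

The trade-off is that offsets for records processed since the last successful commit are never committed, so whichever member now owns those partitions (here, the same thread) would reprocess those records; that is at-least-once behaviour, which is presumably acceptable if the thread was going to be restarted anyway.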

Thanks
Sachin



Getting CommitFailedException in 0.10.2.0 due to member id is not valid or unknown

2017-02-08 Thread Sachin Mittal
Hi All,
I am trying out the 0.10.2.0 rc.
We have a source stream of 40 partitions.

We start one instance with 4 threads.
After that we start a second instance with the same config on a different
machine, and then a third instance the same way.

After the application reaches a steady state, we start getting
CommitFailedException.
Firstly, I feel that the message for CommitFailedException should change. It
is thrown from a number of places, and sometimes it is simply not related to
the time between subsequent calls to poll() being longer than the configured
max.poll.interval.ms.

We have verified this, and in our case that is never the cause when we get
the exception. So perhaps introduce another exception class; as it stands,
this message is very misleading and we end up investigating the wrong areas.

Anyway, here are the places where this exception gets thrown in our case.
1. The exception trace is this:
stream-thread [StreamThread-4] Failed while executing StreamTask 0_5 due to
commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
[kafka-clients-0.10.2.0.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
[kafka-clients-0.10.2.0.jar:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
[kafka-clients-0.10.2.0.jar:na]

In the code, that line is:
    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null)
        return RequestFuture.failure(new CommitFailedException());

When we investigated what caused the generation to be null, we found that
some time before that, the following had been logged:

Attempt to heartbeat failed for group new-part-advice since member id is
not valid.

We checked the code and saw this:
    if (error == Errors.UNKNOWN_MEMBER_ID) {
        log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
        resetGeneration();
        future.raise(Errors.UNKNOWN_MEMBER_ID);
    }

We checked resetGeneration() and found that it does:
    this.state = MemberState.UNJOINED;

And then in the generation() method we do something like this:
    if (this.state != MemberState.STABLE)
        return null;

So why are we not returning Generation.NO_GENERATION, which was set by
resetGeneration() earlier?

Would this line be better?
    if (this.state != MemberState.STABLE && !this.rejoinNeeded)
        return null;
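The state handling quoted above reduces to a small model. This is a simplified sketch, not the real AbstractCoordinator: the class names and the starting generation (5, "consumer-1-abc") are made up, and NO_GENERATION is modelled as id -1 with an empty member id, which is an assumption about the real constant. It shows that after resetGeneration() the current check returns null, so the commit is failed locally with CommitFailedException before anything reaches the broker, while the proposed check would return NO_GENERATION instead.

```java
// Simplified model of the coordinator member state quoted above.
enum MemberState { UNJOINED, REBALANCING, STABLE }

final class Generation {
    // Assumed shape of the "no generation" sentinel.
    static final Generation NO_GENERATION = new Generation(-1, "");

    final int generationId;
    final String memberId;

    Generation(int generationId, String memberId) {
        this.generationId = generationId;
        this.memberId = memberId;
    }
}

final class CoordinatorModel {
    MemberState state = MemberState.STABLE;
    Generation generation = new Generation(5, "consumer-1-abc"); // illustrative values
    boolean rejoinNeeded = false;

    // What the heartbeat and commit handlers do on UNKNOWN_MEMBER_ID.
    void resetGeneration() {
        generation = Generation.NO_GENERATION;
        rejoinNeeded = true;
        state = MemberState.UNJOINED;
    }

    // Current check: any state other than STABLE yields null.
    Generation generation() {
        return (state != MemberState.STABLE) ? null : generation;
    }

    // Variant proposed above: null only when no rejoin is pending.
    Generation generationProposed() {
        return (state != MemberState.STABLE && !rejoinNeeded) ? null : generation;
    }

    // Commit guard: a null generation fails the commit locally.
    boolean canSendCommit(Generation g) {
        return g != null;
    }

    public static void main(String[] args) {
        CoordinatorModel c = new CoordinatorModel();
        c.resetGeneration(); // e.g. heartbeat answered with UNKNOWN_MEMBER_ID
        System.out.println("current generation() is null: " + (c.generation() == null));
        System.out.println("proposed returns NO_GENERATION: "
                + (c.generationProposed() == Generation.NO_GENERATION));
    }
}
```

Whether the coordinator would accept a commit carrying NO_GENERATION while other members are still active is a separate question that this model does not answer.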

Also, why are we getting the UNKNOWN_MEMBER_ID error in the first place?

2. This is the second case:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
~[kafka-clients-0.10.2.0.jar:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
~[kafka-clients-0.10.2.0.jar:na]

Here again, when we checked the code, we saw that it failed inside
OffsetCommitResponseHandler:

    if (error == Errors.UNKNOWN_MEMBER_ID
            || error == Errors.ILLEGAL_GENERATION
            || error == Errors.REBALANCE_IN_PROGRESS) {
        // need to re-join group
        log.debug("Offset commit for group {} failed: {}", groupId, error.message());
        resetGeneration();
        future.raise(new CommitFailedException());
        return;
    }

And when we checked the log message just before this, we found:
Offset commit for group new-part-advice failed: The coordinator is not
aware of this member.

This error is again due to UNKNOWN_MEMBER_ID.
The handling in this case is similar to the one for the heartbeat failure.

So again, why are we getting this UNKNOWN_MEMBER_ID error, and shouldn't we
handle it with a different exception, since these errors are not related to
poll time at all?
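A sketch of the suggestion for clearer errors: instead of one message blaming max.poll.interval.ms, report which coordinator error actually caused the commit to fail. The error names mirror the three Errors values in the handler quoted above; the enum, the helper, and the message texts are hypothetical and not part of any Kafka release.

```java
// Hypothetical: one message per rejoin-triggering commit error, instead of
// the single max.poll.interval.ms text used for all of them.
enum CommitError { UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS }

final class CommitFailureMessages {
    static String describe(CommitError error) {
        switch (error) {
            case UNKNOWN_MEMBER_ID:
                return "Commit failed: the coordinator does not know this member; "
                        + "it was removed from the group and must rejoin.";
            case ILLEGAL_GENERATION:
                return "Commit failed: the group has already moved to a newer generation.";
            case REBALANCE_IN_PROGRESS:
                return "Commit failed: the group is currently rebalancing.";
            default:
                throw new IllegalArgumentException("unexpected error: " + error);
        }
    }

    public static void main(String[] args) {
        for (CommitError e : CommitError.values()) {
            System.out.println(e + " -> " + describe(e));
        }
    }
}
```

The handling (reset the generation and rejoin) would stay the same for all three; only the text surfaced to the user changes, which is what would have pointed us at membership rather than poll timing.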

So please let us know what the issue could be and how we can fix it.

Also note that we are running an identical single-thread application which
source from identical