Hi,
Continuing from the CommitFailedException issue below, this is what we
observe the first time it happens:

ConsumerCoordinator invokes onPartitionsRevoked on StreamThread.
This calls suspendTasksAndState(), which tries to commit offsets again, and
the same exception is thrown again.
ConsumerCoordinator handles this exception, logs it, and then assigns
new partitions.

But before rethrowing the exception, the stream thread sets
rebalanceException = t;
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L261

So when runLoop executes next, it sees this exception and the stream thread
exits.
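
To make the sequence concrete, here is a simplified model of the flow as we
understand it. These are not the actual Kafka classes; every class and method
name below is made up for illustration, and the real StreamThread does much
more than this.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the flow described above. NOT the real Kafka code:
// all names here are hypothetical stand-ins.
class RebalanceFlowSketch {

    /** Stand-in for CommitFailedException. */
    static class CommitFailed extends RuntimeException {
        CommitFailed(String msg) { super(msg); }
    }

    /** Stand-in for StreamThread: remembers the failure seen in the callback. */
    static class ModelStreamThread {
        Throwable rebalanceException = null;
        final List<String> events = new ArrayList<>();

        void onPartitionsRevoked() {
            try {
                // models suspendTasksAndState() failing on the offset commit
                throw new CommitFailed("commit failed during suspend");
            } catch (RuntimeException t) {
                rebalanceException = t; // remembered before rethrowing
                throw t;
            }
        }

        void onPartitionsAssigned() {
            events.add("assigned");
        }

        /** One run loop iteration: rebalance, then check the remembered failure. */
        void runOnce(ModelCoordinator coordinator) {
            coordinator.rebalance(this);
            if (rebalanceException != null) {
                throw new IllegalStateException("Failed to rebalance", rebalanceException);
            }
            events.add("processed");
        }
    }

    /** Stand-in for ConsumerCoordinator: logs the listener failure and carries on. */
    static class ModelCoordinator {
        void rebalance(ModelStreamThread thread) {
            try {
                thread.onPartitionsRevoked();
            } catch (RuntimeException e) {
                thread.events.add("revocation failed (logged)");
            }
            thread.onPartitionsAssigned();
        }
    }

    /** Runs the model once and returns the events in the order they occurred. */
    static List<String> simulate() {
        ModelStreamThread thread = new ModelStreamThread();
        try {
            thread.runOnce(new ModelCoordinator());
        } catch (IllegalStateException e) {
            thread.events.add("thread exits: " + e.getMessage());
        }
        return thread.events;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model the coordinator recovers and the partitions are assigned, but
the remembered exception still ends the run loop, which matches what the logs
show.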

Here are the logs

 -----   Exception happened

Unsubscribed all topics or patterns and assigned partitions
stream-thread [StreamThread-1] Updating suspended tasks to contain active
tasks [[0_32, 0_3, 0_20, 0_8]]
stream-thread [StreamThread-1] Removing all active tasks [[0_32, 0_3, 0_20,
0_8]]
stream-thread [StreamThread-1] Removing all standby tasks [[]]
User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$1 for group
new-part-advice failed on partition revocation
 -----   Revocation failed and exception handled

(Re-)joining group new-part-advice
stream-thread [StreamThread-1] found [advice-stream] topics possibly
matching regex
stream-thread [StreamThread-1] updating builder with
SubscriptionUpdates{updatedTopicSubscriptions=[advice-stream]} topic(s)
with possible matching regex subscription(s)
Sending JoinGroup ((type: JoinGroupRequest, groupId=new-part-advice,
sessionTimeout=10000, rebalanceTimeout=300000, memberId=,
protocolType=consumer,
groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@300c8a1f))
to coordinator 192.168.73.198:9092 (id: 2147483643 rack: null)

stream-thread [StreamThread-1] New partitions [[advice-stream-8,
advice-stream-32, advice-stream-3, advice-stream-20]] assigned at the end
of consumer rebalance.

 -----   Same new partitions assigned on rebalance

stream-thread [StreamThread-1] recycling old task 0_32
stream-thread [StreamThread-1] recycling old task 0_3
stream-thread [StreamThread-1] recycling old task 0_20
stream-thread [StreamThread-1] recycling old task 0_8

 -----   It then shuts down

stream-thread [StreamThread-1] Shutting down
stream-thread [StreamThread-1] shutdownTasksAndState: shutting down all
active tasks [[0_32, 0_3, 0_20, 0_8]] and standby tasks [[]]

Uncaught exception at : Tue Feb 07 21:43:15 IST 2017
org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-1] Failed to rebalance
    at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
~[kafka-streams-0.10.2.0.jar:na]
    at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
~[kafka-streams-0.10.2.0.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-1] failed to suspend stream tasks


So is there a way we can restart that stream thread? Also, in this case
should rebalanceException be set at all? This is a CommitFailedException,
the new partitions are already assigned, and the same thread could continue
processing them.
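
By restart we mean something like the following. A Java thread that has
terminated cannot be started again, so the sketch creates a replacement
worker from an uncaught exception handler. This is plain Java with
hypothetical names, not Kafka Streams code, and we do not know whether
something equivalent is possible or advisable for a StreamThread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java sketch of "restart the worker when it dies".
// All names are hypothetical; this is not Kafka Streams code.
class RestartingWorkerSketch {
    final AtomicInteger starts = new AtomicInteger();
    final CountDownLatch done = new CountDownLatch(1);
    final int maxRestarts;

    RestartingWorkerSketch(int maxRestarts) {
        this.maxRestarts = maxRestarts;
    }

    /** Hypothetical work: fails on the first attempt, succeeds afterwards. */
    void work(int attempt) {
        if (attempt == 1) {
            throw new IllegalStateException("Failed to rebalance");
        }
        done.countDown();
    }

    void startWorker() {
        int attempt = starts.incrementAndGet();
        Thread t = new Thread(() -> work(attempt), "worker-" + attempt);
        t.setUncaughtExceptionHandler((thread, error) -> {
            // The dead thread cannot be reused, so start a fresh one,
            // up to a bounded number of restarts.
            if (attempt <= maxRestarts) {
                startWorker();
            } else {
                done.countDown();
            }
        });
        t.start();
    }

    /** Runs the demo and returns how many workers were started in total. */
    static int runDemo() {
        RestartingWorkerSketch sketch = new RestartingWorkerSketch(3);
        sketch.startWorker();
        try {
            sketch.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.starts.get();
    }

    public static void main(String[] args) {
        System.out.println("workers started: " + runDemo());
    }
}
```

The restart count is bounded so that a failure which repeats every time does
not loop forever.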

Thanks
Sachin


On Wed, Feb 8, 2017 at 2:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi All,
> I am trying out the 0.10.2.0 rc.
> We have a source stream of 40 partitions.
>
> We start one instance with 4 threads.
> After that we start second instance with same config on a different
> machine and then same way third instance.
>
> After the application reaches steady state we start getting
> CommitFailedException.
> Firstly, I feel that the message for CommitFailedException should change.
> It is thrown from a number of places, and sometimes it has nothing to do
> with the time between subsequent calls to poll() being longer than the
> configured max.poll.interval.ms.
>
> We have verified that this is never the case when we get the exception.
> So perhaps introduce another exception class, because this message is
> very misleading and we end up investigating the wrong areas.
>
> Anyway, here are the places where this exception gets thrown in our case:
> 1. The exception trace is this:
> stream-thread [StreamThread-4] Failed while executing StreamTask 0_5 due
> to commit consumer offsets:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> [kafka-clients-0.10.2.0.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> commitOffsetsSync(ConsumerCoordinator.java:577)
> [kafka-clients-0.10.2.0.jar:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1125) [kafka-clients-0.10.2.0.jar:na]
>
> in the code that line is
> // if the generation is null, we are not part of an active group (and we expect to be).
> // the only thing we can do is fail the commit and let the user rejoin the group in poll()
> if (generation == null)
>     return RequestFuture.failure(new CommitFailedException());
>
> When we investigated what caused the generation to be null, we found that
> some time before that the following was logged:
>
> Attempt to heartbeat failed for group new-part-advice since member id is
> not valid.
>
> We checked the code and saw this:
> if (error == Errors.UNKNOWN_MEMBER_ID) {
>     log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
>     resetGeneration();
>     future.raise(Errors.UNKNOWN_MEMBER_ID);
> }
>
> We checked the code of resetGeneration and found that it does
> this.state = MemberState.UNJOINED;
>
> And then in the method generation() we do something like this
> if (this.state != MemberState.STABLE)
>             return null;
>
> So why are we not returning Generation.NO_GENERATION, which was set
> by resetGeneration earlier?
>
> Would this line be better?
> if (this.state != MemberState.STABLE && !this.rejoinNeeded)
>             return null;
>
> Also, why are we getting the UNKNOWN_MEMBER_ID error in the first place?
>
> 2. This is the second case:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$
> OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766)
> ~[kafka-clients-0.10.2.0.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$
> OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712)
> ~[kafka-clients-0.10.2.0.jar:na]
>
> Here again when we check the code we see that it failed inside
> OffsetCommitResponseHandler
>
> if (error == Errors.UNKNOWN_MEMBER_ID
>         || error == Errors.ILLEGAL_GENERATION
>         || error == Errors.REBALANCE_IN_PROGRESS) {
>     // need to re-join group
>     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
>     resetGeneration();
>     future.raise(new CommitFailedException());
>     return;
> }
>
> And when we check the log message before this, we find:
> Offset commit for group new-part-advice failed: The coordinator is not
> aware of this member.
>
> This is again due to the UNKNOWN_MEMBER_ID error.
> The handling in this case is similar to what is done for the heartbeat
> failure.
>
> So again, why are we getting this UNKNOWN_MEMBER_ID error, and should we
> not handle it with a different exception, since these errors are not
> related to poll time at all?
>
> So please let us know what could be the issue here and how can we fix this.
>
> Also note that we are running an identical single-threaded application
> which sources from an identical single-partition source topic, and that
> application never fails.
> So it is hard to tell whether we are doing something wrong in our logic.
>
> Also, from logs and JMX metrics collection we see that poll is called well
> within the default 30 seconds; usually it is around 5 seconds.
>
> Please share any ideas.
>
> Thanks
> Sachin
>
>
