Hi All,
I am trying out the 0.10.2.0 rc.
We have a source stream of 40 partitions.

We start one instance with 4 threads.
After that we start a second instance with the same config on a different
machine, and then a third instance the same way.

After the application reaches steady state we start getting
CommitFailedException.
First, I feel that the message for CommitFailedException should change. It
is thrown from a number of places, and sometimes the cause is not related to
the time between subsequent calls to poll() being longer than the configured
max.poll.interval.ms.

We have verified that this is never the case in our setup when we get
the exception. So perhaps a separate exception class should be introduced;
the current message is very misleading and we end up investigating the
wrong areas.

Anyway, here are the places where this exception gets thrown in our case.
1. The exception trace is this:
stream-thread [StreamThread-4] Failed while executing StreamTask 0_5 due to
commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577) [kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125) [kafka-clients-0.10.2.0.jar:na]

In the code, that line is:
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null)
    return RequestFuture.failure(new CommitFailedException());

After investigating what caused the generation to be null, we found that
some time before that the following was logged:

Attempt to heartbeat failed for group new-part-advice since member id is
not valid.

We checked the code and we see this:
if (error == Errors.UNKNOWN_MEMBER_ID) {
    log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId);
    resetGeneration();
    future.raise(Errors.UNKNOWN_MEMBER_ID);
}

We checked resetGeneration and found that it does
this.state = MemberState.UNJOINED;

And then in the generation() method we do something like this
if (this.state != MemberState.STABLE)
            return null;

So why are we not returning Generation.NO_GENERATION, which was set by
resetGeneration earlier?

Would this line be better?
if (this.state != MemberState.STABLE && !this.rejoinNeeded)
            return null;
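
To make the question concrete, here is a small stand-alone model of the state handling described above. It is only a sketch, not the actual AbstractCoordinator code: the class GenerationModel, the integer generation id, the joinSucceeded helper and the string returned by commit() are all made up for illustration, and I am assuming that resetGeneration also sets rejoinNeeded to true.

```java
public class GenerationModel {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    // Stand-in for Generation.NO_GENERATION (the real class holds more than an id).
    static final int NO_GENERATION = -1;

    private final boolean useProposedCheck;
    private MemberState state = MemberState.UNJOINED;
    private boolean rejoinNeeded = true;
    private int generationId = NO_GENERATION;

    GenerationModel(boolean useProposedCheck) {
        this.useProposedCheck = useProposedCheck;
    }

    // Hypothetical helper: models a completed join/sync round.
    void joinSucceeded(int newGenerationId) {
        this.generationId = newGenerationId;
        this.rejoinNeeded = false;
        this.state = MemberState.STABLE;
    }

    // Models the heartbeat handler seeing UNKNOWN_MEMBER_ID.
    void onHeartbeatUnknownMemberId() {
        resetGeneration();
    }

    // Assumption: the reset also flags that a rejoin is needed.
    void resetGeneration() {
        this.generationId = NO_GENERATION;
        this.rejoinNeeded = true;
        this.state = MemberState.UNJOINED;
    }

    // Returns null when the member is treated as "not part of an active group".
    Integer generation() {
        boolean notStable = state != MemberState.STABLE;
        boolean treatAsInactive = useProposedCheck ? (notStable && !rejoinNeeded) : notStable;
        if (treatAsInactive)
            return null;
        return generationId;
    }

    // Models only the null check at the top of sendOffsetCommitRequest.
    String commit() {
        Integer generation = generation();
        if (generation == null)
            return "CommitFailedException";
        return "commit sent with generation " + generation;
    }

    public static void main(String[] args) {
        GenerationModel current = new GenerationModel(false);
        current.joinSucceeded(7);
        current.onHeartbeatUnknownMemberId();
        System.out.println("current check:  " + current.commit());

        GenerationModel proposed = new GenerationModel(true);
        proposed.joinSucceeded(7);
        proposed.onHeartbeatUnknownMemberId();
        System.out.println("proposed check: " + proposed.commit());
    }
}
```

With the current check, a commit issued after the heartbeat failure fails locally with CommitFailedException. With the proposed check, the commit would go out carrying NO_GENERATION; what the broker does with that is outside this model.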

Also, why are we getting the UNKNOWN_MEMBER_ID error in the first place?

2. This is the second case:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:766) ~[kafka-clients-0.10.2.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:712) ~[kafka-clients-0.10.2.0.jar:na]

Here again, when we check the code, we see that it failed inside
OffsetCommitResponseHandler:

if (error == Errors.UNKNOWN_MEMBER_ID
        || error == Errors.ILLEGAL_GENERATION
        || error == Errors.REBALANCE_IN_PROGRESS) {
    // need to re-join group
    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
    resetGeneration();
    future.raise(new CommitFailedException());
    return;
}

And when we check the log message just before this, we find:
Offset commit for group new-part-advice failed: The coordinator is not
aware of this member.

This failure is again due to the UNKNOWN_MEMBER_ID error.
The handling in this case is similar to what is done for the heartbeat
failure.

So again, why are we getting this UNKNOWN_MEMBER_ID error, and should we not
handle it with a different exception, since these errors are not related to
poll time at all?
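
As a rough illustration of what I mean by a different exception, here is a stand-alone sketch. Nothing in it exists in kafka-clients: the CommitError enum, both exception classes and the toException method are hypothetical names used only to show the idea of one message per cause.

```java
public class CommitErrorMapping {
    // Hypothetical subset of the broker error codes handled in the commit response.
    enum CommitError { UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS }

    // Hypothetical: the coordinator no longer knows this member.
    static class UnknownMemberCommitException extends RuntimeException {
        UnknownMemberCommitException(String message) {
            super(message);
        }
    }

    // Hypothetical: the group moved on to a new generation or is rebalancing.
    static class RebalanceCommitException extends RuntimeException {
        RebalanceCommitException(String message) {
            super(message);
        }
    }

    // Maps each error to an exception whose message names the actual cause
    // instead of always pointing at max.poll.interval.ms.
    static RuntimeException toException(String groupId, CommitError error) {
        switch (error) {
            case UNKNOWN_MEMBER_ID:
                return new UnknownMemberCommitException(
                        "Commit for group " + groupId
                                + " failed: the coordinator is not aware of this member.");
            case ILLEGAL_GENERATION:
                return new RebalanceCommitException(
                        "Commit for group " + groupId
                                + " failed: the member's generation is no longer current.");
            case REBALANCE_IN_PROGRESS:
                return new RebalanceCommitException(
                        "Commit for group " + groupId
                                + " failed: the group is rebalancing.");
            default:
                throw new IllegalArgumentException("Unhandled error: " + error);
        }
    }

    public static void main(String[] args) {
        RuntimeException e = toException("new-part-advice", CommitError.UNKNOWN_MEMBER_ID);
        System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
    }
}
```

Whether separate classes or just separate messages on CommitFailedException would be the better fit is an open question; the point is only that the reported cause should match the error code.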

So please let us know what could be the issue here and how we can fix this.

Also note that we are running an identical single-threaded application which
reads from an identical single-partition source topic, and that application
never fails.
So it is hard to tell whether we are doing something wrong in our logic.

Also, from logs and JMX metrics collection we see that poll is called well
within the default 30 seconds; usually it is around 5 seconds.
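
For anyone who wants to check the same thing, this is the kind of bookkeeping involved. It is a minimal sketch: PollGapTracker is a made-up helper, not part of Kafka, and the 30000 ms limit is just the figure quoted above passed in as a parameter. The timestamps could come from a plain consumer loop or be parsed out of logs.

```java
public class PollGapTracker {
    private final long limitMs;
    private long lastPollMs = -1;
    private long maxGapMs = 0;

    PollGapTracker(long limitMs) {
        this.limitMs = limitMs;
    }

    // Record the wall-clock time (in ms) at which poll() was called.
    void recordPoll(long nowMs) {
        if (lastPollMs >= 0) {
            maxGapMs = Math.max(maxGapMs, nowMs - lastPollMs);
        }
        lastPollMs = nowMs;
    }

    // Largest gap seen so far between two consecutive poll() calls.
    long maxGapMs() {
        return maxGapMs;
    }

    // True if any gap between consecutive polls exceeded the configured limit.
    boolean exceededLimit() {
        return maxGapMs > limitMs;
    }

    public static void main(String[] args) {
        PollGapTracker tracker = new PollGapTracker(30_000L);
        long[] pollTimesMs = {0L, 5_000L, 9_000L, 14_500L};
        for (long t : pollTimesMs) {
            tracker.recordPoll(t);
        }
        System.out.println("max gap ms = " + tracker.maxGapMs()
                + ", exceeded = " + tracker.exceededLimit());
    }
}
```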

Please share any ideas.

Thanks
Sachin
