Hi,

I think the preceding message, saying that the consumer is not a member of
the group, suggests that there is some connectivity issue.
Perhaps heartbeats are timing out, in which case you might want to
increase session.timeout.ms [1] and heartbeat.interval.ms.

[1]
https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
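
If you go that route, the settings are passed through the consumer
Properties handed to the Flink Kafka connector. A minimal sketch, with
illustrative values only (not recommendations for your cluster; Kafka's
guidance is to keep heartbeat.interval.ms no higher than a third of
session.timeout.ms, and session.timeout.ms must fall inside the broker's
group.min/max.session.timeout.ms range):

```java
import java.util.Properties;

public class TimeoutProps {
    // Builds consumer properties with longer session/heartbeat timeouts.
    // Values are examples only -- tune them for your environment.
    public static Properties build() {
        Properties props = new Properties();
        // How long the group coordinator waits for a heartbeat before
        // kicking the consumer out of the group.
        props.setProperty("session.timeout.ms", "30000");
        // How often the consumer sends heartbeats; keep this well below
        // (at most one third of) session.timeout.ms.
        props.setProperty("heartbeat.interval.ms", "10000");
        return props;
    }
}
```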

Regards,
Roman

On Fri, Aug 27, 2021 at 11:43 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>
> Hi Hemant,
>
> One possible reason is that another Kafka consumer is using the same consumer 
> group id as the one in FlinkKafkaConsumer. You can try to use another 
> group.id in FlinkKafkaConsumer to validate this.
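
One hedged sketch of that check (the broker address and the "debug-" prefix
below are placeholders, not values from this thread) is to build fresh
consumer properties with a group id that cannot collide with any other
consumer:

```java
import java.util.Properties;
import java.util.UUID;

public class DebugGroupId {
    // Builds consumer properties with a throwaway, unique group.id so you
    // can check whether another consumer sharing the original group.id is
    // what triggers the rebalances.
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "debug-" + UUID.randomUUID());
        return props;
    }
}
```

These properties would then be handed to the connector as usual, e.g.
new FlinkKafkaConsumer<>(topic, deserializationSchema, props). If the
warnings disappear with the unique group id, a second consumer on the old
group id is the likely culprit.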
>
> If it’s not a group id problem, there are some Kafka consumer metrics [1] 
> that might be helpful for debugging, such as “time-between-poll-avg”, 
> “heartbeat-rate” and so forth, to check whether the poll interval is the 
> problem as suggested by Kafka’s exception. All Kafka consumer metrics are 
> registered under the metric group “KafkaConsumer” in Flink’s metric system.
>
> Besides, it might be helpful to set the logging level of 
> org.apache.kafka.clients.consumer to DEBUG or TRACE, which can provide more 
> information about why the offset commit failed.
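
For example, assuming the log4j2 properties format that Flink distributions
ship in conf/log4j.properties (the logger key "kafka" is an arbitrary
label), this raises verbosity for the consumer internals only:

```properties
# More detail from the Kafka consumer client; use TRACE for even more.
# Remember to revert this once you are done debugging.
logger.kafka.name = org.apache.kafka.clients.consumer
logger.kafka.level = DEBUG
```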
>
> Hope this can help you~
>
> [1] https://kafka.apache.org/documentation/#consumer_monitoring
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Aug 26, 2021, 10:25 PM +0800, bat man <tintin0...@gmail.com>, wrote:
>
> Hi,
>
> I am using Flink 1.12.1 to consume data from Kafka in a streaming job, with 
> flink-connector-kafka_2.12:1.12.1. The Kafka broker version is 2.2.1.
> In the logs I see warnings like this -
>
> 2021-08-26 13:36:49,903 WARN 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - 
> Committing offsets to Kafka failed. This does not compromise Flink's 
> checkpoints.
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member.
> This means that the time between subsequent calls to poll() was longer than 
> the configured max.poll.interval.ms, which typically implies that the poll 
> loop is spending too much time message processing.
> You can address this either by increasing max.poll.interval.ms or by reducing 
> the maximum size of batches returned in poll() with max.poll.records.
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:910)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:890)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
>
> I understand that this might not cause an issue since checkpointing is not 
> impacted; however, metrics monitoring might be affected, as I am using 
> Burrow to monitor group offsets. I have already tried changing the 
> properties below in the Kafka consumer configs -
>
>         kafkaProps.setProperty("max.poll.interval.ms","900000");
>         kafkaProps.setProperty("max.poll.records","200");
>         kafkaProps.setProperty("heartbeat.interval.ms","1000");
>         kafkaProps.setProperty("request.timeout.ms","40000");
>         kafkaProps.setProperty("session.timeout.ms","10000");
> But the warnings are still present in the logs.
>
> In addition, I see this error just before the warning -
> ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - 
> [Consumer clientId=consumer-3, groupId=xxx] Offset commit failed on partition 
> xxx-1 at offset 33651:
> The coordinator is not aware of this member.
>
> The code uses a WatermarkStrategy to extract timestamps and emit watermarks.
>
> Any clue is much appreciated.
>
> Thanks,
> Hemant