大家晚上好,我们这里用flink消费kafka,往mysql写。topic有9个分区,flink任务并行度是3。任务却一直报这个错:2019-08-21 
19:33:07,058 WARN  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Async 
Kafka commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured max.poll.interval.ms, which typically implies that 
the poll loop is spending too much time message processing. You can address 
this either by increasing the session timeout or by reducing the maximum size 
of batches returned in poll() with max.poll.records.
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)
我已经尝试将session.timeout.ms设置成60s,但结果还是一样的。flink任务的逻辑是kafka->map->filter->split,最后select出来两个流,插到两个不同的mysql表。这里是因为并行度是3的原因,所以导致了触发了consumer的rebalance吗?

回复