Hi, 感谢反馈,看上去是一个 bug。可以在 Apache JIRA [1] 上新建一个 ticket 吗?
[1] https://issues.apache.org/jira > On May 25, 2022, at 11:35, 邹璨 <zou....@navercorp.com> wrote: > > flink版本: 1.14.3 > 模块:connectors/kafka > 问题描述: > 我在使用kafka分区动态发现时发生了WakeupException,导致Job失败。异常信息如下 > > 2022-05-10 15:08:03 > java.lang.RuntimeException: One or more fetchers have encountered exception > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: org.apache.kafka.common.errors.WakeupException > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726) > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684) > at > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315) > at > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200) > at > org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) > ... 6 more > > > > 根据异常栈轨迹简单查阅源码发现,KafkaSource在处理分区变更时,会通过调用consumer.wakeup中断正在拉取数据的consumer。 > 随后在处理分区变更时会调用consumer.position方法,由于consumer已经被唤醒,此时会抛出WakeupException。 > > > > > > > 此电子邮件及其包含的信息将仅发送给上面列出的收件人,必须加以保护,并且可能包含法律或其他原因禁止披露的信息。 > 如果您不是此电子邮件的预期收件人,未经许可,您不得存储、复制、发送、分发或披露它。 禁止存储、复制、发送、分发或披露电子邮件的任何部分。 > 如果此电子邮件发送不正确,请立即联系 NAVER > Security(dl_naversecur...@navercorp.com)。然后删除所有原件、副本和附件。谢谢您的合作。 > > This email and the information contained in this email are intended solely > for the recipient(s) addressed above and may contain information that is > confidential and/or privileged or whose disclosure is prohibited by law or > other reasons. > If you are not the intended recipient of this email, please be advised that > any unauthorized storage, duplication, dissemination, distribution or > disclosure of all or part of this email is strictly prohibited. > If you received this email in error, please immediately contact NAVER > Security (dl_naversecur...@navercorp.com) and delete this email and any > copies and attachments from your system. Thank you for your cooperation.