Right, I tried that earlier as well: KafkaSource does work; only FlinkKafkaConsumer fails.

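For anyone reading the stack trace quoted further down: it suggests that the stop path calls FlinkKafkaConsumerBase.cancel, which closes the Handover, and the legacy source thread then finishes with Handover$ClosedException. Below is a minimal plain-Java sketch of that sequence. It is only a simplified model under my own assumptions: none of the classes are Flink's, the names (StopWithSavepointModel, Handover, ClosedException, runStopScenario) are hypothetical stand-ins that mirror the trace, and the real connector does considerably more.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Simplified, hypothetical model of the failure path seen in the quoted trace.
 * These are NOT Flink classes; the names only mirror the stack frames.
 */
public class StopWithSavepointModel {

    /** Stand-in for Handover$ClosedException. */
    static class ClosedException extends Exception {}

    /** Stand-in for the hand-over point the fetch loop polls for records. */
    static class Handover {
        private final Object lock = new Object();
        private boolean closed;

        /** Blocks until close() is called; this model never delivers records. */
        void pollNext() throws ClosedException, InterruptedException {
            synchronized (lock) {
                while (!closed) {
                    lock.wait();
                }
                throw new ClosedException();
            }
        }

        /** Wakes up any thread blocked in pollNext(). */
        void close() {
            synchronized (lock) {
                closed = true;
                lock.notifyAll();
            }
        }
    }

    /** Returns the throwable the modelled source thread finished with, or null. */
    static Throwable runStopScenario() {
        Handover handover = new Handover();
        CompletableFuture<Void> sourceCompletion = new CompletableFuture<>();

        // Models the legacy source thread running the fetch loop.
        Thread sourceThread = new Thread(() -> {
            try {
                handover.pollNext();
                sourceCompletion.complete(null);
            } catch (Throwable t) {
                sourceCompletion.completeExceptionally(t);
            }
        }, "legacy-source-thread");
        sourceThread.start();

        // Models the stop path in the trace: stop -> cancel -> Handover.close.
        handover.close();
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        try {
            sourceCompletion.join();
            return null;
        } catch (CompletionException e) {
            // join() wraps the original failure; unwrap it for the caller.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println("source thread finished with: " + runStopScenario());
    }
}
```

In this model the source thread always ends exceptionally once close() is called, which matches the "Task has failed" decline reported for the savepoint in the JM log. It does not try to explain why cancel -s succeeds.
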
yanfei lei <fredia...@gmail.com> wrote on Fri, Oct 14, 2022 at 14:22:
>
> Hi yidan && hjw,
> I reproduced this problem locally with FlinkKafkaConsumer as well, but stop-with-savepoint works normally with KafkaSource. FlinkKafkaConsumer has been deprecated as of Flink 1.15 [1]; I recommend trying again with the new KafkaSource.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sourcefunction
>
> Best,
> Yanfei
>
> hjw <1010445...@qq.com.invalid> wrote on Tue, Aug 23, 2022 at 23:39:
>
> > I think this problem is caused by the Kafka connector using the old API. It can be reproduced by running the job locally in IDEA. I have already filed a Jira for it:
> > https://issues.apache.org/jira/browse/FLINK-28758. There has been no feedback from the community so far.
> >
> >
> > ------------------ Original message ------------------
> > From: "user-zh" <hinobl...@gmail.com>;
> > Sent: Tuesday, August 23, 2022, 11:09 PM
> > To: "user-zh"<user-zh@flink.apache.org>;
> >
> > Subject: Re: Re: flink1.15.1 stop job fails
> >
> >
> >
> > 1. Most likely the problem is in the source, or at the savepoint trigger level.
> > 2. It may also be worth looking at the difference between cancel and stop.
> > 3. Additional info: my Kafka source uses the old version (I cannot use the new one because, for various reasons, I have to use a low-version Kafka client).
> >
> > yidan zhao <hinobl...@gmail.com> wrote on Tue, Aug 23, 2022 at 23:06:
> > >
> > > I took a look; there are very few errors.
> > > In any case, flink cancel -s works and flink stop does not. It also appears to fail instantly. In the web UI the savepoint shows 0/841 completed, so it probably failed almost before it started.
> > > There are currently 4 machines:
> > > Machine 1
> > > 2022-08-23 22:47:37,093 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: JobConfig -> Split(JobName_configType) (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Stop-with-savepoint failed.
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> > >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> > >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > What follows is just the usual free task, unregister and similar messages.
> > >
> > > Machine 2
> > > ...
> > > On the other machines the logs basically all start from "Attempt to cancel"; it does not look like an error is reported first and the cancel follows.
> > >
> > > Xuyang <xyzhong...@163.com> wrote on Tue, Aug 23, 2022 at 22:25:
> > > >
> > > > Hi, is there any error message on the TM? If so, please paste it so we can see what caused the cp to fail.
> > > >
> > > > --
> > > >
> > > > Best!
> > > > Xuyang
> > > >
> > > > At 2022-08-23 20:41:59, "yidan zhao" <hinobl...@gmail.com> wrote:
> > > > >Some additional information:
> > > > >Looking at the logs, when the savepoint is triggered with flink savepoint xxx, the JM log is very simple:
> > > > >2022-08-23 20:33:22,307 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering savepoint for job 8d231de75b8227a1b715b1aa665caa91.
> > > > >
> > > > >2022-08-23 20:33:22,318 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
> > > > >
> > > > >2022-08-23 20:33:23,701 INFO org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream [] - Cannot create recoverable writer due to Recoverable writers on Hadoop are only supported for HDFS, will use the ordinary writer.
> > > > >
> > > > >2022-08-23 20:33:23,908 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91 (1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
> > > > >
> > > > >
> > > > >When the job is stopped with stop xxx instead, the JM log (with the error) is as follows:
> > > > >
> > > > >2022-08-23 20:35:01,834 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering stop-with-savepoint for job 8d231de75b8227a1b715b1aa665caa91.
> > > > >
> > > > >2022-08-23 20:35:01,842 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834 for job 8d231de75b8227a1b715b1aa665caa91.
> > > > >
> > > > >2022-08-23 20:35:02,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job 8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @ xxx.xxx.com (dataPort=13156).
> > > > >(Here it looks like the checkpoint was declined; is the reason that the task failed?)
> > > > >org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure reason: Task has failed.
> > > > >    at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_251]
> > > > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
> > > > >    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_251]
> > > > >    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_251]
> > > > >    ... 3 more
> > > > >Caused by: org.apache.flink.util.SerializedThrowable
> > > > >    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
> > > > >    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
> > > > >    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002) ~[?:?]
> > > > >    at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
> > > > >    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> > > > >
> > > > >yidan zhao <hinobl...@gmail.com> wrote on Tue, Aug 23, 2022 at 20:31:
> > > > >>
> > > > >> As the subject says: stop, i.e. stopping the job and taking a savepoint, fails.
> > > > >> In my tests both cancel and cancel -s succeed. cancel -s successfully generates the savepoint and exits.
> > > > >>
> > > > >> stop does not work; the main error is
> > > > >> Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
> > > > >> ...
> > > > >> Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
> > > > >> ...
> > > > >> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
> > > > >> ...
> > > > >> Caused by: org.apache.flink.util.SerializedThrowable: Task has failed.
> > > > >> ...
> > > > >>
> > > > >> ______Detailed logs: