????????????????????Kafka 
Connector??????Api??????????????????IDEA????????????????????????????????????????????Jira
 https://issues.apache.org/jira/browse/FLINK-28758????????????????????????????


------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<hinobl...@gmail.com&gt;;
????????:&nbsp;2022??8??23??(??????) ????11:09
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: Re: flink1.15.1 stop ????????



1 ????????source?????????????? savepoint ?? trigger ??????
2 ???????? cancel ?? stop ????????????????
3 ??????????????kafka 
source???????????????????????????????????????????????????????? kafka ?????? 
client????

yidan zhao <hinobl...@gmail.com&gt; ??2022??8??23?????? 23:06??????
&gt;
&gt; ??????????????????
&gt; ???? flink cancel -s ??????????flink stop ??????????????????????????????web
&gt; ui??????????savepoint????????0/841????????????????????????????
&gt; ????4????????
&gt; ????1
&gt; 2022-08-23 22:47:37,093 WARN
&gt; 
org.apache.flink.runtime.taskmanager.Task&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 [] -
&gt; Source: JobConfig -&gt; Split(JobName_configType)
&gt;&nbsp; (1/1)#0 (b5076938b231fb9d33e582104292ebd5) switched from RUNNING to
&gt; FAILED with failure cause:
&gt; org.apache.flink.util.FlinkRuntimeException: S
&gt; top-with-savepoint failed.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1339)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1364)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskAction
&gt; Executor.java:93)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:33
&gt; 8)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.lang.Thread.run(Thread.java:748)
&gt; ???????????? free task??unregister????????
&gt;
&gt; ????2
&gt; ...
&gt; ???????????????? Attempt to cancel ????????????????????cancel????????
&gt;
&gt; Xuyang <xyzhong...@163.com&gt; ??2022??8??23?????? 22:25??????
&gt; &gt;
&gt; &gt; Hi, TM????????????????????????????????????????????????cp??????
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; --
&gt; &gt;
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; Best??
&gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp; Xuyang
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; Hi, TM????????????????????????????????????????????????cp??????
&gt; &gt; ?? 2022-08-23 20:41:59??"yidan zhao" <hinobl...@gmail.com&gt; ??????
&gt; &gt; &gt;??????????????
&gt; &gt; &gt;?????????????? flink savepoint xxx 
????????????????JM??????????????
&gt; &gt; &gt;2022-08-23 20:33:22,307 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.jobmaster.JobMaster&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 [] -
&gt; &gt; &gt;Triggering savepoint for job 8d231de75b8227a1b
&gt; &gt; &gt;715b1aa665caa91.
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:33:22,318 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.checkpoint.CheckpointCoordinator&nbsp;&nbsp;&nbsp; 
[] -
&gt; &gt; &gt;Triggering checkpoint 5 (type=SavepointType{na
&gt; &gt; &gt;me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) 
@
&gt; &gt; &gt;1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:33:23,701 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
&gt; &gt; &gt;[] - Cannot create recoverable writer
&gt; &gt; &gt; due to Recoverable writers on Hadoop are only supported for HDFS,
&gt; &gt; &gt;will use the ordinary writer.
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:33:23,908 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.checkpoint.CheckpointCoordinator&nbsp;&nbsp;&nbsp; 
[] -
&gt; &gt; &gt;Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
&gt; &gt; &gt;(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 
ms).
&gt; &gt; &gt;
&gt; &gt; &gt;
&gt; &gt; &gt;?????? stop xxx ????????????????JM??????????????????
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:35:01,834 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.jobmaster.JobMaster&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 [] -
&gt; &gt; &gt;Triggering stop-with-savepoint for job
&gt; &gt; &gt;8d231de75b8227a1b715b1aa665caa91.
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:35:01,842 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.checkpoint.CheckpointCoordinator&nbsp;&nbsp;&nbsp; 
[] -
&gt; &gt; &gt;Triggering checkpoint 6 (type=SavepointType{name='Suspend 
Savepoint',
&gt; &gt; &gt;postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 
1661258101834
&gt; &gt; &gt;for job 8d231de75b8227a1b715b1aa665caa91.
&gt; &gt; &gt;
&gt; &gt; &gt;2022-08-23 20:35:02,083 INFO
&gt; &gt; 
&gt;org.apache.flink.runtime.checkpoint.CheckpointCoordinator&nbsp;&nbsp;&nbsp; 
[] -
&gt; &gt; &gt;Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of 
job
&gt; &gt; &gt;8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
&gt; &gt; &gt;xxx.xxx.com (dataPort=13156).
&gt; &gt; &gt;????????????????decline?????????? task failed????
&gt; &gt; &gt;org.apache.flink.util.SerializedThrowable: Task name with subtask 
:
&gt; &gt; &gt;Source: XXX_Kafka(startTs:latest) -&gt;... -&gt;... -&gt;... 
(10/10)#2 Failure
&gt; &gt; &gt;reason: Task has failed.
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;Caused by: org.apache.flink.util.SerializedThrowable:
&gt; &gt; 
&gt;org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
&gt; &gt; &gt;~[?:1.8.0_251]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 3 more
&gt; &gt; &gt;Caused by: org.apache.flink.util.SerializedThrowable
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
&gt; &gt; &gt;~[?:?]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
&gt; &gt; &gt;~[?:?]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
&gt; &gt; &gt;~[?:?]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
&gt; &gt; &gt;~[flink-dist-1.15.1.jar:1.15.1]
&gt; &gt; &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
&gt; &gt; &gt;
&gt; &gt; &gt;yidan zhao <hinobl...@gmail.com&gt; ??2022??8??23?????? 
20:31??????
&gt; &gt; &gt;&gt;
&gt; &gt; &gt;&gt; ??????stop????????????????????????
&gt; &gt; &gt;&gt; ?????? cancel??cancel -s ???????????? cancel -s 
????????????????????????
&gt; &gt; &gt;&gt;
&gt; &gt; &gt;&gt; stop??????????????????
&gt; &gt; &gt;&gt; Could not stop with a savepoint job 
"1b87f308e2582f3cc0e3ccc812471201"
&gt; &gt; &gt;&gt; ...
&gt; &gt; &gt;&gt; Caused by: java.util.concurrent.ExecutionException:
&gt; &gt; &gt;&gt; java.util.concurrent.CompletionException:
&gt; &gt; &gt;&gt; org.apache.flink.runtime.checkpoint.CheckpointEx
&gt; &gt; &gt;&gt; ception: Task has failed.
&gt; &gt; &gt;&gt; ...
&gt; &gt; &gt;&gt; Caused by: org.apache.flink.util.SerializedThrowable:
&gt; &gt; &gt;&gt; org.apache.flink.runtime.checkpoint.CheckpointException: 
Task has
&gt; &gt; &gt;&gt; failed.
&gt; &gt; &gt;&gt; ...
&gt; &gt; &gt;&gt; Caused by: org.apache.flink.util.SerializedThrowable: Task 
has failed.
&gt; &gt; &gt;&gt; ...
&gt; &gt; &gt;&gt;
&gt; &gt; &gt;&gt; ______??????????

回复