2022-01-23 04:31:39,568 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - Source: HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents 
-> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)]) 
(1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
2022-01-23 04:31:39,570 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - Discarding the results produced by task execution 
07b2cd514c6b6d85f79ab5b953971f82.
2022-01-23 04:31:39,570 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - MultipleInput(readOrder=[0,0,1], 
members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND 
($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, 
numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, 
numperstype6, currentops, futureops, dayssincelast, $f109, jobid0, reportno0, 
boptestdate, bopfunctiontestdate], build=[right])\n:- Calc(select=[jobid, 
reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, 
bigint(reportno) AS $f109])\n:  +- HashJoin(joinType=[LeftOuterJoin], 
where=[((jobid = jobid0) AND ($f94 = reportno0))], select=[jobid, reportno, 
reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, $f94, jobid0, 
reportno0, dayssincelast], build=[right])\n:     :- 
Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, 
numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, 
currentops, futureops, bigint(reportno) AS $f94])\n:     :  
+- [#3] Exchange(distribution=[hash[jobid]])\n:     +- [#2] 
Exchange(distribution=[hash[jobid]])\n+- [#1] 
Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0, 
reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS $f4, 
futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2 AS $f8, 
numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11, numperstype6 
AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14]) -> 
HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate, MAX($f14) AS 
lasbopfunctiontestdate]) -> Calc(select=[$f0 AS jobid, $f1 AS reportno, 
string($f6) AS reportdate, bigint((((((nvl($f7, 0) + nvl($f8, 0)) + nvl($f9, 
0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS pobcnt, $f2 AS 
dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5 AS futureops, 
lasboptestdate, lasbopfunctiontestdate]) -> Map -> Sink: Unnamed (1/1) 
(3c555cbd6bf411a6111cf7eaab527d33) switched from CREATED to CANCELING.
2022-01-23 04:31:39,570 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - MultipleInput(readOrder=[0,0,1], 
members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND 
($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress, 
numperstype1, numperstype2, numperstype3, numperstype4, numperstype5, 
numperstype6, currentops, futureops, dayssincelast, $f109, jobid0, reportno0, 
boptestdate, bopfunctiontestdate], build=[right])\n:- Calc(select=[jobid, 
reportno, reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, dayssincelast, 
bigint(reportno) AS $f109])\n:  +- HashJoin(joinType=[LeftOuterJoin], 
where=[((jobid = jobid0) AND ($f94 = reportno0))], select=[jobid, reportno, 
reportdate, depthprogress, numperstype1, numperstype2, numperstype3, 
numperstype4, numperstype5, numperstype6, currentops, futureops, $f94, jobid0, 
reportno0, dayssincelast], build=[right])\n:     :- 
Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1, 
numperstype2, numperstype3, numperstype4, numperstype5, numperstype6, 
currentops, futureops, bigint(reportno) AS $f94])\n:     :  
+- [#3] Exchange(distribution=[hash[jobid]])\n:     +- [#2] 
Exchange(distribution=[hash[jobid]])\n+- [#1] 
Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0, 
reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS $f4, 
futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2 AS $f8, 
numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11, numperstype6 
AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14]) -> 
HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4, $f5, $f6, $f7, 
$f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate, MAX($f14) AS 
lasbopfunctiontestdate]) -> Calc(select=[$f0 AS jobid, $f1 AS reportno, 
string($f6) AS reportdate, bigint((((((nvl($f7, 0) + nvl($f8, 0)) + nvl($f9, 
0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS pobcnt, $f2 AS 
dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5 AS futureops, 
lasboptestdate, lasbopfunctiontestdate]) -> Map -> Sink: Unnamed (1/1) 
(3c555cbd6bf411a6111cf7eaab527d33) switched from CANCELING to CANCELED.
2022-01-23 04:31:39,570 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - Discarding the results produced by task execution 
3c555cbd6bf411a6111cf7eaab527d33.
2022-01-23 04:32:09,578 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - Source: HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents 
-> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)]) 
(1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from CANCELING to CANCELED.
2022-01-23 04:32:09,579 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Clearing resource requirements of job b2f265c16fd565ef75edbd79e064b1a9
2022-01-23 04:32:09,579 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph      
 [] - Job 75a7a9e49181415ba817eb945e2e8080_product_143 
(b2f265c16fd565ef75edbd79e064b1a9) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_301]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_301]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_301]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.13.2.jar:1.13.2]
Caused by: java.lang.RuntimeException
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
Caused by: java.lang.NullPointerException
        at 
org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) 
~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
 ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
 ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
