Hi!

This looks like a bug. Could you share the DDL of the Hive table and the query you ran, so that we can investigate the issue further?

Asahi Lee <asahi....@qq.com.invalid> wrote on Mon, Jan 24, 2022 at 09:53:

> 2022-01-23 04:31:39,568 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents ->
> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)])
> (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from RUNNING to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the
> results produced by task execution 07b2cd514c6b6d85f79ab5b953971f82.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:     :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:     :  +- [#3]
> Exchange(distribution=[hash[jobid]])\n:     +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> -> HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) -> Calc(select=[$f0 AS jobid, $f1
> AS reportno, string($f6) AS reportdate, bigint((((((nvl($f7, 0) + nvl($f8,
> 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS
> pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5
> AS futureops, lasboptestdate, lasbopfunctiontestdate]) -> Map ->
> Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from
> CREATED to CANCELING.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> MultipleInput(readOrder=[0,0,1],
> members=[\nHashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND
> ($f109 = reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, dayssincelast, $f109, jobid0,
> reportno0, boptestdate, bopfunctiontestdate], build=[right])\n:-
> Calc(select=[jobid, reportno, reportdate, depthprogress, numperstype1,
> numperstype2, numperstype3, numperstype4, numperstype5, numperstype6,
> currentops, futureops, dayssincelast, bigint(reportno) AS $f109])\n:
> +- HashJoin(joinType=[LeftOuterJoin], where=[((jobid = jobid0) AND ($f94 =
> reportno0))], select=[jobid, reportno, reportdate, depthprogress,
> numperstype1, numperstype2, numperstype3, numperstype4, numperstype5,
> numperstype6, currentops, futureops, $f94, jobid0, reportno0,
> dayssincelast], build=[right])\n:     :- Calc(select=[jobid,
> reportno, reportdate, depthprogress, numperstype1, numperstype2,
> numperstype3, numperstype4, numperstype5, numperstype6, currentops,
> futureops, bigint(reportno) AS $f94])\n:     :  +- [#3]
> Exchange(distribution=[hash[jobid]])\n:     +- [#2]
> Exchange(distribution=[hash[jobid]])\n+- [#1]
> Exchange(distribution=[hash[jobid]])\n]) -> Calc(select=[jobid AS $f0,
> reportno AS $f1, dayssincelast AS $f2, depthprogress AS $f3, currentops AS
> $f4, futureops AS $f5, reportdate AS $f6, numperstype1 AS $f7, numperstype2
> AS $f8, numperstype3 AS $f9, numperstype4 AS $f10, numperstype5 AS $f11,
> numperstype6 AS $f12, boptestdate AS $f13, bopfunctiontestdate AS $f14])
> -> HashAggregate(isMerge=[false], groupBy=[$f0, $f1, $f2, $f3, $f4, $f5,
> $f6, $f7, $f8, $f9, $f10, $f11, $f12], select=[$f0, $f1, $f2, $f3, $f4,
> $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, MAX($f13) AS lasboptestdate,
> MAX($f14) AS lasbopfunctiontestdate]) -> Calc(select=[$f0 AS jobid, $f1
> AS reportno, string($f6) AS reportdate, bigint((((((nvl($f7, 0) + nvl($f8,
> 0)) + nvl($f9, 0)) + nvl($f10, 0)) + nvl($f11, 0)) + nvl($f12, 0))) AS
> pobcnt, $f2 AS dayssincelast, $f3 AS depthprogress, $f4 AS currentops, $f5
> AS futureops, lasboptestdate, lasbopfunctiontestdate]) -> Map ->
> Sink: Unnamed (1/1) (3c555cbd6bf411a6111cf7eaab527d33) switched from
> CANCELING to CANCELED.
> 2022-01-23 04:31:39,570 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Discarding the
> results produced by task execution 3c555cbd6bf411a6111cf7eaab527d33.
> 2022-01-23 04:32:09,578 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> HiveSource-cosldatacenter.ods_rimdrill_dailyincidentsevents ->
> Calc(select=[jobid, reportno, dayssincelast], where=[(idno = 1:BIGINT)])
> (1/1) (7533d77baa7eb16e8242ae63e0706dff) switched from CANCELING to
> CANCELED.
> 2022-01-23 04:32:09,579 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
> [] - Clearing resource requirements of job b2f265c16fd565ef75edbd79e064b1a9
> 2022-01-23 04:32:09,579 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
> 75a7a9e49181415ba817eb945e2e8080_product_143
> (b2f265c16fd565ef75edbd79e064b1a9) switched from state FAILING to FAILED.
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>         at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_301]
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_301]
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_301]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_301]
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: java.lang.RuntimeException
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
> Caused by: java.lang.NullPointerException
>         at
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.serializeRecord(RecordWriter.java:130)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
> ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_301]
