[ 
https://issues.apache.org/jira/browse/FLINK-24923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ozan closed FLINK-24923.
------------------------
    Resolution: Fixed

Enable SSL/TLS for internal communications

> Flink v1.13.2 restarts itself while Tenable Nessus Vulnerability scans the 
> machines
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-24923
>                 URL: https://issues.apache.org/jira/browse/FLINK-24923
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.13.2
>            Reporter: ozan
>            Priority: Blocker
>
> Every day at the same time (1:00 AM), we are scanning all the machines with 
> nessus. But only flink machines fails and restart itself.
> We are using flink v.1.13.2, with java 8
> I have also opened stackoverflow issue, but it was not solved.
> In the log: 1.2.3.4 is the nessus scanner ip address:
> {code:java}
> 2021-11-16 01:02:25,020 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:52128] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
> 2021-11-16 01:02:25,021 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:59658] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
> 2021-11-16 01:02:27,872 INFO  org.apache.kafka.clients.FetchSessionHandler    
>              [] - [Consumer clientId=consumer-kafkaGroup-15, 
> groupId=kafkaGroup] Node 5 was unable to process the fetch request with 
> (sessionId=715318048, epoch=18189): FETCH_SESSION_ID_NOT_FOUND.
> 2021-11-16 01:02:28,837 INFO  org.apache.kafka.clients.FetchSessionHandler    
>              [] - [Consumer clientId=consumer-kafkaGroup-14, 
> groupId=kafkaGroup] Node 7 was unable to process the fetch request with 
> (sessionId=1922249004, epoch=18126): FETCH_SESSION_ID_NOT_FOUND.
> 2021-11-16 01:02:29,415 INFO  org.apache.kafka.clients.FetchSessionHandler    
>              [] - [Consumer clientId=consumer-kafkaGroup-12, 
> groupId=kafkaGroup] Node 5 was unable to process the fetch request with 
> (sessionId=511071171, epoch=18261): FETCH_SESSION_ID_NOT_FOUND.
> 2021-11-16 01:02:33,006 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - 
> Encountered error while consuming partitions
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer
> 2021-11-16 01:02:33,062 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:56542] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1224736772 - discarded
> 2021-11-16 01:02:33,063 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:35858] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1224736772 - discarded
> 2021-11-16 01:02:33,069 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 22
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 22
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:33,089 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 22
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 22
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:33,080 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 64
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 64
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:33,098 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 64
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 64
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:33,123 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:35898] failed with 
> java.io.IOException: Connection reset by peer
> 2021-11-16 01:02:33,123 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - 
> Encountered error while consuming partitions
> org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: 
> java.lang.IllegalStateException: Network stream corrupted: received incorrect 
> magic number.
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.lang.IllegalStateException: Network stream corrupted: 
> received incorrect magic number.
>     at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:210)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     ... 14 more
> 2021-11-16 01:02:33,123 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - 
> Encountered error while consuming partitions
> org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: 
> java.lang.IllegalStateException: Network stream corrupted: received incorrect 
> magic number.
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.lang.IllegalStateException: Network stream corrupted: 
> received incorrect magic number.
>     at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:210)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     ... 14 more
> 2021-11-16 01:02:33,127 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - 
> Encountered error while consuming partitions
> org.apache.flink.shaded.netty4.io.netty.handler.codec.CorruptedFrameException:
>  Adjusted frame length (0) is less than lengthFieldEndOffset: 4
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder.failOnFrameLengthLessThanLengthFieldEndOffset(LengthFieldBasedFrameDecoder.java:358)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:415)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.io.network.netty.NettyMessage$NettyMessageDecoder.decode(NettyMessage.java:201)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:404)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:371)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:33,164 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:56838] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1212501076 - discarded
> 2021-11-16 01:02:33,164 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.4:36148] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1212501076 - discarded
> 2021-11-16 01:02:37,009 ERROR 
> org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - 
> Encountered error while consuming partitions
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer 
> {code}
>  
> *------------------------------------------------------------------------------------*
>  
>  * *At the same time, here are the logs inside the standalonesession.log:*
>  
>  
> {code:java}
> 2021-11-16 01:02:36,256 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:45502] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
> 2021-11-16 01:02:36,256 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:38880] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1195725860 - discarded
> 2021-11-16 01:02:36,245 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection           [] - Error while 
> executing BLOB connection.
> java.io.IOException: Unknown operation 71
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,246 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:55498] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1224736772 - discarded
> 2021-11-16 01:02:44,252 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 64
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 64
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,256 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 22
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 22
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,280 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:48956] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1224736772 - discarded
> 2021-11-16 01:02:44,303 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection           [] - Error while 
> executing BLOB connection.
> java.io.IOException: Unknown operation 73
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,310 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 64
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 64
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,314 ERROR akka.actor.OneForOneStrategy                    
>              [] - Error while decoding incoming Akka PDU of length: 22
> akka.remote.transport.AkkaProtocolException: Error while decoding incoming 
> Akka PDU of length: 22
> Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed.
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:174) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> Caused by: akka.protobuf.InvalidProtocolBufferException: Protocol message 
> contained an invalid tag (zero).
>     at 
> akka.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:93)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.CodedInputStream.readTag(CodedInputStream.java:112) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8964) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:8928) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9024)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:9019)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:145) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:181) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:192) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.protobuf.AbstractParser.parseFrom(AbstractParser.java:53) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:9142) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:175) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:412)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor$$anonfun$3.applyOrElse(AkkaProtocolTransport.scala:375)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.processEvent(FSM.scala:684) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:678) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:672) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,311 ERROR 
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] 
> - Caught exception
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[?:?]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:44,342 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT 
> operation failed
> java.lang.IllegalArgumentException: Invalid BLOB addressing for permanent 
> BLOBs
>     at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:334)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,345 ERROR 
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] 
> - Caught exception
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[?:?]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:44,817 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection           [] - PUT 
> operation failed
> java.lang.IllegalArgumentException: Invalid BLOB addressing for permanent 
> BLOBs
>     at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:334)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:110)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:44,875 ERROR 
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] 
> - Caught exception
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[?:?]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:44,903 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:56582] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1212501076 - discarded
> 2021-11-16 01:02:44,916 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:49988] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 1212501076 - discarded
> 2021-11-16 01:02:44,925 ERROR 
> org.apache.flink.runtime.blob.BlobServerConnection           [] - Error while 
> executing BLOB connection.
> java.io.IOException: Unknown operation 72
>     at 
> org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
> 2021-11-16 01:02:46,149 ERROR 
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] 
> - Caught exception
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[?:?]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:49,630 WARN  akka.remote.transport.netty.NettyTransport      
>              [] - Remote connection to [/1.2.3.5:59724] failed with 
> org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException:
>  Adjusted frame length exceeds 10485760: 50331671 - discarded
> 2021-11-16 01:02:51,250 ERROR 
> org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] 
> - Caught exception
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
>     at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) ~[?:?]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
>  ~[flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  [flink-dist_2.11-1.13.2.jar:1.13.2]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> 2021-11-16 01:02:51,285 WARN  
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Unhandled 
> exception
> java.io.IOException: Connection reset by peer {code}
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to