Mark. I am hitting the first problem as well, and have no solution so far.
On Mon, Mar 22, 2021 at 12:28 PM, chaiyi <chaiyi22201...@163.com> wrote:
>
> Hello,
> I recently set up a three-node Flink cluster running zk-3.6.2 + hadoop-3.3.0 + flink-1.11.2.
> The three nodes are virtual machines on the same physical host, so network fluctuations
> should not be causing connections to be rejected, yet connection failures keep occurring.
> After the job has run for a while (a few hours at the shortest, 3 to 5 days at the longest),
> it fails. I have seen the following three kinds of exception, all related to network
> connections. Could you please check whether something is wrong with my Flink network
> configuration?
>
> 1. Connections between cluster nodes are rejected:
> 2021-03-03 08:50:42,851 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Window(ProcessingTimeSessionWindows(90000), ProcessingTimeTrigger, FlightTrackAggregate, FlightTrackSectorResult) -> Sink: Unnamed (4/4) (3097c00c09b475b35c23782a3b4a8eaa) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4df09503.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.100.1.222/10.100.1.222:43156')
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1388) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:730) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:820) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:424) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_152]
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
>
>
> 2. Requests to ZooKeeper fail:
> 2021-03-02 20:27:13,487 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Unable to read additional data from server sessionid 0x30018710a580007, likely server has closed socket, closing socket connection and attempting reconnect
> 2021-03-02 20:27:13,588 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: SUSPENDED
> 2021-03-02 20:27:13,590 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-03-02 20:27:13,591 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender LeaderContender: StandaloneResourceManager no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper suspended. The contender http://flink-02:8081 no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-03-02 20:27:13,830 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7735477848930862421.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2021-03-02 20:27:13,830 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server flink-02/10.100.1.222:2181
> 2021-03-02 20:27:13,831 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
> 2021-03-02 20:27:13,831 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Socket connection established to flink-02/10.100.1.222:2181, initiating session
> 2021-03-02 20:27:13,835 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server flink-02/10.100.1.222:2181, sessionid = 0x30018710a580007, negotiated timeout = 40000
> 2021-03-02 20:27:13,835 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: RECONNECTED
> 2021-03-02 20:27:13,841 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,842 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2021-03-02 20:27:13,842 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,843 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> 2021-03-02 20:27:13,843 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>
>
> 3. The connection to RabbitMQ times out:
> 2021-02-07 18:12:39,761 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler [] - An unexpected connection driver error occured
> com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
>     at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:732) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:651) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) [blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> 2021-02-07 18:12:40,764 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask [] - Error during disposal of stream operator.
> com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
>     at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:875) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:991) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:920) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:904) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:896) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.close(AutorecoveringConnection.java:216) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:182) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> 2021-02-07 18:12:40,766 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source (1/1) (912e629ede54920b0689fcf0b2398a7b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Messages could not be acknowledged during checkpoint creation.
>     at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:236) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.acknowledgeIDs(MultipleIdsMessageAcknowledgingSourceBase.java:122) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.notifyCheckpointComplete(MessageAcknowledgingSourceBase.java:239) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:107) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:283) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:987) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:958) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:974) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
> Caused by: java.io.IOException
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1343) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:503) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:234) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     ... 18 more
> Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
>     at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1343) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:503) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.acknowledgeSessionIDs(RMQSource.java:234) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     ... 18 more
> Caused by: com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 60 seconds
>     at com.rabbitmq.client.impl.AMQConnection.handleSocketTimeout(AMQConnection.java:732) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:651) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>     at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) ~[blob_p-1163a4c4e2f3886afc28a6972515e3fe1f2e0de4-226c4221bed34fa888edc5da88c55110:?]
>
>
> The Flink configuration file is as follows:
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.memory.process.size: 2048m
> taskmanager.memory.process.size: 12288m
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.fraction: 0
> taskmanager.numberOfTaskSlots: 3
> parallelism.default: 3
> task.cancellation.timeout: 0
> high-availability: zookeeper
> high-availability.cluster-id: /track_ns
> high-availability.zookeeper.path.root: /flink_track
> high-availability.storageDir: hdfs://yfcluster/flink/ha/
> recovery.zookeeper.storageDir: hdfs://yfcluster/zookeeper/ha/
> yarn.application-attempts: 10
> high-availability.zookeeper.quorum: flink-01:2181,flink-02:2181,flink-03:2181
> state.backend: rocksdb
> state.checkpoints.dir: hdfs://yfcluster/flink/checkpoints
> state.savepoints.dir: hdfs://yfcluster/flink/savepoints
> state.backend.incremental: false
> jobmanager.execution.failover-strategy: region
> state.checkpoints.num-retained: 3
> state.backend.local-recovery: true
> taskmanager.state.local.root-dirs: /opt/flink-tm-state