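I. Background
1. Standalone deployment mode, with 1 jobmanager and 1 taskmanager.
2. Network policy: inbound and outbound traffic is opened only for the ports that are actually used (no NAT).
3. Following the latest Flink documentation, every address and port setting is pinned to a fixed value, e.g.:
   jobmanager (cdc-master 192.168.10.11): 18081 (web UI port); 16123 (RPC port); 50101 (metrics port); 51101 (blob port)
   taskmanager (cdc-worker 192.168.10.12): 19123 (data port); 17123 (RPC port); 50101 (metrics port)
4. Flink is used for CDC: Apache Paimon writes the CDC data into HDFS / Hive metastore, and sql-client then runs a SELECT in streaming mode to test streaming reads. Streaming-read documentation: https://paimon.apache.org/docs/master/how-to/querying-tables/
*Versions: Flink 1.17.0, flink-cdc-mysql 2.3.0, flink-paimon 0.5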
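Because only the ports listed above are opened, it can help to check from cdc-master which ports on cdc-worker are actually reachable. Below is a minimal sketch of such a check, assuming a plain TCP connect is enough to tell an open path from a blocked one; the class name `PortProbe` is hypothetical and the host/port values are simply copied from item 3.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): probes TCP reachability of the
 * fixed taskmanager ports listed in the background section.
 */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host or "No route to host" all count as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports on cdc-worker that the jobmanager side is expected to reach (values from item 3).
        Map<String, Integer> workerPorts = new LinkedHashMap<>();
        workerPorts.put("data", 19123);
        workerPorts.put("rpc", 17123);
        workerPorts.put("metrics", 50101);
        for (Map.Entry<String, Integer> e : workerPorts.entrySet()) {
            boolean ok = isReachable("192.168.10.12", e.getValue(), 2000);
            System.out.printf("%-8s %d -> %s%n", e.getKey(), e.getValue(), ok ? "open" : "blocked/closed");
        }
    }
}
```

Run it on cdc-master while the taskmanager is up; a port the taskmanager is known to listen on but which shows up as blocked/closed points at the ACL rather than at Flink.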
II. Problem
1. With sql-client showing the results of a streaming SELECT, modifying rows in the database table monitored by the Paimon CDC job never produced any incremental data in sql-client, no matter how long we waited.

III. Investigation
1. The Apache Paimon job runs normally and all related files are created in HDFS; the Paimon CDC job in the jobmanager web UI shows no exceptions.
2. After switching sql-client to batch mode, the query does return the latest data after the change.
3. After setting the Flink log level to DEBUG and rerunning the job, the following was observed:
3.1. The jobmanager log file flink-xxx-standalonesession-0-xxx.log (in the log folder) contains this error:

2023-06-02 12:42:37,577 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from DEPLOYING to INITIALIZING.
2023-06-02 12:42:37,589 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Received sink socket server address: cdc-worker/192.168.10.12:37824
2023-06-02 12:42:37,590 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from INITIALIZING to RUNNING.
2023-06-02 12:42:37,615 DEBUG org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Collect sink coordinator encounters an exception
java.net.NoRouteToHostException: No route to host (Host unreachable)
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412) ~[?:?]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255) ~[?:?]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237) ~[?:?]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:?]
    at java.net.Socket.connect(Socket.java:608) ~[?:?]
    at java.net.Socket.connect(Socket.java:557) ~[?:?]
    at org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator.handleRequestImpl(CollectSinkOperatorCoordinator.java:132) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator.lambda$handleCoordinationRequest$0(CollectSinkOperatorCoordinator.java:111) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
2023-06-02 12:42:37,701 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from RUNNING to FINISHED.
2023-06-02 12:42:37,701 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Ignoring transition of vertex Sink: Collect table sink (1/1) - execution #0 to FAILED while being FINISHED.
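The log shows the coordinator on the jobmanager connecting to cdc-worker/192.168.10.12:37824, a port that appears nowhere in the configuration. The sketch below illustrates the likely mechanism: a ServerSocket created with port 0 lets the operating system pick any free ephemeral port, so every new collect (streaming read) job can expose a different port that an ACL cannot know in advance. `EphemeralPortDemo` is a hypothetical class that only mimics the bind-to-port-0 pattern; it is not Flink's code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo: binding to port 0 yields an OS-chosen ephemeral port. */
final class EphemeralPortDemo {

    /** Opens `count` server sockets on port 0 at the same time and returns the ports the OS picked. */
    static List<Integer> pickEphemeralPorts(int count) throws IOException {
        List<ServerSocket> sockets = new ArrayList<>();
        List<Integer> ports = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                // Port 0 means "any free port"; backlog 0 means "use the default backlog".
                ServerSocket socket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress());
                sockets.add(socket);
                ports.add(socket.getLocalPort());
            }
        } finally {
            for (ServerSocket s : sockets) {
                s.close();
            }
        }
        return ports;
    }

    public static void main(String[] args) throws IOException {
        // Each value plays the role of "37824" in the log: unknown until the socket is bound.
        System.out.println(pickEphemeralPorts(3));
    }
}
```

Because the sockets are held open simultaneously, the returned ports are guaranteed to differ; across separate runs they usually differ as well, which matches the observation that the address changes with every new streaming read.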
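3.2. The error suggests a network problem. Since strict ACLs are in place, the most likely cause is a port we did not know about and therefore had not opened.
4. The errors are concentrated in CollectSinkOperatorCoordinator, and the log comes from the jobmanager. This suggested that the jobmanager was sending requests to the taskmanager, although it was not clear why that would happen.
5. Reading the source of CollectSinkOperatorCoordinator#handleRequestImpl shows that it opens a socket connection, so the key question is how the sink's address (the `address` field) gets its value. Its call sites show that it is assigned in handleEventFromOperator in the same class:
    address = ((CollectSinkAddressEvent) event).getAddress();
    LOG.info("Received sink socket server address: " + address);
Next, I used Arthas (watch/stack) to inspect the runtime stack and found that the address changes every time a new collect (streaming read) job is created, and that the call chain involves Akka. Not being familiar with how such a job is created, I went through every class along that call chain and eventually found a ServerSocket in CollectSinkFunction. (In hindsight, a client socket implies a matching server socket, so a global search of the Flink codebase would have located it much faster.) Searching Flink for the keyword "ServerSocket" shows that this is the only class that uses it; the reason it exists is explained in https://github.com/apache/flink/pull/12069

IV. Resolution
1. Changing the port passed when the ServerSocket is initialized and recompiling Flink (on Flink master it is still 0, i.e. a random port chosen by the OS) solved the sql-client streaming-read problem under strict ACLs.

V. Suggestions
1. Are there other places in Flink like this, i.e. that require network communication but are not documented?
2. I understand that clusters are usually treated as trusted, but in some environments the network cannot be fully opened. This feature is more of a debugging aid (though I personally consider it fairly important); could its port be specified in the configuration file, like the other network address/port settings? An improvement here would be great, and if possible please assign it to me (newcomer here~).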
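If the port is hard-coded to a single value, a second streaming SELECT whose collect sink lands on the same taskmanager would most likely fail to bind, because the port is already in use. One possible shape for the configurable option proposed above is a port range, similar in spirit to existing Flink options such as `blob.server.port` that accept ranges: the sink binds to the first free port in the range and the ACL only needs to open that range. The sketch below assumes a range string like "50200-50210"; both the format and the class `PortRangeBinder` are hypothetical and do not correspond to any existing Flink option.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

/**
 * Hypothetical sketch (not a Flink API): bind a ServerSocket to the first
 * free port in an inclusive range such as "50200-50210".
 */
final class PortRangeBinder {

    /** Parses "lo-hi" or a single port "p" into {lo, hi}; rejects invalid ranges. */
    static int[] parseRange(String spec) {
        String[] parts = spec.trim().split("-");
        int lo = Integer.parseInt(parts[0].trim());
        int hi = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : lo;
        if (parts.length > 2 || lo < 1 || hi > 65535 || lo > hi) {
            throw new IllegalArgumentException("invalid port range: " + spec);
        }
        return new int[] {lo, hi};
    }

    /** Returns a ServerSocket bound to the first free port in the range, or throws if all are taken. */
    static ServerSocket bindInRange(String spec, InetAddress bindAddress) throws IOException {
        int[] range = parseRange(spec);
        for (int port = range[0]; port <= range[1]; port++) {
            try {
                return new ServerSocket(port, 0, bindAddress);
            } catch (BindException inUse) {
                // Port already taken (e.g. by another collect sink on this taskmanager): try the next one.
            }
        }
        throw new BindException("no free port in range " + spec);
    }
}
```

With something like this, the sink would call `bindInRange(...)` instead of binding to port 0, and the ACL would only need to allow cdc-master to reach that range on cdc-worker. The range should be at least as wide as the number of concurrent streaming reads expected per taskmanager.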