flink1.11.1使用Table API Hive方言的executSql报错
Hi,all: 本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.client.program.PackagedProgram .invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2. 11-1.11.1.jar:1.11.1] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java: 149) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.client.deployment.application. DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.client.deployment.application. DetachedApplicationRunner.run(DetachedApplicationRunner.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler .lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1 .jar:1.11.1] at java.util.concurrent.CompletableFuture$AsyncSupply.run( CompletableFuture.java:1604) [?:1.8.0_242] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: 511) [?:1.8.0_242] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_242 ] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor .java:1149) [?:1.8.0_242] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) [?:1.8.0_242] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242] Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] at org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1] ... 13 more Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java: 139) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher .setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar: 1.11.1] at org.apache.flink.streaming.api.operators.collect. CollectResultIterator.setJobClient(CollectResultIterator.java:84) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.planner.sinks.SelectTableSinkBase .setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1 .jar:1.11.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl .executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl .executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11- 1.11.1.jar:1.11.1] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql( TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8. 0_242] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl .java:62) ~[?:1.8.0_242] at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242] at org.apache.flink.client.program.PackagedProgram.callMainMethod( PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1] * 核心错误 `Job client must be a CoordinationRequestGateway. This is a bug.` 请问这是一个Bug吗?
使用flink1.11.1的debezium-changelog目前是否不支持Watermark
报错日志: ``` Currently, defining WATERMARK on a changelog source is not supported ```
使用per-job部署成功的flink sql应用但是用applicationMode部署失败,提交到yarn上不到2秒就死掉,并且读取不到日志
``` 14:56:44.536 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command. org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) ~[flink-dist_2.11-1.11.2.jar:1.11.2] at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181] at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) [hadoop-common-3.0.0-cdh6.3.2.jar:?] at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.11.2.jar:1.11.2] at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) [flink-dist_2.11-1.11.2.jar:1.11.2] ``` 部署会报这种错,通过yarn logs -applicationId xxx去看yarn log,也看不到相关日志
Re: Flink Job 如何集成到自己的系统,方便管理
可以看下flink源码的flink k8s模块,里面的test pachage下有需求flink k8s的使用姿势,希望对你有帮助。 DanielGu <610493...@qq.com> 于2021年3月7日周日 下午4:34写道: > >有的,通过 FLINK 和 YARN 或 k8s > 的接口进行编程,管理元数据,管理用户文件,支持提交作业及之后管理作业状态,这是许多公司应用Flink > 的实现方式。 > > 请问有什么可以参考的资料吗?有相关意愿,不知道从哪里下手,希望整个 flink on k8s > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink集群提交任务挂掉
增大`taskmanager.memory.task.off-heap.size`配置 bowen li 于2021年4月2日周五 上午10:54写道: > Hi,大家好: > 现在我们遇到的场景是这样的,提交任务的时候会报错。我们使用的版本是1.12.1,搭建模式是standalone的。下面是报错信息。 > >java.lang.OutOfMemoryError: Direct buffer memory. The direct > out-of-memory error has occurred. This can mean two things: either job(s) > require(s) a larger size of JVM direct memory or there is a direct memory > leak. The direct memory can be allocated by user code or some of its > dependencies. In this case 'taskmanager.memory.task.off-heap.size' > configuration option should be increased. Flink framework and its > dependencies also consume the direct memory, mostly for network > communication. The most of network memory is managed by Flink and should > not result in out-of-memory error. In certain special cases, in particular > for jobs with high parallelism, the framework may require more direct > memory which is not managed by Flink. In this case > 'taskmanager.memory.framework.off-heap.size' configuration option should be > increased. If the error persists then there is probably a direct memory > leak in user code or some of its dependencies > 这种情况我们需要特别的配置吗?
Re: 非对齐检查点还能保证exactly once语义吗
不可以,会存在重复消费的问题,如果buffer没有对齐的话,job重启,那么这些buffer的数据就会清空,然后相关的subtask会重新消费一遍。 张锴 于2021年8月2日周一 下午6:53写道: > flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N + > 1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次? >
Re: 非对齐检查点还能保证exactly once语义吗
Hi! 这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly once的非对齐机制貌似没在官方文档上看到 Caizhi Weng 于2021年8月2日周一 下午7:28写道: > Hi! > > shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。 > > Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once > 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。 > > 张锴 于2021年8月2日周一 下午7:20写道: > > > 这个原理能说明一下吗,咋做到的 > > > > 东东 于2021年8月2日周一 下午7:16写道: > > > > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。 > > > > > > 在 2021-08-02 18:53:11,"张锴" 写道: > > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N > + > > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次? > > > > > >
flink on k8s是否有替代yarn.ship-files的参数
flink version 1.12.0 近期在将flink on yarn迁移至flink on k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个 external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗
Re: flink on k8s是否有替代yarn.ship-files的参数
external-resource..yarn.config-key这个配置贴错了应该是这个 external-resource..kubernetes.config-key shimin huang 于2022年3月28日周一 20:14写道: > flink version 1.12.0 > > 近期在将flink on yarn迁移至flink on > k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个 > external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗 >
Re: flink on k8s是否有替代yarn.ship-files的参数
1.12.0没有找到相关的配置,目前考虑测试下pipeline.classpaths指定对应的jars路径是否生效。 Geng Biao 于2022年3月28日周一 20:18写道: > Hi shimin, > 外部jar依赖可以看一下文档里usrlib在flink on k8s里的使用。 > > Best, > Biao > > 获取 Outlook for iOS<https://aka.ms/o0ukef> > ________ > 发件人: shimin huang > 发送时间: Monday, March 28, 2022 8:14:28 PM > 收件人: user-zh@flink.apache.org > 主题: flink on k8s是否有替代yarn.ship-files的参数 > > flink version 1.12.0 > > 近期在将flink on yarn迁移至flink on > > k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个 > > external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗 >
Re: flink on k8s是否有替代yarn.ship-files的参数
好的 我了解下 感谢! yu'an huang 于2022年3月28日周一 22:12写道: > 你好, > > > 可以看看这个链接中关于usrlib的介绍(Application mode部分)。 > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/docker/#docker-hub-flink-images > > Kubernetes不像yarn一样提供了ship文件的功能。对于Kubernetes application mode来说,用户程序是运行在Job > > Manager的,要求所有的artifacts都已经在镜像中存在。Flink会自动将$FLINK_HOME/usrlib目录下的文件都放入用户程序的classpath中,所以你需要按照链接中的方法,创建镜像,将你需要的artifacts提前放到镜像之中。然后在提交命令中指定主类和主类所用的JAR就可以了。 > > > > > On Mon, 28 Mar 2022 at 8:26 PM, shimin huang > wrote: > > > 1.12.0没有找到相关的配置,目前考虑测试下pipeline.classpaths指定对应的jars路径是否生效。 > > > > Geng Biao 于2022年3月28日周一 20:18写道: > > > > > Hi shimin, > > > 外部jar依赖可以看一下文档里usrlib在flink on k8s里的使用。 > > > > > > Best, > > > Biao > > > > > > 获取 Outlook for iOS<https://aka.ms/o0ukef> > > > > > > 发件人: shimin huang > > > 发送时间: Monday, March 28, 2022 8:14:28 PM > > > 收件人: user-zh@flink.apache.org > > > 主题: flink on k8s是否有替代yarn.ship-files的参数 > > > > > > flink version 1.12.0 > > > > > > 近期在将flink on yarn迁移至flink on > > > > > > > > > k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个 > > > > > > > > > external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗 > > > > > >
flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问
flink version: flink 1.13.0
Re: flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问
使用Nodeport方式提交的flink任务返回的jobmangaer的web ui地址发现无法访问,这导致正常的flink list命令也无法获取对应 cluster.id下的job,实际返回的jm ip发现是api server的,请问各位有什么好的办法解决这类问题呢 shimin huang 于2022年4月14日周四 17:20写道: > flink version: flink 1.13.0 > >
Re: flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问
感谢 我去了解了解 huweihua 于2022年4月14日周四 20:06写道: > 使用 NodePort 会默认使用 api server 的 host + nodeport, 预期 K8S 集群内部所有节点都会转发 > nodeport 的流量,如果无法访问,可能是你使用的 K8S 做了一些封禁,把 NodePort 的流量转发功能禁用了 > > > > 2022年4月14日 下午5:22,shimin huang 写道: > > > > 使用Nodeport方式提交的flink任务返回的jobmangaer的web ui地址发现无法访问,这导致正常的flink > list命令也无法获取对应 > > cluster.id下的job,实际返回的jm ip发现是api server的,请问各位有什么好的办法解决这类问题呢 > > > > shimin huang 于2022年4月14日周四 17:20写道: > > > >> flink version: flink 1.13.0 > >> > >> > >
flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip
hi,使用flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题
Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip
[image: image.png] 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在 shimin huang 于2022年4月15日周五 11:16写道: > hi,使用flink on native k8s NodePort方式启动任务后返回的jobmanager web > ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题 >
Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip
private Optional getRestEndPointFromService(Service service, int restPort) { if (service.getStatus() == null) { return Optional.empty(); } LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer(); boolean hasExternalIP = service.getSpec() != null && service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty(); if (loadBalancer != null) { return getLoadBalancerRestEndpoint(loadBalancer, restPort); // 理解主要这块代码,nodeport方式 } else if (hasExternalIP) { final String address = service.getSpec().getExternalIPs().get(0); if (address != null && !address.isEmpty()) { return Optional.of(new Endpoint(address, restPort)); } } return Optional.empty(); } huweihua 于2022年4月15日周五 11:35写道: > 图片显示失败了,可以上传到图床,贴链接到邮件里 > > > 2022年4月15日 上午11:30,shimin huang 写道: > > > > > > 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在 > > > > shimin huang huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道: > > hi,使用flink on native k8s NodePort方式启动任务后返回的jobmanager web > ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题 > >
Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip
Fabric8FlinkKubeClient#getRestEndPointFromService shimin huang 于2022年4月15日周五 13:37写道: > private Optional getRestEndPointFromService(Service service, int > restPort) { > if (service.getStatus() == null) { > return Optional.empty(); > } > > LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer(); > boolean hasExternalIP = > service.getSpec() != null > && service.getSpec().getExternalIPs() != null > && !service.getSpec().getExternalIPs().isEmpty(); > > if (loadBalancer != null) { > return getLoadBalancerRestEndpoint(loadBalancer, restPort); > > // 理解主要这块代码,nodeport方式 > } else if (hasExternalIP) { > final String address = service.getSpec().getExternalIPs().get(0); > if (address != null && !address.isEmpty()) { > return Optional.of(new Endpoint(address, restPort)); > } > } > return Optional.empty(); > } > > > huweihua 于2022年4月15日周五 11:35写道: > >> 图片显示失败了,可以上传到图床,贴链接到邮件里 >> >> > 2022年4月15日 上午11:30,shimin huang 写道: >> > >> > >> > 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在 >> > >> > shimin huang > huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道: >> > hi,使用flink on native k8s NodePort方式启动任务后返回的jobmanager web >> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题 >> >>
Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip
感谢 我看下这块 huweihua 于2022年4月15日周五 13:47写道: > Nodeport 模式下获取 address 的代码在 getLoadBalancerRestEndpoint 中。历史版本会直接获取k8s > master url, 在 Flink-1.14 版本中针对 k8s master 是 VIP 的情况做了适配[1],可以看下是否对你有帮助 > https://issues.apache.org/jira/browse/FLINK-23507 > > > > 2022年4月15日 下午1:37,shimin huang 写道: > > > > Fabric8FlinkKubeClient#getRestEndPointFromService > > > > > > shimin huang 于2022年4月15日周五 13:37写道: > > > >> private Optional getRestEndPointFromService(Service service, > int restPort) { > >>if (service.getStatus() == null) { > >>return Optional.empty(); > >>} > >> > >>LoadBalancerStatus loadBalancer = > service.getStatus().getLoadBalancer(); > >>boolean hasExternalIP = > >>service.getSpec() != null > >>&& service.getSpec().getExternalIPs() != null > >>&& !service.getSpec().getExternalIPs().isEmpty(); > >> > >>if (loadBalancer != null) { > >>return getLoadBalancerRestEndpoint(loadBalancer, restPort); > >> > >> // 理解主要这块代码,nodeport方式 > >>} else if (hasExternalIP) { > >>final String address = service.getSpec().getExternalIPs().get(0); > >>if (address != null && !address.isEmpty()) { > >>return Optional.of(new Endpoint(address, restPort)); > >>} > >>} > >>return Optional.empty(); > >> } > >> > >> > >> huweihua 于2022年4月15日周五 11:35写道: > >> > >>> 图片显示失败了,可以上传到图床,贴链接到邮件里 > >>> > >>>> 2022年4月15日 上午11:30,shimin huang 写道: > >>>> > >>>> > >>>> 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在 > >>>> > >>>> shimin huang >>> huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道: > >>>> hi,使用flink on native k8s NodePort方式启动任务后返回的jobmanager web > >>> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题 > >>> > >>> > >
flink on k8s native开启ha后根据sp启动任务报错找不到job id 0000
flink on native k8s根据savepoint停止任务后在根据savepoint启动任务报错找不到job 错误堆栈如下: java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job () at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) at com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job () at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596) at java.util.Optional.orElseGet(Optional.java:267) at org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 common frames omitted 2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN o.a.f.c.d.application.ApplicationDispatcherBootstrap - Application failed unexpectedly: java.util.concurrent.CompletionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job () at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) at java.util.concurrent.CompletableFuture.uniApplyStage(
Re: flink on k8s native开启ha后根据sp启动任务报错找不到job id 0000
flink版本1.13.0 /home/hdfs/flink-1.13.0/bin/flink run-application \ -t kubernetes-application \ -s spPath \ -p 32 \ -Dresourcemanager.taskmanager-timeout=6 \ -Dkubernetes.namespace=xxx \ -Dkubernetes.service-account=xxx \ -Dkubernetes.taskmanager.service-account=xxx \ -Dkubernetes.cluster-id= \ -Dkubernetes.container.image.pull-secrets= \ -Dkubernetes.rest-service.exposed.type=NodePort \ -Dkubernetes.config.file=/cce.conf \ -Denv.java.opts="-DHADOOP_USER_NAME=hdfs" \ -Dkubernetes.pod-template-file=/home/hdfs/jars/flink-pod.yaml \ -Dkubernetes.taskmanager.cpu=1 \ -Dkubernetes.jobmanager.cpu=0.5 \ -Dtaskmanager.numberOfTaskSlots=16 \ -Djobmanager.memory.process.size=2048m \ -Dtaskmanager.memory.process.size=4096m \ -Dtaskmanager.memory.managed.fraction=0.1 \ -Dtaskmanager.memory.network.fraction=0.1 \ -Dtaskmanager.memory.network.max=2048m \ -Dtaskmanager.memory.network.min=512m \ -Dstate.checkpoints.num-retained=20 \ -Dstate.backend.rocksdb.memory.managed=true \ -Dstate.backend.rocksdb.checkpoint.transfer.thread.num=5 \ -Dstate.backend.rocksdb.localdir=/tmp/rocksdb \ -Dstate.backend.incremental=true \ -Dclassloader.resolve-order=parent-first \ -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ -Dhigh-availability.storageDir=hdfs://bmr-cluster/flink/kubernetes/ha/recovery \ -c \ Weihua Hu 于2022年5月17日周二 21:54写道: > Hi, shimin > 用的哪个版本的 Flink?提交命令是什么呢? > > > Best, > Weihua > > > 2022年5月17日 下午1:48,shimin huang 写道: > > > > flink on native k8s根据savepoint停止任务后在根据savepoint启动任务报错找不到job > > 错误堆栈如下: > > java.util.concurrent.ExecutionException: > > org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not > find > > Flink job () > > at > > > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > > at > > > org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123) > > at > > > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80) > > at > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834) > > at > > > com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) > > at > > > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) > > at > > > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212) > > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > at > > > org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159) > > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > > at > > > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > > at > > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: > > Could not find Flink job () > > at > > > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596) > > at java.util.Optional.orElseGet(Optional.java:267) > > at > &g
Re: 使用flink-operator 成功生成savepoint, 但job 并未取消
savepoint流程 1. 执行savepoint kubectl patch flinkdeployment/savepoint-job --type=merge -p '{"spec": {"job": {"state": "suspended", "upgradeMode": "savepoint"}}}’ 2. 删除job kubectl delete flinkdeployment/savepoint-job 3. 根据savepoint启动job 修改flinkdeployment yaml配置,添加如下 spec: ... job: initialSavepointPath: savepoint路径 执行kubectl apply -f xxx > 2022年10月19日 下午3:53,Liting Liu (litiliu) 写道: > > hi: > 我在使用flink-operator 1.2.0 & flink 1.14.3, 使用flink-operator 成功手动生成了savepoint, > 但savepoint 生成之后, job 并没有自动取消。 希望savepoint 成功之后job 能自动取消。请问是哪里没操作对吗?还是一个已知问题? > jobStatus: > jobId: 9de925e9d4a67e04ef6279925450907c > jobName: sql-te-lab-s334c9 > savepointInfo: > lastPeriodicSavepointTimestamp: 0 > lastSavepoint: > location: >- > > hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b > timeStamp: 1666163606426 > triggerType: MANUAL > savepointHistory: > - location: >- > > hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b > timeStamp: 1666163606426 > triggerType: MANUAL > triggerId: '' > triggerTimestamp: 0 > triggerType: MANUAL > startTime: '1666161791058' > state: RUNNING > updateTime: '1666161828364' >
Re: Flink提交作业是否可以跳过上传作业jar包这一步?
可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java > 2023年5月15日 19:21,casel.chen 写道: > > 我们开发了一个实时计算平台提交flink > sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat > > jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink > sql作业提交的速度。
Re: Flink 严重背压问题排查
Hello aven.wu: 可以看下各个operator的metrics的指标,比如它的buffers.outPoolUsage、buffers.inPoolUsage、buffers.inputFloatingBuffersUsage、buffers.inputExclusiveBuffersUsage, - 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。 - outPoolUsage 和 inPoolUsage 同为低或同为高分别表明当前 Subtask 正常或处于被下游反压 ,这应该没有太多疑问。而比较有趣的是当 outPoolUsage 和 inPoolUsage 表现不同时,这可能是出于反压传导的中间状态或者表明该 Subtask 就是反压的根源。 - 如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高,我们可以根据这点来进一步判断。值得注意的是,反压有时是短暂的且影响不大,比如来自某个 Channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下我们可以不用处理。 - 可以分析出来上游分下游限速里。 - 通常来说,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage 则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer)。 分析出来具体那个operator出现反压的问题,然后在具体去分析,看看数据是否倾斜,以及对应算子的jvm情况,看一些算子的线程,可以利用阿里的Arthas在线看下,希望能够帮到你哈。 aven.wu 于2020年5月11日周一 下午9:43写道: > Hello 大家好 > 今天遇到个Flink 背压的问题,导致程序完全卡主不在处理数据,从监控页面看是应该是 keyProcess-> sink :alarm state > 处理数据有问题,导致上游 ruleProcess 出现背压。 > KeyProcess 是中定义了一个MapState,每来一条数据会读取和更新state中的内容。Sink 是写入kafka已排除不是kafka的问题 > http://qiniu.lgwen.cn/13F2DE58-98C1-4851-B54A-6BDC3C646169.png, > http://qiniu.lgwen.cn/image/jvm.png > > Dump了 堆栈日志 > http://qiniu.lgwen.cn/docstack.log > 没什么排查的思路,如果方便的话,提供一些排查的思路。 > > Best > Aven > >