Re: Flink提交作业是否可以跳过上传作业jar包这一步?

2023-05-15 文章 shimin huang
可以考虑基于flink-kubernetes依赖下的KubernetesClusterDescriptor来启动任务,可以参考https://github.com/collabH/flink-deployer/blob/main/infrastructure/src/main/java/com/flink/plugins/inf/deployer/KubernetesClusterDeployer.java
 

> 2023年5月15日 19:21,casel.chen  写道:
> 
> 我们开发了一个实时计算平台提交flink 
> sql作业到k8s上运行,发现每次提交作业都需要上传平台sql作业jar包flinksql.jar,因为这个jar包包含了平台用到的所有connector和format,所以flinksql.jar这个fat
>  
> jar有几百MB,又因为平台所在的k8s集群和作业真正运行的k8s集群是不同的,某些集群的跨k8s离群网络传输开销(跨地区甚至跨云厂商)比较大,而且这个flinksql.jar我们已经放到了k8s镜像当中,jobManager加载镜像后是可以在本地找到该jar的,所以想问一下Flink提交作业是否可以跳过上传作业jar包这一步?有没有参数指定直接去本地加载该作业jar包?这样的话,一是可以减少网络传输开销,二是可以加快flink
>  sql作业提交的速度。



Re: 使用flink-operator 成功生成savepoint, 但job 并未取消

2022-10-19 文章 shimin huang
savepoint流程
1. 执行savepoint
kubectl patch flinkdeployment/savepoint-job --type=merge -p '{"spec": {"job": 
{"state": "suspended", "upgradeMode": "savepoint"}}}’
2. 删除job
kubectl delete flinkdeployment/savepoint-job
3. 根据savepoint启动job
修改flinkdeployment yaml配置,添加如下
spec:
 ...
 job:
   initialSavepointPath: savepoint路径
执行kubectl apply -f xxx

> 2022年10月19日 下午3:53,Liting Liu (litiliu)  写道:
> 
> hi:
> 我在使用flink-operator 1.2.0 & flink 1.14.3,  使用flink-operator 成功手动生成了savepoint, 
> 但savepoint 生成之后, job 并没有自动取消。 希望savepoint 成功之后job 能自动取消。请问是哪里没操作对吗?还是一个已知问题?
> jobStatus:
>   jobId: 9de925e9d4a67e04ef6279925450907c
>   jobName: sql-te-lab-s334c9
>   savepointInfo:
> lastPeriodicSavepointTimestamp: 0
> lastSavepoint:
>   location: >-
> 
> hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
>   timeStamp: 1666163606426
>   triggerType: MANUAL
> savepointHistory:
>   - location: >-
>   
> hdfs://nameservice1/user/pda/savepoint/6077944532288/flink/sql-te/savepoint-9de925-b9ead1c58e7b
> timeStamp: 1666163606426
> triggerType: MANUAL
> triggerId: ''
> triggerTimestamp: 0
> triggerType: MANUAL
>   startTime: '1666161791058'
>   state: RUNNING
>   updateTime: '1666161828364'
> 



Re: flink on k8s native开启ha后根据sp启动任务报错找不到job id 0000

2022-05-17 文章 shimin huang
flink版本1.13.0
/home/hdfs/flink-1.13.0/bin/flink run-application \
-t kubernetes-application \
-s spPath \
-p 32 \
-Dresourcemanager.taskmanager-timeout=6 \
-Dkubernetes.namespace=xxx \
-Dkubernetes.service-account=xxx \
-Dkubernetes.taskmanager.service-account=xxx \
-Dkubernetes.cluster-id= \
-Dkubernetes.container.image.pull-secrets= \
-Dkubernetes.rest-service.exposed.type=NodePort  \
-Dkubernetes.config.file=/cce.conf \
-Denv.java.opts="-DHADOOP_USER_NAME=hdfs" \
-Dkubernetes.pod-template-file=/home/hdfs/jars/flink-pod.yaml \
-Dkubernetes.taskmanager.cpu=1 \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dtaskmanager.numberOfTaskSlots=16 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.memory.managed.fraction=0.1 \
-Dtaskmanager.memory.network.fraction=0.1 \
-Dtaskmanager.memory.network.max=2048m \
-Dtaskmanager.memory.network.min=512m \
-Dstate.checkpoints.num-retained=20 \
-Dstate.backend.rocksdb.memory.managed=true \
-Dstate.backend.rocksdb.checkpoint.transfer.thread.num=5 \
-Dstate.backend.rocksdb.localdir=/tmp/rocksdb \
-Dstate.backend.incremental=true \
-Dclassloader.resolve-order=parent-first \

-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
\

-Dhigh-availability.storageDir=hdfs://bmr-cluster/flink/kubernetes/ha/recovery
\
-c  \

Weihua Hu  于2022年5月17日周二 21:54写道:

> Hi, shimin
> 用的哪个版本的 Flink?提交命令是什么呢?
>
>
> Best,
> Weihua
>
> > 2022年5月17日 下午1:48,shimin huang  写道:
> >
> > flink on native k8s根据savepoint停止任务后在根据savepoint启动任务报错找不到job
> > 错误堆栈如下:
> > java.util.concurrent.ExecutionException:
> > org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not
> find
> > Flink job ()
> > at
> >
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > at
> >
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> > at
> >
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> > at
> >
> com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> > at
> >
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> > at
> >
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
> > Could not find Flink job ()
> > at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> > at java.util.Optional.orElseGet(Optional.java:267)
> > at

flink on k8s native开启ha后根据sp启动任务报错找不到job id 0000

2022-05-16 文章 shimin huang
flink on native k8s根据savepoint停止任务后在根据savepoint启动任务报错找不到job
错误堆栈如下:
java.util.concurrent.ExecutionException:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
Flink job ()
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
at
com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
Could not find Flink job ()
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
at java.util.Optional.orElseGet(Optional.java:267)
at
org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 common frames omitted
2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN
 o.a.f.c.d.application.ApplicationDispatcherBootstrap  - Application failed
unexpectedly:
java.util.concurrent.CompletionException:
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
Flink job ()
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
at

Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip

2022-04-14 文章 shimin huang
感谢 我看下这块

huweihua  于2022年4月15日周五 13:47写道:

> Nodeport 模式下获取 address 的代码在 getLoadBalancerRestEndpoint 中。历史版本会直接获取k8s
> master url, 在 Flink-1.14 版本中针对 k8s master 是 VIP 的情况做了适配[1],可以看下是否对你有帮助
> https://issues.apache.org/jira/browse/FLINK-23507
>
>
> > 2022年4月15日 下午1:37,shimin huang  写道:
> >
> > Fabric8FlinkKubeClient#getRestEndPointFromService
> >
> >
> > shimin huang  于2022年4月15日周五 13:37写道:
> >
> >> private Optional getRestEndPointFromService(Service service,
> int restPort) {
> >>if (service.getStatus() == null) {
> >>return Optional.empty();
> >>}
> >>
> >>LoadBalancerStatus loadBalancer =
> service.getStatus().getLoadBalancer();
> >>boolean hasExternalIP =
> >>service.getSpec() != null
> >>&& service.getSpec().getExternalIPs() != null
> >>&& !service.getSpec().getExternalIPs().isEmpty();
> >>
> >>if (loadBalancer != null) {
> >>return getLoadBalancerRestEndpoint(loadBalancer, restPort);
> >>
> >> // 理解主要这块代码,nodeport方式
> >>} else if (hasExternalIP) {
> >>final String address = service.getSpec().getExternalIPs().get(0);
> >>if (address != null && !address.isEmpty()) {
> >>return Optional.of(new Endpoint(address, restPort));
> >>}
> >>}
> >>return Optional.empty();
> >> }
> >>
> >>
> >> huweihua  于2022年4月15日周五 11:35写道:
> >>
> >>> 图片显示失败了,可以上传到图床,贴链接到邮件里
> >>>
> >>>> 2022年4月15日 上午11:30,shimin huang  写道:
> >>>>
> >>>>
> >>>> 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在
> >>>>
> >>>> shimin huang  >>> huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道:
> >>>> hi,使用flink on  native k8s NodePort方式启动任务后返回的jobmanager web
> >>> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题
> >>>
> >>>
>
>


Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip

2022-04-14 文章 shimin huang
Fabric8FlinkKubeClient#getRestEndPointFromService


shimin huang  于2022年4月15日周五 13:37写道:

> private Optional getRestEndPointFromService(Service service, int 
> restPort) {
> if (service.getStatus() == null) {
> return Optional.empty();
> }
>
> LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
> boolean hasExternalIP =
> service.getSpec() != null
> && service.getSpec().getExternalIPs() != null
> && !service.getSpec().getExternalIPs().isEmpty();
>
> if (loadBalancer != null) {
> return getLoadBalancerRestEndpoint(loadBalancer, restPort);
>
> // 理解主要这块代码,nodeport方式
> } else if (hasExternalIP) {
> final String address = service.getSpec().getExternalIPs().get(0);
> if (address != null && !address.isEmpty()) {
> return Optional.of(new Endpoint(address, restPort));
> }
> }
> return Optional.empty();
> }
>
>
> huweihua  于2022年4月15日周五 11:35写道:
>
>> 图片显示失败了,可以上传到图床,贴链接到邮件里
>>
>> > 2022年4月15日 上午11:30,shimin huang  写道:
>> >
>> >
>> > 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在
>> >
>> > shimin huang > huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道:
>> > hi,使用flink on  native k8s NodePort方式启动任务后返回的jobmanager web
>> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题
>>
>>


Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip

2022-04-14 文章 shimin huang
private Optional getRestEndPointFromService(Service service,
int restPort) {
if (service.getStatus() == null) {
return Optional.empty();
}

LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
boolean hasExternalIP =
service.getSpec() != null
&& service.getSpec().getExternalIPs() != null
&& !service.getSpec().getExternalIPs().isEmpty();

if (loadBalancer != null) {
return getLoadBalancerRestEndpoint(loadBalancer, restPort);

// 理解主要这块代码,nodeport方式
} else if (hasExternalIP) {
final String address = service.getSpec().getExternalIPs().get(0);
if (address != null && !address.isEmpty()) {
return Optional.of(new Endpoint(address, restPort));
}
}
return Optional.empty();
}


huweihua  于2022年4月15日周五 11:35写道:

> 图片显示失败了,可以上传到图床,贴链接到邮件里
>
> > 2022年4月15日 上午11:30,shimin huang  写道:
> >
> >
> > 具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在
> >
> > shimin huang  huangshimin1...@gmail.com>> 于2022年4月15日周五 11:16写道:
> > hi,使用flink on  native k8s NodePort方式启动任务后返回的jobmanager web
> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题
>
>


Re: flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip

2022-04-14 文章 shimin huang
[image: image.png]
具体细节看和flink k8s这块的细节实现有关,不清楚为什么这个externalIPs的第一个ip在我们k8s集群中为什么不存在

shimin huang  于2022年4月15日周五 11:16写道:

> hi,使用flink on  native k8s NodePort方式启动任务后返回的jobmanager web
> ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题
>


flink on native k8s NodePort方式启动任务后返回的jobmanager web ui的地址不是集群中的ip

2022-04-14 文章 shimin huang
hi,使用flink on  native k8s NodePort方式启动任务后返回的jobmanager web
ui的地址不是集群中的ip,看k8s底层的部署也没发现具体问题


Re: flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问

2022-04-14 文章 shimin huang
感谢 我去了解了解

huweihua  于2022年4月14日周四 20:06写道:

> 使用 NodePort 会默认使用 api server 的 host + nodeport, 预期 K8S 集群内部所有节点都会转发
> nodeport 的流量,如果无法访问,可能是你使用的 K8S 做了一些封禁,把 NodePort 的流量转发功能禁用了
>
>
> > 2022年4月14日 下午5:22,shimin huang  写道:
> >
> > 使用Nodeport方式提交的flink任务返回的jobmangaer的web ui地址发现无法访问,这导致正常的flink
> list命令也无法获取对应
> > cluster.id下的job,实际返回的jm ip发现是api server的,请问各位有什么好的办法解决这类问题呢
> >
> > shimin huang  于2022年4月14日周四 17:20写道:
> >
> >> flink version: flink 1.13.0
> >>
> >>
>
>


Re: flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问

2022-04-14 文章 shimin huang
使用Nodeport方式提交的flink任务返回的jobmangaer的web ui地址发现无法访问,这导致正常的flink list命令也无法获取对应
cluster.id下的job,实际返回的jm ip发现是api server的,请问各位有什么好的办法解决这类问题呢

shimin huang  于2022年4月14日周四 17:20写道:

> flink version: flink 1.13.0
>
>


flink on k8s NodePort方式提交本人返回的jm的url和容器ip不一致,导致无法访问

2022-04-14 文章 shimin huang
flink version: flink 1.13.0


Re: flink on k8s是否有替代yarn.ship-files的参数

2022-03-29 文章 shimin huang
好的 我了解下 感谢!

yu'an huang  于2022年3月28日周一 22:12写道:

> 你好,
>
>
> 可以看看这个链接中关于usrlib的介绍(Application mode部分)。
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/docker/#docker-hub-flink-images
>
> Kubernetes不像yarn一样提供了ship文件的功能。对于Kubernetes application mode来说,用户程序是运行在Job
>
> Manager的,要求所有的artifacts都已经在镜像中存在。Flink会自动将$FLINK_HOME/usrlib目录下的文件都放入用户程序的classpath中,所以你需要按照链接中的方法,创建镜像,将你需要的artifacts提前放到镜像之中。然后在提交命令中指定主类和主类所用的JAR就可以了。
>
>
>
>
> On Mon, 28 Mar 2022 at 8:26 PM, shimin huang 
> wrote:
>
> > 1.12.0没有找到相关的配置,目前考虑测试下pipeline.classpaths指定对应的jars路径是否生效。
> >
> > Geng Biao  于2022年3月28日周一 20:18写道:
> >
> > > Hi shimin,
> > > 外部jar依赖可以看一下文档里usrlib在flink on k8s里的使用。
> > >
> > > Best,
> > > Biao
> > >
> > > 获取 Outlook for iOS<https://aka.ms/o0ukef>
> > > 
> > > 发件人: shimin huang 
> > > 发送时间: Monday, March 28, 2022 8:14:28 PM
> > > 收件人: user-zh@flink.apache.org 
> > > 主题: flink on k8s是否有替代yarn.ship-files的参数
> > >
> > > flink version 1.12.0
> > >
> > > 近期在将flink on yarn迁移至flink on
> > >
> > >
> >
> k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个
> > >
> > >
> >
> external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗
> > >
> >
>


Re: flink on k8s是否有替代yarn.ship-files的参数

2022-03-28 文章 shimin huang
1.12.0没有找到相关的配置,目前考虑测试下pipeline.classpaths指定对应的jars路径是否生效。

Geng Biao  于2022年3月28日周一 20:18写道:

> Hi shimin,
> 外部jar依赖可以看一下文档里usrlib在flink on k8s里的使用。
>
> Best,
> Biao
>
> 获取 Outlook for iOS<https://aka.ms/o0ukef>
> ________
> 发件人: shimin huang 
> 发送时间: Monday, March 28, 2022 8:14:28 PM
> 收件人: user-zh@flink.apache.org 
> 主题: flink on k8s是否有替代yarn.ship-files的参数
>
> flink version 1.12.0
>
> 近期在将flink on yarn迁移至flink on
>
> k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个
>
> external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗
>


Re: flink on k8s是否有替代yarn.ship-files的参数

2022-03-28 文章 shimin huang
external-resource..yarn.config-key这个配置贴错了应该是这个
external-resource..kubernetes.config-key

shimin huang  于2022年3月28日周一 20:14写道:

> flink version 1.12.0
>
> 近期在将flink on yarn迁移至flink on
> k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个
> external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗
>


flink on k8s是否有替代yarn.ship-files的参数

2022-03-28 文章 shimin huang
flink version 1.12.0

近期在将flink on yarn迁移至flink on
k8s,以前外部的jar包和配置都是通过yarn.skip-files参数来进行配置加载的,想问下k8s是否有类似参数,目前在1.12.0的文档发现没找到类似的,有个
external-resource..yarn.config-key配置,但是没有具体的试用案例,希望有大佬能够解答下有什么好的方式吗


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-03 文章 shimin huang
Hi!
这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly
once的非对齐机制貌似没在官方文档上看到

Caizhi Weng  于2021年8月2日周一 下午7:28写道:

> Hi!
>
> shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。
>
> Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
> 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
>
> 张锴  于2021年8月2日周一 下午7:20写道:
>
> > 这个原理能说明一下吗,咋做到的
> >
> > 东东  于2021年8月2日周一 下午7:16写道:
> >
> > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
> > >
> > > 在 2021-08-02 18:53:11,"张锴"  写道:
> > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N
> +
> > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
> > >
> >
>


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-02 文章 shimin huang
不可以,会存在重复消费的问题,如果buffer没有对齐的话,job重启,那么这些buffer的数据就会清空,然后相关的subtask会重新消费一遍。

张锴  于2021年8月2日周一 下午6:53写道:

> flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N +
> 1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
>


Re: flink集群提交任务挂掉

2021-04-01 文章 shimin huang
增大`taskmanager.memory.task.off-heap.size`配置

bowen li  于2021年4月2日周五 上午10:54写道:

> Hi,大家好:
>  现在我们遇到的场景是这样的,提交任务的时候会报错。我们使用的版本是1.12.1,搭建模式是standalone的。下面是报错信息。
>
>java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things: either job(s)
> require(s) a larger size of JVM direct memory or there is a direct memory
> leak. The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased. Flink framework and its
> dependencies also consume the direct memory, mostly for network
> communication. The most of network memory is managed by Flink and should
> not result in out-of-memory error. In certain special cases, in particular
> for jobs with high parallelism, the framework may require more direct
> memory which is not managed by Flink. In this case
> 'taskmanager.memory.framework.off-heap.size' configuration option should be
> increased. If the error persists then there is probably a direct memory
> leak in user code or some of its dependencies
>   这种情况我们需要特别的配置吗?


Re: Flink Job 如何集成到自己的系统,方便管理

2021-03-07 文章 shimin huang
可以看下flink源码的flink k8s模块,里面的test pachage下有需求flink k8s的使用姿势,希望对你有帮助。

DanielGu <610493...@qq.com> 于2021年3月7日周日 下午4:34写道:

> >有的,通过 FLINK 和 YARN 或 k8s
> 的接口进行编程,管理元数据,管理用户文件,支持提交作业及之后管理作业状态,这是许多公司应用Flink
> 的实现方式。
>
> 请问有什么可以参考的资料吗?有相关意愿,不知道从哪里下手,希望整个 flink on k8s
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


使用per-job部署成功的flink sql应用但是用applicationMode部署失败,提交到yarn上不到2秒就死掉,并且读取不到日志

2020-11-25 文章 shimin huang
```
14:56:44.536 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error
while running the command.
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't
deploy Yarn Application Cluster
at
org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:414)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:64)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_181]
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
[hadoop-common-3.0.0-cdh6.3.2.jar:?]
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.2.jar:1.11.2]
```

部署会报这种错,通过yarn logs -applicationId xxx去看yarn log,也看不到相关日志


使用flink1.11.1的debezium-changelog目前是否不支持Watermark

2020-11-16 文章 shimin huang
报错日志:
```
Currently, defining WATERMARK on a changelog source is not supported
```


flink1.11.1使用Table API Hive方言的executSql报错

2020-07-27 文章 shimin huang
Hi,all:
  本人基于Flink1.11.1的table API使用Hive方言,调用executSql方法后报错,堆栈信息如下:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql
at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:302) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.client.program.PackagedProgram
.invokeInteractiveModeForExecution(PackagedProgram.java:198) ~[flink-dist_2.
11-1.11.1.jar:1.11.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:
149) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.client.deployment.application.
DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.client.deployment.application.
DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
.lambda$handleRequest$0(JarRunHandler.java:100) ~[flink-dist_2.11-1.11.1
.jar:1.11.1]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(
CompletableFuture.java:1604) [?:1.8.0_242]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
511) [?:1.8.0_242]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_242
]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask
.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor
.java:1149) [?:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
.java:624) [?:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl
.executeInternal(TableEnvironmentImpl.java:747) ~[flink-table-blink_2.11-
1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl
.executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
0_242]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
.java:62) ~[?:1.8.0_242]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
... 13 more
Caused by: java.lang.IllegalArgumentException: Job client must be a
CoordinationRequestGateway. This is a bug.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:
139) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher
.setJobClient(CollectResultFetcher.java:97) ~[flink-dist_2.11-1.11.1.jar:
1.11.1]
at org.apache.flink.streaming.api.operators.collect.
CollectResultIterator.setJobClient(CollectResultIterator.java:84)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.table.planner.sinks.SelectTableSinkBase
.setJobClient(SelectTableSinkBase.java:81) ~[flink-table-blink_2.11-1.11.1
.jar:1.11.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl
.executeInternal(TableEnvironmentImpl.java:737) ~[flink-table-blink_2.11-
1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl
.executeOperation(TableEnvironmentImpl.java:1069) ~[flink-table-blink_2.11-
1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
TableEnvironmentImpl.java:690) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at org.forchange.online.etl.h2h.Prod2Poc.main(Prod2Poc.java:46) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.
0_242]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl
.java:62) ~[?:1.8.0_242]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(
PackagedProgram.java:288) ~[flink-dist_2.11-1.11.1.jar:1.11.1]


* 核心错误
`Job client must be a CoordinationRequestGateway. This is a bug.`
请问这是一个Bug吗?


Re: Flink 严重背压问题排查

2020-05-11 文章 shimin huang
Hello aven.wu:
可以看下各个operator的metrics的指标,比如它的buffers.outPoolUsage、buffers.inPoolUsage、buffers.inputFloatingBuffersUsage、buffers.inputExclusiveBuffersUsage,

   - 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer
   占用很高,则表明它将反压传导至上游。
   - outPoolUsage 和 inPoolUsage 同为低或同为高分别表明当前 Subtask 正常或处于被下游反压
   ,这应该没有太多疑问。而比较有趣的是当 outPoolUsage 和 inPoolUsage
   表现不同时,这可能是出于反压传导的中间状态或者表明该 Subtask 就是反压的根源。
   - 如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。如果一个
   Subtask 的 outPoolUsage 是低,但其 inPoolUsage
是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些
   Subtask 的 outPoolUsage 为高,我们可以根据这点来进一步判断。值得注意的是,反压有时是短暂的且影响不大,比如来自某个
   Channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下我们可以不用处理。
   - 可以分析出来上游分下游限速里。
   - 通常来说,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage
   则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数
   channel 占用了大部分的 Floating Buffer)。

分析出来具体那个operator出现反压的问题,然后在具体去分析,看看数据是否倾斜,以及对应算子的jvm情况,看一些算子的线程,可以利用阿里的Arthas在线看下,希望能够帮到你哈。


aven.wu  于2020年5月11日周一 下午9:43写道:

> Hello 大家好
> 今天遇到个Flink 背压的问题,导致程序完全卡主不在处理数据,从监控页面看是应该是 keyProcess-> sink :alarm state
>  处理数据有问题,导致上游 ruleProcess 出现背压。
> KeyProcess 是中定义了一个MapState,每来一条数据会读取和更新state中的内容。Sink 是写入kafka已排除不是kafka的问题
> http://qiniu.lgwen.cn/13F2DE58-98C1-4851-B54A-6BDC3C646169.png,
> http://qiniu.lgwen.cn/image/jvm.png
>
> Dump了 堆栈日志
> http://qiniu.lgwen.cn/docstack.log
> 没什么排查的思路,如果方便的话,提供一些排查的思路。
>
> Best
> Aven
>
>