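Re: Help: Does Flink have an ack mechanism similar to Storm's?

2020-08-04 Thread Bruce
OK, thanks for the reply. Understood; it looks like we will have to send
the notification ourselves.

Sent from my iPhone


-- Original Message --
From: Congxian Qiu 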

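Re: Help: Does Flink have an ack mechanism similar to Storm's?

2020-08-04 Thread Congxian Qiu
Hi 张洋,
If I understand correctly, Flink currently cannot strictly guarantee that it
consumes one record, waits until that record has been fully processed, and
only then consumes the next one. To get this behavior, the user has to do
some of the work.
Regarding whether processing has finished (your point 2): could you rely on
a third-party service and send a notification once the sink (or the last
operator) has finished processing?
If a checkpoint completes normally, then no exception occurred, but a
checkpoint cannot fully guarantee record-by-record processing.

Best,
Congxian


Bruce  wrote on Wed, Aug 5, 2020 at 9:33 AM:

> 1. We have a requirement: Flink receives messages from RabbitMQ and must
> finish processing the previous message completely before consuming the
> next one, because the result of the previous message affects the
> correctness of the result of the next one.
>
> 2. As far as I know, RabbitMQ has a QoS setting that can limit delivery to
> 1 message, but once a message flows into Flink I do not know when its
> processing has finished, and there is no marker that tells me so.
>
> 3. Checkpoint notifications are not precise enough either: I cannot tell
> whether the program finished executing within the checkpoint interval; I
> only know that no exception occurred during that interval.
>
> 4. So I would like to ask: how can Flink confirm that a given task has
> finished executing? Is there such a mechanism?
>
> best wishes
> -
> 张洋
>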
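
One way to picture the suggestion above (notify from the sink, then release the next message) is a small gate object. This is only an in-process sketch in plain Java: `OneAtATimeGate`, `tryBegin` and `ack` are hypothetical names, not Flink or RabbitMQ APIs. In a real job the source and the sink usually run in different processes, so the gate's state would have to live in the third-party service mentioned in the reply.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical gate: at most one message may be in flight at any time. */
final class OneAtATimeGate {
    // true while a message has been handed to the pipeline and not yet acknowledged
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Called by the consumer before it takes the next message; false means "wait". */
    boolean tryBegin() {
        return inFlight.compareAndSet(false, true);
    }

    /** Called by the sink (or last operator) once the message is fully processed. */
    boolean ack() {
        // Returns false if there was nothing to acknowledge (for example a duplicate ack).
        return inFlight.compareAndSet(true, false);
    }
}
```

The consumer would call `tryBegin()` before pulling from the queue and back off while it returns false; the sink calls `ack()` after its write succeeds. Combined with a RabbitMQ prefetch of 1, this keeps a single message in the pipeline, at the cost of throughput.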


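Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread bradyMk
So that's it! I added -d and ran the job again, and the entrypoint did change
from YarnSessionClusterEntrypoint to YarnJobClusterEntrypoint. I learned
something! This problem had bothered me for a long time; thank you very much
for the answer!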



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

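Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread bradyMk
So that's it! After using -d, YarnSessionClusterEntrypoint indeed changed to
YarnJobClusterEntrypoint. Thank you so much! This problem had bothered me
for a long time; thanks for clearing it up.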



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: slot calculation question

2020-08-04 Thread ★猛★
My Flink version is 1.9. I set the maximum parallelism on the StreamGraph to
780, but 964 slots are actually used; it should be 780.
Screenshots (the graph in the Flink UI and the slots actually used):
https://i.loli.net/2020/08/05/xsekI7cWOYRj58l.png
https://i.loli.net/2020/08/05/dHzIUSZPGC359Ea.png
https://i.loli.net/2020/08/05/PWJTU9CwD36y1iN.png

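Help: Does Flink have an ack mechanism similar to Storm's?

2020-08-04 Thread Bruce
1. We have a requirement: Flink receives messages from RabbitMQ and must
finish processing the previous message completely before consuming the next
one, because the result of the previous message affects the correctness of
the result of the next one.

2. As far as I know, RabbitMQ has a QoS setting that can limit delivery to 1
message, but once a message flows into Flink I do not know when its
processing has finished, and there is no marker that tells me so.

3. Checkpoint notifications are not precise enough either: I cannot tell
whether the program finished executing within the checkpoint interval; I
only know that no exception occurred during that interval.

4. So I would like to ask: how can Flink confirm that a given task has
finished executing? Is there such a mechanism?

best wishes
-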





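Re: Unsubscribe

2020-08-04 Thread Leonard Xu
Hi,
Do you mean unsubscribing from the mailing list?
You can send an email with any content to user-zh-unsubscr...@flink.apache.org
to unsubscribe from the user-zh@flink.apache.org mailing list.

For managing subscriptions to the Flink mailing lists, see [1].

Best regards,
Leonard Xu
[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

> On Aug 5, 2020, at 08:38, baiyg25...@hundsun.com wrote:
>
> Unsubscribe
>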



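Re: slot calculation question

2020-08-04 Thread Leonard Xu
Hi,
The images did not come through. You could upload them to an image hosting
service and share the links.

Best
Leonard

> On Aug 4, 2020, at 19:53, ★猛★  wrote:
>
> Hi,
>
> My Flink version is 1.9. I set the maximum parallelism on the StreamGraph
> to 780, and all operators are in the same group, yet 964 slots are actually
> used. By rights it should be 780. Why does this happen?
> Below is the graph from the Flink UI:
>
> Slots actually used:
>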



Unsubscribe

2020-08-04 Thread baiyg25...@hundsun.com
Unsubscribe



Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-04 Thread Eleanore Jin
Hi Yang & Till,

Thanks for your prompt reply!

Yang, regarding your question, I am actually not using k8s job, as I put my
app.jar and its dependencies under flink's lib directory. I have 1 k8s
deployment for job manager, and 1 k8s deployment for task manager, and 1
k8s service for job manager.

As you mentioned above, if the Flink job is marked as failed, it will cause
the job manager pod to be restarted, which is not the ideal behavior.

Do you suggest that I change the deployment strategy from a k8s
Deployment to a k8s Job? That way, if the Flink program exits with a
non-zero code (e.g. after exhausting the configured number of restarts),
could the pod be marked as complete and hence not restart the job again?

Thanks a lot!
Eleanore

On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:

> @Till Rohrmann  In native mode, when a Flink
> application terminates with FAILED state, all the resources will be cleaned
> up.
>
> However, in standalone mode, I agree with you that we need to rethink the
> exit code of Flink. When a job exhausts the restart
> strategy, we should terminate the pod and do not restart again. After
> googling, it seems that we could not specify the restartPolicy
> based on exit code[1]. So maybe we need to return a zero exit code to
> avoid restarting by K8s.
>
> [1].
> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>
> Best,
> Yang
>
> Till Rohrmann  wrote on Tue, Aug 4, 2020 at 3:48 PM:
>
>> @Yang Wang  I believe that we should rethink the
>> exit codes of Flink. In general you want K8s to restart a failed Flink
>> process. Hence, an application which terminates in state FAILED should not
>> return a non-zero exit code because it is a valid termination state.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>>
>>> Hi Eleanore,
>>>
>>> I think you are using K8s resource "Job" to deploy the jobmanager.
>>> Please set .spec.template.spec.restartPolicy = "Never" and
>>> spec.backoffLimit = 0.
>>> Refer here[1] for more information.
>>>
>>> Then, when the jobmanager failed because of any reason, the K8s job will
>>> be marked failed. And K8s will not restart the job again.
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin  wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for the reply!
>>>>
>>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>>> Specifically, I build a custom docker image, which I copied the app jar
>>>> (not uber jar) and all its dependencies under /flink/lib.
>>>>
>>>> So my question is more like, in this case, if the job is marked as
>>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>>> what are the suggestions for such scenario?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>>>
>>>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Eleanore,
>>>>>
>>>>> how are you deploying Flink exactly? Are you using the application
>>>>> mode with native K8s support to deploy a cluster [1] or are you manually
>>>>> deploying a per-job mode [2]?
>>>>>
>>>>> I believe the problem might be that we terminate the Flink process
>>>>> with a non-zero exit code if the job reaches the ApplicationStatus.FAILED
>>>>> [3].
>>>>>
>>>>> cc Yang Wang have you observed a similar behavior when running Flink
>>>>> in per-job mode on K8s?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>>>>> [3]
>>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>>>>
>>>>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>>>>> wrote:
>>>>>
>>>>>> Hi Experts,
>>>>>>
>>>>>> I have a flink cluster (per job mode) running on kubernetes. The job
>>>>>> is configured with restart strategy
>>>>>>
>>>>>> restart-strategy.fixed-delay.attempts: 3
>>>>>> restart-strategy.fixed-delay.delay: 10 s
>>>>>>
>>>>>>
>>>>>> So after 3 times retry, the job will be marked as FAILED, hence the
>>>>>> pods are not running. However, kubernetes will then restart the job again
>>>>>> as the available replicas do not match the desired one.
>>>>>>
>>>>>> I wonder what are the suggestions for such a scenario? How should I
>>>>>> configure the flink job running on k8s?
>>>>>>
>>>>>> Thanks a lot!
>>>>>> Eleanore
>>>>>>
>>>>>
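
To make the failure sequence discussed in this thread concrete, here is a plain-Java sketch (not Flink code) of a fixed-delay restart policy with 3 attempts, followed by the choice of process exit code. `FixedDelayRestartSketch`, its methods, the `failedIsCleanExit` flag and the exit code value 1 are assumptions for illustration only; Flink's actual exit codes are defined in ApplicationStatus.

```java
import java.util.function.BooleanSupplier;

/** Illustrative model of "retry N times with a fixed delay, then give up". */
final class FixedDelayRestartSketch {
    enum Outcome { SUCCEEDED, FAILED }

    /** Runs one initial attempt plus at most maxRestarts restarts. */
    static Outcome run(BooleanSupplier attempt, int maxRestarts, long delayMillis) {
        int restarts = 0;
        while (true) {
            if (attempt.getAsBoolean()) {
                return Outcome.SUCCEEDED;
            }
            if (restarts >= maxRestarts) {
                // Restart strategy exhausted: the job ends in FAILED.
                return Outcome.FAILED;
            }
            restarts++;
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.FAILED;
            }
        }
    }

    /** Hypothetical mapping from job outcome to process exit code. */
    static int exitCode(Outcome outcome, boolean failedIsCleanExit) {
        if (outcome == Outcome.SUCCEEDED || failedIsCleanExit) {
            return 0;
        }
        return 1; // any non-zero value makes the container count as failed
    }
}
```

With `failedIsCleanExit = false` the process ends with a non-zero code, which a k8s Deployment answers by starting the pod again; the `true` variant corresponds to the zero-exit-code idea raised in the thread.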


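Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread bradyMk
Hello:
Is this a bug in this Flink version itself? Does that mean there is no way to
fix it and I can only kill the application manually?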



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread bradyMk
Hello:
Is this the complete log you asked for? Could you please take a look?
jobmanager_log.txt
  



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


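slot calculation question

2020-08-04 Thread ★猛★
Hi,

My Flink version is 1.9. I set the maximum parallelism on the StreamGraph to
780, and all operators are in the same group, yet 964 slots are actually
used. By rights it should be 780. Why does this happen?
Below is the graph from the Flink UI:

Slots actually used: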


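Re: Re: FLINK SQL view data reuse question

2020-08-04 Thread godfrey he
Call StatementSet#explain() and print the result to check whether reuse fails
because the digests of the Deduplicate nodes differ.

kandy.wang  wrote on Tue, Aug 4, 2020 at 6:21 PM:

> @ godfrey
> Thanks. I just tried it: source -> Deduplicate -> GlobalGroupAggregate.
> The source is indeed reused, but the Deduplicate node is not. In theory,
> source + Deduplicate are both part of the view's logic, so both should be
> reused. It feels like not enough is being reused.
>
>
> At 2020-08-04 17:26:02, "godfrey he"  wrote:
> >The blink planner can optimize a multi-sink query so that duplicated
> >computation is reused as much as possible.
> >In 1.11, submit the job with StatementSet; before 1.11, submit it with
> >sqlUpdate/insertInto + execute.
> >
> >kandy.wang  wrote on Tue, Aug 4, 2020 at 5:20 PM:
> >
> >> A question about FLINK SQL views: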
> >> create view order_source
> >>
> >> as
> >>
> >> select order_id, order_goods_id, user_id,...
> >>
> >> from (
> >>
> >> ..  proctime,row_number() over(partition by order_id,
> >> order_goods_id order by proctime desc) as rownum
> >>
> >> from hive.temp_dw.dm_trd_order_goods/*+ OPTIONS('
> properties.group.id'='flink_etl_kafka_hbase',
> >> 'scan.startup.mode'='latest-offset') */
> >>
> >> ) where  rownum = 1 and  price > 0;
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT),)
> >>
> >> from
> >>
> >> (
> >>
> >> select order_date as rowkey,
> >>
> >> sum(amount) as saleN,
> >>
> >> from order_source
> >>
> >> group by order_date
> >>
> >> );
> >>
> >>
> >>
> >>
> >> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN
> as
> >> BIGINT))
> >>
> >> from
> >>
> >> (
> >>
> >> select order_hour as rowkey,sum(amount) as saleN,
> >>
> >>
> >>
> >> from order_source
> >>
> >> group by order_hour
> >>
> >> );
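> >> Problem: the same view, the same consumer group, and different sinks
> >> produce 2 jobs. In effect, the 2 jobs share one consumer group.
> >> The generated jobs are: a. order_source -> sink 1  b. order_source -> sink 2
> >>
> >>
> >> The intent is to use the view order_source (the view has to deduplicate
> >> the order data) so that the same full source data is reused and the same
> >> state can be reused underneath. How can this be achieved?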
> >>
> >>
>


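Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread JasonLee
Hi,
I remember having this problem back on version 1.6.0. There does not seem to
be a corresponding JIRA. I have not run into it on newer versions, though; it
probably only occurs occasionally.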



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-04 Thread jincheng sun
Hi Seth and David,

I'm very happy to have your reply and suggestions. I would like to share my
thoughts here:

The main motivation for refactoring the PyFlink docs is to make sure that
Python users can find everything they need starting from the PyFlink
documentation main page. That is, the PyFlink documentation should have a
catalogue which includes all the functionality available in PyFlink.
However, this doesn't mean that we will copy content from other parts of
the documentation. It may be just a reference/link to the other
documentation if needed. For the documentation added under the PyFlink
main page, the principle is that it should only include Python-specific
content, instead of making a copy of the Java content.

>>  I'm concerned that this proposal duplicates a lot of content that will
quickly get out of sync. It feels like it is documenting PyFlink separately
from the rest of the project.

Regarding the concerns about maintainability, as mentioned above, the goal
of this FLIP is to provide an intelligible entry point to the Python API,
and its content should only contain information which is useful for Python
users. There are indeed many agenda items in this FLIP that duplicate the
Java documents, but that doesn't mean the content would be copied from the
Java documentation. That is, if the content of a document is the same as the
corresponding Java document, we will add a link to the Java document, e.g.
"Built-in functions" and "SQL". We only create a page for the Python-only
content and redirect to the Java document for anything shared with Java,
e.g. "Connectors" and "Catalogs". If a document is Python-only and already
exists, we will move it from the old Python documentation to the new Python
documentation, e.g. "Configurations". If a document is Python-only and does
not exist yet, we will create a new page for it, e.g. "DataTypes".

The main reason we create a new page for Python Data Types is that it
corresponds to Java Data Types only conceptually; the actual document
content would be very different from Java DataTypes. Some detailed
differences are as follows:



  - The text in the Java Data Types document is written for JVM-based
language users, which is incomprehensible to users who only understand
python.

  - Currently the Python Data Types does not support the "bridgedTo"
method, DataTypes.RAW, DataTypes.NULL and User Defined Types.

  - The section "Planner Compatibility" and "Data Type Extraction" are only
useful for Java/Scala users.

  - We want to add sections which may only apply for Python such as which
Data Types are currently supported in Python, the mapping between DataType
and Python object type, etc.

I think the root cause of such a difference from the existing documents is
that Python is the first non-JVM language we support in Flink. This means
our previous method of sharing documents between Java and Scala may not be
suitable for Python, so we will adopt some very different methods to
provide documentation for Python users. Of course, we should reduce
maintenance costs as much as possible while ensuring a good user experience.
Furthermore, Python is the first step of Flink's multi-language support, and
there may be R, Go, etc. in the future. It is necessary for us to have a main
page for each language, so that users of each language can focus on the
content they care about.

>> Things like the cookbook and tutorial should be under the Try Flink
section of the documentation.

Regarding the position of the "Cookbook" section, in my view "Try Flink" is
for new users and the "Cookbook" is for more advanced users. That is, "Try
Flink" can hold the simplest end-to-end examples, such as "Hello World",
and in the "Cookbook" we can add use cases closer to production business,
such as CDN log analysis or PV/UV for e-commerce. So I prefer to keep the
current structure.

>>  it's relatively straightforward to compare the Python API with the Java
and Scala versions.

Regarding the comparison between the Python API and the Java/Scala API, I
think the majority of users, especially beginners, would not have this
need. From my side, improving the experience for beginners seems to have a
higher priority. Could you please say more about why users would want to
compare? How much would the comparison be affected if we put it on
multiple pages? :)

Thanks for all of your feedback and suggestions, any follow-up feedback is
welcome.

Best,

Jincheng


David Anderson  wrote on Mon, Aug 3, 2020 at 10:49 PM:

> Jincheng,
>
> One thing that I like about the way that the documentation is currently
> organized is that it's relatively straightforward to compare the Python API
> with the Java and Scala versions. I'm concerned that if the PyFlink docs
> are more independent, it will be challenging to respond to questions about
> which features from the other APIs are available from Python.
>
> David
>
> On Mon, Aug 3, 2020 at 8:07 AM 

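Re:Re: FLINK SQL view data reuse question

2020-08-04 Thread kandy.wang

@ godfrey
Thanks. I just tried it: source -> Deduplicate -> GlobalGroupAggregate.
The source is indeed reused, but the Deduplicate node is not. In theory,
source + Deduplicate are both part of the view's logic, so both should be
reused. It feels like not enough is being reused.


At 2020-08-04 17:26:02, "godfrey he"  wrote:
>The blink planner can optimize a multi-sink query so that duplicated
>computation is reused as much as possible.
>In 1.11, submit the job with StatementSet; before 1.11, submit it with
>sqlUpdate/insertInto + execute.
>
>kandy.wang  wrote on Tue, Aug 4, 2020 at 5:20 PM:
>
>> A question about FLINK SQL views: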
>> create view order_source
>>
>> as
>>
>> select order_id, order_goods_id, user_id,...
>>
>> from (
>>
>> ..  proctime,row_number() over(partition by order_id,
>> order_goods_id order by proctime desc) as rownum
>>
>> from hive.temp_dw.dm_trd_order_goods/*+ 
>> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
>> 'scan.startup.mode'='latest-offset') */
>>
>> ) where  rownum = 1 and  price > 0;
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT),)
>>
>> from
>>
>> (
>>
>> select order_date as rowkey,
>>
>> sum(amount) as saleN,
>>
>> from order_source
>>
>> group by order_date
>>
>> );
>>
>>
>>
>>
>> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
>> BIGINT))
>>
>> from
>>
>> (
>>
>> select order_hour as rowkey,sum(amount) as saleN,
>>
>>
>>
>> from order_source
>>
>> group by order_hour
>>
>> );
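>> Problem: the same view, the same consumer group, and different sinks
>> produce 2 jobs. In effect, the 2 jobs share one consumer group.
>> The generated jobs are: a. order_source -> sink 1  b. order_source -> sink 2
>>
>>
>> The intent is to use the view order_source (the view has to deduplicate
>> the order data) so that the same full source data is reused and the same
>> state can be reused underneath. How can this be achieved?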
>>
>>


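Re: flink1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Thread Yang Wang
@bradyMk, could you send the complete JM log? That way we can see why
Flink's YarnResourceManager did not execute the deregister logic.

@JasonLee, which bug are you referring to? Is there already a JIRA for it?

Best,
Yang

JasonLee <17610775...@163.com> wrote on Tue, Aug 4, 2020 at 4:33 PM:

> hi
> This is itself a bug, and it probably has not been fixed yet.
>
>
> JasonLee
> Email: 17610775...@163.com
>
> Signature is customized by Netease Mail Master
>
> On 2020-08-04 at 15:41, bradyMk wrote:
> Hello,
> I submit the job in per-job mode, and this only happens occasionally.
> This time the error log looks like this: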
>
> 2020-08-04 10:30:14,475 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> flink2Ots (e11a22af324049217fdff28aca9f73a5) switched from state FAILING to
> FAILED.
> java.lang.Exception: Container released on a *lost* node
>at
>
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-08-04 10:30:14,476 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
> restart the job flink2Ots (e11a22af324049217fdff28aca9f73a5) because the
> restart strategy prevented it.
> java.lang.Exception: Container released on a *lost* node
>at
>
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>at
>
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>at
>
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-08-04 10:30:14,476 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping
> checkpoint coordinator for job e11a22af324049217fdff28aca9f73a5.
> 2020-08-04 10:30:14,476 INFO
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Shutting down
>
> However, when I ran into this error before, the application on YARN was able to exit.
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-04 Thread Yang Wang
@Till Rohrmann  In native mode, when a Flink
application terminates with FAILED state, all the resources will be cleaned
up.

However, in standalone mode, I agree with you that we need to rethink the
exit code of Flink. When a job exhausts the restart
strategy, we should terminate the pod and do not restart again. After
googling, it seems that we could not specify the restartPolicy
based on exit code[1]. So maybe we need to return a zero exit code to avoid
restarting by K8s.

[1].
https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code

Best,
Yang

Till Rohrmann  wrote on Tue, Aug 4, 2020 at 3:48 PM:

> @Yang Wang  I believe that we should rethink the
> exit codes of Flink. In general you want K8s to restart a failed Flink
> process. Hence, an application which terminates in state FAILED should not
> return a non-zero exit code because it is a valid termination state.
>
> Cheers,
> Till
>
> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>
>> Hi Eleanore,
>>
>> I think you are using K8s resource "Job" to deploy the jobmanager. Please
>> set .spec.template.spec.restartPolicy = "Never" and spec.backoffLimit = 0.
>> Refer here[1] for more information.
>>
>> Then, when the jobmanager failed because of any reason, the K8s job will
>> be marked failed. And K8s will not restart the job again.
>>
>> [1].
>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>
>>
>> Best,
>> Yang
>>
>> Eleanore Jin  wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>
>>> Hi Till,
>>>
>>> Thanks for the reply!
>>>
>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>> Specifically, I build a custom docker image, which I copied the app jar
>>> (not uber jar) and all its dependencies under /flink/lib.
>>>
>>> So my question is more like, in this case, if the job is marked as
>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>> what are the suggestions for such scenario?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>>
>>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Eleanore,
>>>>
>>>> how are you deploying Flink exactly? Are you using the application mode
>>>> with native K8s support to deploy a cluster [1] or are you manually
>>>> deploying a per-job mode [2]?
>>>>
>>>> I believe the problem might be that we terminate the Flink process with
>>>> a non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>>>>
>>>> cc Yang Wang have you observed a similar behavior when running Flink in
>>>> per-job mode on K8s?
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>>>> [3]
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>>>
>>>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>>>> wrote:
>>>>
>>>>> Hi Experts,
>>>>>
>>>>> I have a flink cluster (per job mode) running on kubernetes. The job
>>>>> is configured with restart strategy
>>>>>
>>>>> restart-strategy.fixed-delay.attempts: 3
>>>>> restart-strategy.fixed-delay.delay: 10 s
>>>>>
>>>>>
>>>>> So after 3 times retry, the job will be marked as FAILED, hence the
>>>>> pods are not running. However, kubernetes will then restart the job again
>>>>> as the available replicas do not match the desired one.
>>>>>
>>>>> I wonder what are the suggestions for such a scenario? How should I
>>>>> configure the flink job running on k8s?
>>>>>
>>>>> Thanks a lot!
>>>>> Eleanore
>>>>>



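Re: FLINK SQL view data reuse question

2020-08-04 Thread godfrey he
The blink planner can optimize a multi-sink query so that duplicated
computation is reused as much as possible.
In 1.11, submit the job with StatementSet; before 1.11, submit it with
sqlUpdate/insertInto + execute.

kandy.wang  wrote on Tue, Aug 4, 2020 at 5:20 PM:

> A question about FLINK SQL views: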
> create view order_source
>
> as
>
> select order_id, order_goods_id, user_id,...
>
> from (
>
> ..  proctime,row_number() over(partition by order_id,
> order_goods_id order by proctime desc) as rownum
>
> from hive.temp_dw.dm_trd_order_goods/*+ 
> OPTIONS('properties.group.id'='flink_etl_kafka_hbase',
> 'scan.startup.mode'='latest-offset') */
>
> ) where  rownum = 1 and  price > 0;
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT),)
>
> from
>
> (
>
> select order_date as rowkey,
>
> sum(amount) as saleN,
>
> from order_source
>
> group by order_date
>
> );
>
>
>
>
> insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as
> BIGINT))
>
> from
>
> (
>
> select order_hour as rowkey,sum(amount) as saleN,
>
>
>
> from order_source
>
> group by order_hour
>
> );
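> Problem: the same view, the same consumer group, and different sinks
> produce 2 jobs. In effect, the 2 jobs share one consumer group.
> The generated jobs are: a. order_source -> sink 1  b. order_source -> sink 2
>
>
> The intent is to use the view order_source (the view has to deduplicate
> the order data) so that the same full source data is reused and the same
> state can be reused underneath. How can this be achieved?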
>
>


Re: Re: Difference between batch and stream on bounded data

2020-08-04 Thread godfrey he
What is your runtime environment? Could you share the relevant configuration?

chenxuying  wrote on Tue, Aug 4, 2020 at 2:46 PM:

> Hello, my modified statement is
> insert into print_sink select game_id,count(id) from mysql_source group by
> game_id
> When I run it in streaming mode, it prints a changelog, as follows:
> 2> +I(12,1)
> 5> +I(12555,1)
> 1> +I(122,1)
> 3> +I(13,1)
> 6> +I(1,1)
> 6> -U(1,1)
> 6> +U(1,2)
> 6> -U(1,2)
> 6> +U(1,3)
> 6> -U(1,3)
> 6> +U(1,4)
> 6> -U(1,4)
>
>
> But when I use batch mode, it fails with an error:
> org.apache.flink.util.FlinkException: Error while shutting the
> TaskExecutor down.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
> ...
> Caused by: java.util.concurrent.CompletionException:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> Suppressed: org.apache.flink.util.FlinkException: Could not properly shut
> down the TaskManager services.
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
> at...
> ... 21 more
> Caused by: org.apache.flink.util.FlinkException: Could not close resource.
> at
> org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
> at
> org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
> ... 37 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.flink.util.JavaGcCleanerWrapper
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
> ...
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
> ... 21 more
> [CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize
> class org.apache.flink.util.JavaGcCleanerWrapper]
>
>
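> Do you happen to know the cause?
>
>
> At 2020-08-04 12:11:32, "godfrey he"  wrote:
> >Logically, the result produced by batch is a Table, while the result
> >produced by streaming is a Changelog.
> >In your example the Table result and the Changelog result are the same,
> >which is why they look alike to you.
> >The simplest way to see the difference is to change the query to one with
> >a GROUP BY and compare the results.
> >For more on the concepts of Table and Changelog, see [1]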
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
> >
> >chenxuying  wrote on Tue, Aug 4, 2020 at 11:44 AM:
> >
> >> hi :
> >> flink table sql 1.11.0
> >> In EnvironmentSettings you can set either BatchMode or StreamingMode
> >>
> >>
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >>
> >>
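> >> With MySQL as the source, the job runs in both modes and the result is
> >> the same. I feel I do not understand batch and streaming well enough and
> >> cannot see the difference. Does anyone have an example that makes it
> >> easier to understand?
> >> My code: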
> >> EnvironmentSettings environmentSettings =
> EnvironmentSettings.newInstance()
> >> //.inStreamingMode()
> >> .inBatchMode()
> >> .build();
> >> TableEnvironment tableEnvironment =
> >> TableEnvironment.create(environmentSettings);
> >> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'jdbc',  " +
> >> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> >> " 'username' = 'root' , " +
> >> " 'password' = 'root', " +
> >> " 'table-name' = 'mysqlsink' , " +
> >> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
> >> " 'sink.buffer-flush.interval' = '2s', " +
> >> " 'sink.buffer-flush.max-rows' = '300' " +
> >> " )");
> >> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
> >> " id bigint, " +
> >> "  game_id varchar, " +
> >> "  PRIMARY KEY (id) NOT ENFORCED  " +
> >> " )  " +
> >> " with ( " +
> >> "'connector' = 'print'  " +
> >> " )");
> >> tableEnvironment.executeSql("insert into print_sink select id,game_id
> from
> >> mysql_source");
>
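
The Table versus Changelog distinction explained in this thread can be sketched without Flink. The plain-Java example below mimics what a `count(id) ... group by game_id` query emits in each mode: streaming emits an insert (`+I`) for a new key and a retract/update pair (`-U`, `+U`) for every later row, while batch reads all input first and produces one final row per key. `GroupCountChangelogSketch` and its methods are hypothetical names for illustration, and the output format only imitates the print sink output quoted above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupCountChangelogSketch {

    /** Streaming view: each input row updates the count and emits the change right away. */
    static List<String> streamingChangelog(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String id : gameIds) {
            Long old = counts.get(id);
            if (old == null) {
                counts.put(id, 1L);
                out.add("+I(" + id + ",1)");                  // first row for this key
            } else {
                out.add("-U(" + id + "," + old + ")");        // retract the previous count
                counts.put(id, old + 1);
                out.add("+U(" + id + "," + (old + 1) + ")");  // emit the updated count
            }
        }
        return out;
    }

    /** Batch view: all input is consumed first, then one final row per key is produced. */
    static Map<String, Long> batchResult(List<String> gameIds) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String id : gameIds) {
            counts.merge(id, 1L, Long::sum);
        }
        return counts;
    }
}
```

For a query without aggregation, such as the original `select id,game_id from mysql_source`, both views contain only inserts, which is why the two modes looked identical.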


FLINK SQL view data reuse question

2020-08-04 Thread kandy.wang
A question about FLINK SQL views:
create view order_source

as

select order_id, order_goods_id, user_id,...

from (

..  proctime,row_number() over(partition by order_id, order_goods_id 
order by proctime desc) as rownum

from hive.temp_dw.dm_trd_order_goods/*+ 
OPTIONS('properties.group.id'='flink_etl_kafka_hbase', 
'scan.startup.mode'='latest-offset') */

) where  rownum = 1 and  price > 0; 




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT),)

from

(

select order_date as rowkey,

sum(amount) as saleN,

from order_source

group by order_date

);




insert into hive.temp_dw.day_order_index select rowkey, ROW(cast(saleN as 
BIGINT))

from

(

select order_hour as rowkey,sum(amount) as saleN,



from order_source

group by order_hour

);
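Problem: the same view, the same consumer group, and different sinks produce
2 jobs. In effect, the 2 jobs share one consumer group.
The generated jobs are: a. order_source -> sink 1  b. order_source -> sink 2


The intent is to use the view order_source (the view has to deduplicate the
order data) so that the same full source data is reused and the same state
can be reused underneath. How can this be achieved?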



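Re: Dimension table implementation cannot load a configuration file

2020-08-04 Thread 李奇
You can use a static class to load the resource and return a Properties
object.

> On Aug 4, 2020, at 4:55 PM, "guaishushu1...@163.com"  wrote:
> 
> The dimension table Function extends TableFunction, so there seems to be
> no way to load a configuration file. Does anyone have a good approach?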
> 
> 
> 
> guaishushu1...@163.com
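
A minimal sketch of the static-class approach suggested above, in plain Java. The class name `DimTableConfig` and the resource name `dim-table.properties` are hypothetical; the idea is only that the file is read from the classpath once per JVM, so a TableFunction subclass could call `DimTableConfig.get()` from its `open()` or `eval()` method without receiving the configuration through its constructor.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

final class DimTableConfig {
    // Hypothetical resource name; it must be packaged on the job's classpath.
    private static final String RESOURCE = "dim-table.properties";

    private DimTableConfig() {}

    // Holder idiom: the properties are loaded lazily, once, on first access.
    private static final class Holder {
        static final Properties PROPS = loadFromClasspath(RESOURCE);
    }

    /** Returns the shared properties; empty if the resource is missing. */
    static Properties get() {
        return Holder.PROPS;
    }

    static Properties loadFromClasspath(String resource) {
        try (InputStream in = DimTableConfig.class.getClassLoader().getResourceAsStream(resource)) {
            return parse(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Parses a properties stream; a null stream yields empty properties. */
    static Properties parse(InputStream in) {
        Properties props = new Properties();
        if (in != null) {
            try {
                props.load(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }
}
```

Because the holder is static, each TaskManager JVM loads the file independently, so the resource has to be shipped inside the job jar or be otherwise present on every worker's classpath.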


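Dimension table implementation cannot load a configuration file

2020-08-04 Thread guaishushu1...@163.com
The dimension table Function extends TableFunction, so there seems to be no
way to load a configuration file. Does anyone have a good approach?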



guaishushu1...@163.com


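Re: Can a job started through the Flink SQL client be restored from a specified checkpoint after it is killed?

2020-08-04 Posted by JasonLee
Hi,
As far as I know, the sql-client does not yet support restoring a job from a specified checkpoint. Flink on Zeppelin already supports it, though, so give that a try if you are interested.


JasonLee
Email: 17610775...@163.com

On 2020-08-04 16:28, mispower wrote:
How can a streaming job started through sql_client be restored to its last consumed position from a specified checkpoint after maintenance or a failure, the way a Flink streaming job can?
I searched the mailing list for related questions, but none seem to have a clear answer.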

Re: Flink 1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Posted by JasonLee
Hi,
This is a bug in itself, and it has probably not been fixed yet.


JasonLee
Email: 17610775...@163.com

On 2020-08-04 15:41, bradyMk wrote:
Hello,
I submit in per-job mode, and this only happens occasionally. This time the error log looks like this:

2020-08-04 10:30:14,475 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
flink2Ots (e11a22af324049217fdff28aca9f73a5) switched from state FAILING to
FAILED.
java.lang.Exception: Container released on a *lost* node
   at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
   at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
restart the job flink2Ots (e11a22af324049217fdff28aca9f73a5) because the
restart strategy prevented it.
java.lang.Exception: Container released on a *lost* node
   at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
   at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping
checkpoint coordinator for job e11a22af324049217fdff28aca9f73a5.
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Shutting down

However, when I ran into this error before, the application on YARN did exit.



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


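Can a job started through the Flink SQL client be restored from a specified checkpoint after it is killed?

2020-08-04 Posted by mispower
How can a streaming job started through sql_client be restored to its last consumed position from a specified checkpoint after maintenance or a failure, the way a Flink streaming job can?
I searched the mailing list for related questions, but none seem to have a clear answer.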

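Kerberos authentication with dynamic properties

2020-08-04 Posted by sllence
Hi all,

The version I am testing is Flink 1.11.

The problem described in https://issues.apache.org/jira/browse/FLINK-12130 does not seem to have been resolved:

flink run -m yarn-cluster -yD security.kerberos.login.keytab=/path/to/keytab -yD security.kerberos.login.principal=xxx /path/to/test.jar

Looking at the source code, the security authentication still uses only the settings from the configuration file and does not merge in the settings passed as dynamic properties. Any help would be appreciated.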





Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-04 Posted by Till Rohrmann
@Yang Wang I believe that we should rethink the
exit codes of Flink. In general you want K8s to restart a failed Flink
process. Hence, an application which terminates in state FAILED should not
return a non-zero exit code because it is a valid termination state.

Cheers,
Till

On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:

> Hi Eleanore,
>
> I think you are using K8s resource "Job" to deploy the jobmanager. Please
> set .spec.template.spec.restartPolicy = "Never" and spec.backoffLimit = 0.
> Refer here[1] for more information.
>
> Then, when the jobmanager failed because of any reason, the K8s job will
> be marked failed. And K8s will not restart the job again.
>
> [1].
> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>
>
> Best,
> Yang
>
> On Tue, Aug 4, 2020 at 12:05 AM Eleanore Jin wrote:
>
>> Hi Till,
>>
>> Thanks for the reply!
>>
>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>> Specifically, I build a custom docker image, which I copied the app jar
>> (not uber jar) and all its dependencies under /flink/lib.
>>
>> So my question is more like, in this case, if the job is marked as
>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>> what are the suggestions for such scenario?
>>
>> Thanks a lot!
>> Eleanore
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>
>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> how are you deploying Flink exactly? Are you using the application mode
>>> with native K8s support to deploy a cluster [1] or are you manually
>>> deploying a per-job mode [2]?
>>>
>>> I believe the problem might be that we terminate the Flink process with
>>> a non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>>>
>>> cc Yang Wang have you observed a similar behavior when running Flink in
>>> per-job mode on K8s?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>>> [3]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>>
>>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>>> wrote:
>>>
>>>> Hi Experts,
>>>>
>>>> I have a flink cluster (per job mode) running on kubernetes. The job is
>>>> configured with restart strategy
>>>>
>>>> restart-strategy.fixed-delay.attempts: 3
>>>> restart-strategy.fixed-delay.delay: 10 s
>>>>
>>>>
>>>> So after 3 times retry, the job will be marked as FAILED, hence the
>>>> pods are not running. However, kubernetes will then restart the job again
>>>> as the available replicas do not match the desired one.
>>>>
>>>> I wonder what are the suggestions for such a scenario? How should I
>>>> configure the flink job running on k8s?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>


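Re: Flink 1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Posted by bradyMk
Hello,
I submit in per-job mode, and this only happens occasionally. This time the error log looks like this: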

2020-08-04 10:30:14,475 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
flink2Ots (e11a22af324049217fdff28aca9f73a5) switched from state FAILING to
FAILED.
java.lang.Exception: Container released on a *lost* node
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
restart the job flink2Ots (e11a22af324049217fdff28aca9f73a5) because the
restart strategy prevented it.
java.lang.Exception: Container released on a *lost* node
at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping
checkpoint coordinator for job e11a22af324049217fdff28aca9f73a5.
2020-08-04 10:30:14,476 INFO 
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Shutting down

However, when I ran into this error before, the application on YARN did exit.



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


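Re: Flink 1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Posted by Yang Wang
I suspect you started a session cluster. With a per-job deployment, the application is guaranteed to exit after the job fails.

Could you share the JobManager log? That would make it easier to troubleshoot.


Best,
Yang

On Tue, Aug 4, 2020 at 2:35 PM bradyMk wrote:

> Hello,
> The JM should still be running, because the Web UI is still accessible. But my job has clearly died, so why is the JM still running? Is there a parameter I need to configure to fix this?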
>
>
>
> -
> Best Wishes
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Case sensitivity of database column names in the Flink Table API

2020-08-04 Posted by lgs
Hi,
My Postgres column names are mixed-case.
   postgres_sink = """
CREATE TABLE alarm_history_data (
`recordId` STRING,
`rowtime`  TIMESTAMP(3),
`action`   STRING,
`originalState`STRING,
`newState` STRING,
`originalCause`STRING,
`newCause` STRING,
`ser_name` STRING,
`enb`  STRING,
`eventTime`STRING,
`ceasedTime`   STRING,
`duration` STRING,
`acked`STRING,
`pmdId`STRING,
`pmdTime`  STRING,
 PRIMARY KEY (`recordId`) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
'connector.table' = 'alarm_history_data',
'connector.driver' = 'org.postgresql.Driver',
'connector.username' = 'postgres',
'connector.password' = 'my_password',
'connector.write.flush.max-rows' = '1'
)
"""

st_env.scan("source").group_by("recordId").select(
"recordId,"
"last_tvalue(actionTime) as rowtime, last_value(action),"
"last_value(originalState) as originalState, last_value(newState),"
"last_value(originalCause), last_value(newCause),"
"last_value(ser_name), last_value(enb), last_value(eventTime),"
"last_value(ceasedTime), last_value(duration), last_value(acked),"
"last_value(pmdId), last_value(pmdTime)"
).insert_into("alarm_history_data")

The sink fails with this error:
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO
alarm_history_data(recordId, rowtime, action, originalState, newState,
originalCause, newCause, ser_name, enb, eventTime, ceasedTime, duration,
acked, pmdId, pmdTime) VALUES ('47357607', '2020-06-03 17:37:44+08',
'Insert', '', 'cleared', '', 'crash', 'Oyama_ENM_MS',
'789198-houshakuzi-RBS6302', '2020-06-03T17:24:57', '2020-06-03T17:29:50',
'293.0', 'No', '0x8002', '2020-06-03T17:22:46') ON CONFLICT (recordId)
DO UPDATE SET recordId=EXCLUDED.recordId, rowtime=EXCLUDED.rowtime,
action=EXCLUDED.action, originalState=EXCLUDED.originalState,
newState=EXCLUDED.newState, originalCause=EXCLUDED.originalCause,
newCause=EXCLUDED.newCause, ser_name=EXCLUDED.ser_name, enb=EXCLUDED.enb,
eventTime=EXCLUDED.eventTime, ceasedTime=EXCLUDED.ceasedTime,
duration=EXCLUDED.duration, acked=EXCLUDED.acked, pmdId=EXCLUDED.pmdId,
pmdTime=EXCLUDED.pmdTime was aborted: ERROR: column "recordid" of relation
"alarm_history_data" does not exist

How can I solve this? How can I get the column names wrapped in quotes in the final SQL statement?
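
For context, PostgreSQL folds unquoted identifiers to lowercase, which is why the error names "recordid" even though the statement says recordId. A minimal plain-Java sketch of what a statement with quoted identifiers would look like follows. The class and method names are hypothetical, and this is not the Flink JDBC connector's API; it only illustrates the quoting rule itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public final class QuotedInsert {
    private QuotedInsert() {}

    // Wraps an identifier in double quotes, doubling any embedded quote,
    // so PostgreSQL keeps its case instead of folding it to lowercase.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    // Builds a parameterized INSERT with every identifier quoted.
    static String insertStatement(String table, List<String> columns) {
        String cols = columns.stream()
                .map(QuotedInsert::quote)
                .collect(Collectors.joining(", "));
        String params = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + quote(table) + "(" + cols + ") VALUES (" + params + ")";
    }

    public static void main(String[] args) {
        // prints: INSERT INTO "alarm_history_data"("recordId", "rowtime", "action") VALUES (?, ?, ?)
        System.out.println(insertStatement(
                "alarm_history_data", Arrays.asList("recordId", "rowtime", "action")));
    }
}
```

If the generated SQL cannot be changed, another option under the same rule is to create the Postgres columns with all-lowercase names so that unquoted identifiers match.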




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-04 Posted by Yang Wang
Hi Eleanore,

I think you are using K8s resource "Job" to deploy the jobmanager. Please
set .spec.template.spec.restartPolicy = "Never" and spec.backoffLimit = 0.
Refer here[1] for more information.

Then, when the jobmanager failed because of any reason, the K8s job will be
marked failed. And K8s will not restart the job again.

[1].
https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup


Best,
Yang

On Tue, Aug 4, 2020 at 12:05 AM Eleanore Jin wrote:

> Hi Till,
>
> Thanks for the reply!
>
> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
> Specifically, I build a custom docker image, which I copied the app jar
> (not uber jar) and all its dependencies under /flink/lib.
>
> So my question is more like, in this case, if the job is marked as FAILED,
> which causes k8s to restart the pod, this seems not help at all, what are
> the suggestions for such scenario?
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann  wrote:
>
>> Hi Eleanore,
>>
>> how are you deploying Flink exactly? Are you using the application mode
>> with native K8s support to deploy a cluster [1] or are you manually
>> deploying a per-job mode [2]?
>>
>> I believe the problem might be that we terminate the Flink process with a
>> non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>>
>> cc Yang Wang have you observed a similar behavior when running Flink in
>> per-job mode on K8s?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>> [3]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>
>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>> wrote:
>>
>>> Hi Experts,
>>>
>>> I have a flink cluster (per job mode) running on kubernetes. The job is
>>> configured with restart strategy
>>>
>>> restart-strategy.fixed-delay.attempts: 3
>>> restart-strategy.fixed-delay.delay: 10 s
>>>
>>>
>>> So after 3 times retry, the job will be marked as FAILED, hence the pods
>>> are not running. However, kubernetes will then restart the job again as the
>>> available replicas do not match the desired one.
>>>
>>> I wonder what are the suggestions for such a scenario? How should I
>>> configure the flink job running on k8s?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>


Re:Re: Difference between batch and stream on bounded data

2020-08-04 Posted by chenxuying
Hello, my modified statement is
insert into print_sink select game_id,count(id) from mysql_source group by game_id
When I run it in streaming mode, it prints a changelog, as follows:
2> +I(12,1)
5> +I(12555,1)
1> +I(122,1)
3> +I(13,1)
6> +I(1,1)
6> -U(1,1)
6> +U(1,2)
6> -U(1,2)
6> +U(1,3)
6> -U(1,3)
6> +U(1,4)
6> -U(1,4)
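
The -U/+U pairs in the output above are retractions: each new row for a key withdraws the previous count and emits the updated one, whereas a batch run only emits the final count per key. A minimal plain-Java sketch of that difference follows. It does not use Flink, the class and method names are hypothetical, and the input list is made-up sample data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class ChangelogSketch {
    private ChangelogSketch() {}

    // Streaming view: emit a change for every incoming row.
    static List<String> streamingCount(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            if (old == null) {
                // First row for this key: plain insert.
                counts.put(key, 1L);
                out.add("+I(" + key + ",1)");
            } else {
                // Later rows: retract the old count, then emit the new one.
                out.add("-U(" + key + "," + old + ")");
                counts.put(key, old + 1);
                out.add("+U(" + key + "," + (old + 1) + ")");
            }
        }
        return out;
    }

    // Batch view: read all rows first, then emit one final row per key.
    static List<String> batchCount(List<String> keys) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((k, v) -> out.add("(" + k + "," + v + ")"));
        return out;
    }

    public static void main(String[] args) {
        List<String> gameIds = Arrays.asList("12", "1", "1", "1");
        // prints: [+I(12,1), +I(1,1), -U(1,1), +U(1,2), -U(1,2), +U(1,3)]
        System.out.println(streamingCount(gameIds));
        // prints: [(12,1), (1,3)]
        System.out.println(batchCount(gameIds));
    }
}
```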


But if I use batch mode, it fails with an error:
org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor 
down.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
...
Caused by: java.util.concurrent.CompletionException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down 
the TaskManager services.
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
at...
... 21 more
Caused by: org.apache.flink.util.FlinkException: Could not close resource.
at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
... 37 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
[CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper]


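Do you happen to know the cause?


At 2020-08-04 12:11:32, "godfrey he" wrote:
>Logically, a batch query produces a Table, while a streaming query produces a Changelog.
>In your example the Table result and the changelog result happen to be identical, which is why they look about the same to you.
>The simplest way to see the difference is to change the query to one with a group by and compare the results.
>For more on the Table and Changelog concepts, see [1]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
>
>On Tue, Aug 4, 2020 at 11:44 AM chenxuying wrote:
>
>> hi :
>> flink table sql 1.11.0
>> In EnvironmentSettings you can choose either BatchMode or StreamingMode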
>>
>>
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>> //.inStreamingMode()
>> .inBatchMode()
>> .build();
>>
>>
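>> With MySQL as the source, the job runs in both modes and the result is the same. I feel I do not understand batch and streaming well enough and cannot see the difference.
>> Does anyone have an example that makes it easier to understand?
>> My code: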
>> EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
>> //.inStreamingMode()
>> .inBatchMode()
>> .build();
>> TableEnvironment tableEnvironment =
>> TableEnvironment.create(environmentSettings);
>> tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "'connector' = 'jdbc',  " +
>> " 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
>> " 'username' = 'root' , " +
>> " 'password' = 'root', " +
>> " 'table-name' = 'mysqlsink' , " +
>> " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
>> " 'sink.buffer-flush.interval' = '2s', " +
>> " 'sink.buffer-flush.max-rows' = '300' " +
>> " )");
>> tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
>> " id bigint, " +
>> "  game_id varchar, " +
>> "  PRIMARY KEY (id) NOT ENFORCED  " +
>> " )  " +
>> " with ( " +
>> "'connector' = 'print'  " +
>> " )");
>> tableEnvironment.executeSql("insert into print_sink select id,game_id from
>> mysql_source");


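Re: Flink 1.9.1 job has already failed, but the application is still running on YARN

2020-08-04 Posted by bradyMk
Hello,
The JM should still be running, because the Web UI is still accessible. But my job has clearly died, so why is the JM still running? Is there a parameter I need to configure to fix this?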



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

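Re: How to set environment variables for configuration files with flink run-application

2020-08-04 Posted by Yang Wang
Zhou Zach, you are right that it should be passed with -D, but the parameter is not correct.
Use -Dcontainerized.master.env.HBASE_CONF_PATH='/etc/hbase/conf' to set the environment variable for the JM.
Use -Dcontainerized.taskmanager.env.HBASE_CONF_PATH='/etc/hbase/conf' to set the environment variable for the TM.

Best,
Yang

On Mon, Aug 3, 2020 at 10:54 PM shizk233 wrote:

> Environment variables that take effect on the YARN nodes should be set with -yD
>
> On Mon, Aug 3, 2020 at 6:35 PM Zhou Zach wrote:
>
> > Hi all,
> >
> > I set the HBASE_CONF_PATH variable as shown below, but after submitting to YARN I found that HBASE_CONF_PATH did not take effect: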
> >
> >
> > /opt/flink-1.11.1/bin/flink run-application -t yarn-application \
> > -DHBASE_CONF_PATH='/etc/hbase/conf' \
> >
> >
> > How do I set environment variables when submitting a job with Flink?
>