Re: Why do the operator names of FlinkSQL Source and Sink use different formats?

2021-09-29 Thread yidan zhao
My guess is that it's because a source can cover multiple tables, for example when several tables are combined with UNION in a SELECT.

Ada Luna wrote on Wednesday, September 29, 2021, at 6:02 PM:

> Source: TableSourceScan(table=[[default_catalog, default_database,
> ods_k]], fields=[id, name])
> Sink: Sink(table=[default_catalog.default_database.ads_k], fields=[id,
> name])
> Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id,
> name]))
>
>
> Compared with Sink, TableSourceScan has an extra pair of square brackets and uses ',' to separate the parts of the namespace. Why is that?
>
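One way to see where the two shapes could come from, offered as an assumption rather than something confirmed in this thread: if the source's explain output prints the table's qualified name as a Java `List<String>` (Calcite's `RelOptTable.getQualifiedName()`, for example, returns a `List<String>`) inside a `name=[value]` wrapper, while the sink prints an already-joined dotted identifier through the same wrapper, you get exactly the two formats quoted above. The sketch only reproduces the string formatting; the `item`, `sourceStyle` and `sinkStyle` helpers are hypothetical and are not Flink's or Calcite's API.

```java
import java.util.List;

// Hypothetical illustration of the two operator-name shapes; not Flink's actual code.
final class OperatorNameFormats {

    // Assumed explain-style wrapper: renders "name=[value]".
    static String item(String name, Object value) {
        return name + "=[" + value + "]";
    }

    // Source-style: the qualified name stays a List, whose toString() adds
    // its own brackets and separates elements with ", ".
    static String sourceStyle(List<String> qualifiedName) {
        return item("table", qualifiedName);
    }

    // Sink-style: the identifier is first joined into one dotted string.
    static String sinkStyle(List<String> qualifiedName) {
        return item("table", String.join(".", qualifiedName));
    }

    public static void main(String[] args) {
        System.out.println(sourceStyle(List.of("default_catalog", "default_database", "ods_k")));
        // prints: table=[[default_catalog, default_database, ods_k]]
        System.out.println(sinkStyle(List.of("default_catalog", "default_database", "ads_k")));
        // prints: table=[default_catalog.default_database.ads_k]
    }
}
```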


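Flink Forward Asia 2021 Call for Presentations

2021-09-29 Thread 王翔
Dear developers,

Flink Forward Asia 2021 has officially launched! It will be held on December 4-5 at the China National Convention Center in Beijing, and more than 3000 developers are expected to attend and discuss the latest developments in Flink.

The call for presentations is now open! Following FFA tradition, all talks are selected through an open call with no restrictions on industry or company size. Submissions are scored and selected by a professional program committee, and the content is entirely non-commercial and represents the leading edge of the industry.

At Flink Forward Asia 2021, you can share your insights with developers from around the world, meet top experts from many technical fields face to face, and explore future trends in digital technology. If you have experience and insights in a technical area, please submit a proposal!

Everyone is welcome, whatever their background. Flink Forward Asia 2021 is waiting for you!

Proposal submission link: https://survey.aliyun.com/apps/zhiliao/gWtaFxAXG

Original call for presentations: https://mp.weixin.qq.com/s/pbMtJ2jOy_EFlYlCek34qQ

For details, add the "Little Squirrel" (小松鼠) WeChat account:

Flink Chinese learning site: https://flink-learning.org.cn/

For more Flink technical questions, scan the QR code to join the community DingTalk group.
To get the latest technical articles and community news first, follow the official WeChat account.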




Re: Questions about fine-grained resource management in Flink 1.14

2021-09-29 Thread yidan zhao
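See the Task Slots and Resources section of the Flink documentation:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/flink-architecture/#task-slots-and-resources
That section explains that slots provide no CPU isolation, and that for memory they only separate managed memory.
So the fine-grained slots in the new release only allow each slot to be assigned its own amount of CPU and memory; there is still no actual isolation. At best it is a new allocation scheme at the scheduling level.
In other words, a slot still only has meaning at the scheduling layer and carries no meaning at the resource layer.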
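To make the "scheduling level only" point concrete, here is a minimal, hypothetical model (not Flink's ResourceManager code, and all names are made up for illustration): each slot request carries its own CPU and memory amounts, and the TaskManager only checks that the requested totals fit its budget. Nothing in the model stops a running task from using more CPU than its slot declared.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of fine-grained slot allocation as pure bookkeeping.
final class SlotBudget {

    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    private final double totalCpuCores;
    private final int totalMemoryMb;
    private double usedCpuCores;
    private int usedMemoryMb;
    private final List<Profile> slots = new ArrayList<>();

    SlotBudget(double totalCpuCores, int totalMemoryMb) {
        this.totalCpuCores = totalCpuCores;
        this.totalMemoryMb = totalMemoryMb;
    }

    // Reserves a slot only if the remaining budget covers the request.
    // This is an accounting decision; no runtime limit is applied to the task.
    boolean tryAllocate(Profile p) {
        if (usedCpuCores + p.cpuCores > totalCpuCores
                || usedMemoryMb + p.memoryMb > totalMemoryMb) {
            return false;
        }
        usedCpuCores += p.cpuCores;
        usedMemoryMb += p.memoryMb;
        slots.add(p);
        return true;
    }

    int slotCount() {
        return slots.size();
    }

    public static void main(String[] args) {
        SlotBudget tm = new SlotBudget(4.0, 4096);
        System.out.println(tm.tryAllocate(new Profile(1.0, 1024))); // true
        System.out.println(tm.tryAllocate(new Profile(2.0, 2048))); // true
        System.out.println(tm.tryAllocate(new Profile(2.0, 512)));  // false: CPU budget exceeded
        System.out.println(tm.tryAllocate(new Profile(1.0, 1024))); // true: exactly fills the budget
        System.out.println(tm.slotCount());                         // 3
    }
}
```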



yidan zhao wrote on Thursday, September 30, 2021, at 11:14 AM:

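> As the title says, 1.14 now supports fine-grained resource management, but as I understood slots, Flink doesn't actually isolate memory and CPU, does it? Or have I misunderstood?
>
> I used to think of a slot simply as a limit on the number of threads on a single TaskManager. Of course, given the slot sharing
> mechanism, and the fact that a single subtask is not necessarily single-threaded (it may have background threads, for example), this limit is not an absolute number.
> Still, one can say that a TM with n slots can run at most n subtask pipelines. In practice, however, memory and CPU are not partitioned between slots.
>
> Is this understanding correct?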
>


Questions about fine-grained resource management in Flink 1.14

2021-09-29 Thread yidan zhao
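As the title says, 1.14 now supports fine-grained resource management, but as I understood slots, Flink doesn't actually isolate memory and CPU, does it? Or have I misunderstood?

I used to think of a slot simply as a limit on the number of threads on a single TaskManager. Of course, given the slot sharing
mechanism, and the fact that a single subtask is not necessarily single-threaded (it may have background threads, for example), this limit is not an absolute number.
Still, one can say that a TM with n slots can run at most n subtask pipelines. In practice, however, memory and CPU are not partitioned between slots.

Is this understanding correct?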
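The slot-counting intuition above can be written down. The Flink documentation states that, with default slot sharing, a cluster needs as many task slots as the highest parallelism used in the job, because one slot can hold one parallel subtask of every operator in the same slot sharing group. The sketch below extends this, as an assumption, to several slot sharing groups by summing the per-group maximum; the `requiredSlots` helper and the group names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: number of slots needed under slot sharing.
final class SlotCount {

    // Each slot sharing group needs as many slots as its highest operator parallelism;
    // separate groups do not share slots, so their requirements add up.
    static int requiredSlots(Map<String, List<Integer>> parallelismByGroup) {
        int total = 0;
        for (List<Integer> parallelisms : parallelismByGroup.values()) {
            if (!parallelisms.isEmpty()) {
                total += Collections.max(parallelisms);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // One default group: source(2) -> map(4) -> window(4) -> sink(1)
        System.out.println(requiredSlots(Map.of("default", List.of(2, 4, 4, 1)))); // 4
        // Two groups kept apart: max 3 in "a" plus max 5 in "b"
        System.out.println(requiredSlots(Map.of("a", List.of(2, 3), "b", List.of(5)))); // 8
    }
}
```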


Re: flink 1.13.2 avg over an int column returns int

2021-09-29 Thread wukon...@foxmail.com
Hi Asahi,
    For int columns, cast first: cast( xx as Decimal)



wukon...@foxmail.com
 
From: Asahi Lee
Date: 2021-09-27 14:29
To: user-zh
Subject: flink 1.13.2 avg over an int column returns int
hi! In flink 1.13.2, applying avg to an int column returns an int
instead of a double or decimal. Is this a bug?
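The difference the reply points at is ordinary integer-versus-decimal arithmetic. The plain-Java sketch below illustrates the arithmetic only, not Flink's AVG implementation: dividing an integer sum by a count truncates, while doing the same division in BigDecimal keeps the fraction, which is what casting the column with cast( xx as Decimal) before avg is meant to achieve in SQL. The class and method names are illustrative.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Illustrates integer vs. decimal averaging in plain Java (not Flink internals).
final class AvgArithmetic {

    // Integer average: the division truncates toward zero.
    static int intAverage(int[] values) {
        long sum = 0;
        for (int v : values) {
            sum += v;
        }
        return (int) (sum / values.length);
    }

    // Decimal average with an explicit scale and rounding mode.
    static BigDecimal decimalAverage(int[] values, int scale) {
        BigDecimal sum = BigDecimal.ZERO;
        for (int v : values) {
            sum = sum.add(BigDecimal.valueOf(v));
        }
        return sum.divide(BigDecimal.valueOf(values.length), scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        int[] values = {1, 2, 2};
        System.out.println(intAverage(values));        // 1
        System.out.println(decimalAverage(values, 4)); // 1.6667
    }
}
```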


Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Yangze Guo
Thanks, Xintong, Joe, Dawid for the great work, thanks to everyone involved!

Best,
Yangze Guo

On Thu, Sep 30, 2021 at 12:02 AM Rion Williams  wrote:
>
> Great news all! Looking forward to it!
>
> > On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
> >  wrote:
> >
> > 
> > Awesome, thanks for the release.
> >
> > - Original Message -
> > From: "Dawid Wysakowicz" 
> > To: "dev" , "user" , 
> > annou...@apache.org
> > Sent: Wednesday, September 29, 2021 15:59:47
> > Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> >
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.14.0.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> > improvements for this release:
> > https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> >
> > The full release notes are available in Jira:
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Regards,
> > Xintong, Joe, Dawid


Flink application mode with no UI: how to start a job using k8s?

2021-09-29 Thread Dhiru
Hi ,
   My requirement is to create a Flink cluster in application mode on k8s without
exposing the UI. I want to start a long-running job that is instantiated when
Flink boots and keeps running.
I use the resource files jobmanager-application-ha.yaml and
taskmanager-job-deployment.yaml to create the cluster
(https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions)

a) I need to start the job at runtime. I can bundle my jar with the Flink image
so that the jar can be instantiated.
b) Can I apply an HPA (Horizontal Pod Autoscaler) to the task managers? Will this
work, so that the number of taskmanager instances goes up and down with the workload?
--kumar




Re: Flink run different jars

2021-09-29 Thread Qihua Yang
Thanks a lot Yangze. That is very helpful!

On Tue, Sep 28, 2021 at 11:11 PM Yangze Guo  wrote:

> You need to edit conf/workers. See the example config [1] and the
> process [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang  wrote:
> >
> > Hi Yangze,
> >
> > Thanks a lot for your reply. The references are very helpful!
> > Another quick question. Reference 1 can start a standalone cluster
> (session mode). That cluster has a JobManager, and I can submit jobs to it. What
> about TaskManagers? Do I need to start multiple TaskManagers manually?
> > Is there a complete example that shows the process?
> >
> > Thanks,
> > Qihua
> >
> >
> > On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:
> >>
> >> Hi, Qihua
> >>
> >> IIUC, what you want might be a standalone cluster[1] or session
> cluster[2][3].
> >>
> >> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> >> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> >> [3]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
> >> >
> >> > Hi,
> >> >
> >> > Is it possible to run a Flink app without a job? What I am trying
> to do is build multiple jars and switch between them to run different jobs.
> >> > I am not sure if Flink supports this mode. I saw that the REST API can upload
> a jar, cancel a job, and run a jar.
> >> > Right now I can upload a jar to Flink. But when I cancel a job, Flink
> restarts automatically. I checked the log, which shows the entries below. Can anyone
> help me out?
> >> >
> >> > Caused by:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: CANCELED
> >> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
> >> > ... 41 common frames omitted
> >> > Caused by: org.apache.flink.runtime.client.JobCancellationException:
> Job was cancelled.
> >> > at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
> >> > at
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
> >> > ... 41 common frames omitted
> >> >
> >> > Thanks!
>
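Regarding the pointer to conf/workers: per the Flink standalone documentation, bin/start-cluster.sh starts a JobManager locally and connects via SSH to every worker host listed in conf/workers (one host per line) to start a TaskManager there, so TaskManagers do not have to be started one by one. The sketch below is a hypothetical reader for that format; treating `#` as the start of a comment, skipping blank lines, and keeping duplicate entries (one TaskManager each) are assumptions modeled on how Flink's shell scripts appear to read the file, not a reimplementation of them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for a conf/workers-style file: one host per line.
final class WorkersFile {

    static List<String> parseHosts(String content) {
        List<String> hosts = new ArrayList<>();
        for (String line : content.split("\\R")) {
            // Assumption: text after '#' is a comment.
            int hash = line.indexOf('#');
            String host = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!host.isEmpty()) {
                // Duplicates are kept; assumed to mean one TaskManager per entry.
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        String content = "localhost\nlocalhost\n# second machine\n\nworker-2  # trailing note\n";
        System.out.println(parseHosts(content)); // [localhost, localhost, worker-2]
    }
}
```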


Start Flink cluster, k8s pod behavior

2021-09-29 Thread Qihua Yang
Hi,
I deployed Flink in session mode and didn't run any jobs. I saw the logs below,
which are normal and match what the Flink manual shows.

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check with kubectl, the status is Completed. After a while, the
status changes to CrashLoopBackOff and the pod restarts.
NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed          5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why?
Why does Kubernetes regard this pod as completed and restart it? Do I need to configure
something, either on the Flink side or the Kubernetes side? According to the Flink manual,
after the cluster is started, I can upload a jar to run the application.

Thanks,
Qihua


RE: FlinkJobNotFoundException

2021-09-29 Thread Gusick, Doug S
Hi Matthias,

Thank you for getting back to us. We have been looking into upgrading to a newer 
version, but have not completed full testing just yet.

I was unable to find a previous error in the JM logs. You should have received 
an email with details to a “lockbox”. I have uploaded the job manager logs 
there. Please let me know if you need any more information.

Thank you,
Doug

From: Matthias Pohl 
Sent: Wednesday, September 29, 2021 12:00 PM
To: Gusick, Doug S [Engineering] 
Cc: user@flink.apache.org; Erai, Rahul [Engineering] 

Subject: Re: FlinkJobNotFoundException

Hi Doug,
thanks for reaching out to the community. First of all, 1.9.2 is quite an old 
Flink version. You might want to consider upgrading to a newer version. The 
community only offers support for the two most recent Flink versions. Newer 
versions might include fixes for your issue.

But back to your actual problem: The logs you're providing only show that some 
job switched into FINISHED state. Is there some error showing up earlier in the 
logs which you might have missed? It would be helpful if you could share the 
complete JobManager logs to get a better understanding of what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S 
mailto:doug.gus...@gs.com>> wrote:
Hello,

We are facing an issue with some of our applications that are submitting a high 
volume of jobs to Flink (we are using v1.9.2). We are observing that numerous 
jobs (in this case 44 out of 350+) fail with the same FlinkJobNotFoundException 
within a 45-second timeframe.

From our client logs, this is the exception we can see:


Calc Engine: Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)]

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)

Calc Engine:   at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

Calc Engine:   ... 3 more


This is the first job to fail with the above exception. From the JobManager 
logs, we can see that the job goes to FINISHED state, and then we see the 
following exception:

2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c) switched 
from state RUNNING to FINISHED.
2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT 
2021(d0991f0ae712a9df710aa03311a32c8c).
2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39] 
org.apache.flink.yarn.YarnResourceManager - Disconnect job 
manager 
0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
 for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Exception occurred in REST handler: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)
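One detail visible in these JobManager logs: the REST handler's FlinkJobNotFoundException is logged about 1.3 seconds after the Dispatcher reports the job as globally terminal. The sketch below only computes that gap from the two timestamps, using the `yyyy-MM-dd HH:mm:ss,SSS` layout the logs print; it does not explain the cause, and any link between the gap and the missing job result is a hypothesis to check against the complete JobManager logs. The `LogGap` class is illustrative.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Computes the gap between two log timestamps in the layout used by the logs above.
final class LogGap {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    static long millisBetween(String earlier, String later) {
        LocalDateTime start = LocalDateTime.parse(earlier, FORMAT);
        LocalDateTime end = LocalDateTime.parse(later, FORMAT);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Dispatcher: "reached globally terminal state FINISHED"
        String terminal = "2021-09-28 04:54:16,937";
        // JobExecutionResultHandler: FlinkJobNotFoundException
        String notFound = "2021-09-28 04:54:18,256";
        System.out.println(millisBetween(terminal, notFound) + " ms"); // 1319 ms
    }
}
```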


Here are the relevant logs from the TaskManager. We can see that the 
JobLeaderService tries to reconnect to the job. Any ideas as to why it is 
trying to reconnect?

2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request b26c04706fd5aad03dfdca8691f1bf1c for job 
d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
.

2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
registration at job manager 
akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
...and if possible, it would be helpful to provide debug logs as well.

On Wed, Sep 29, 2021 at 6:33 PM Matthias Pohl 
wrote:

> Could you provide the entire JobManager logs so that we can see what's going
> on?
>
> On Wed, Sep 29, 2021 at 12:42 PM Javier Vegas  wrote:
>
>> Thanks again, Matthias!
>>
>> Putting -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
>> as params for appmaster.sh,
>> I see in the log that they resolve to the correct values
>>
>> -Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009
>>
>> but a bit later the appmaster dies with this new error. It is unclear
>> what address it is trying to bind. I added explicit params
>> -Drest.bind-port=8081 and
>>   -Drest.port=8081 in case jobmanager.rpc.port was somehow
>> interfering, but that didn't help.
>>
>> 2021-09-29 10:29:59.845 [main] INFO  
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting 
>> MesosSessionClusterEntrypoint down with application status FAILED. 
>> Diagnostics java.net.BindException: Cannot assign requested address
>>  at java.base/sun.nio.ch.Net.bind0(Native Method)
>>  at java.base/sun.nio.ch.Net.bind(Unknown Source)
>>  at java.base/sun.nio.ch.Net.bind(Unknown Source)
>>  at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>>  at java.base/java.lang.Thread.run(Unknown Source)
>>
>> On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
>> wrote:
>>
>>> The port has its separate configuration parameter jobmanager.rpc.port [1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>>>
>>> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>>>
 Matthias, thanks for the suggestion! I changed my
 jobmanager.rpc.address param from $HOSTNAME to $HOST:$PORT0 which in the
 log I see resolves properly to the host IP and port mapped to 8081

 2021-09-29 07:58:05.452 [main] INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
 -Djobmanager.rpc.address=10.0.22.114:31894

 which is very promising. But sadly, a little later the appmaster dies
 with this error:

 2021-09-29 07:58:05.648 [main] INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
 cluster services.
 2021-09-29 07:58:05.674 [main] INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
 MesosSessionClusterEntrypoint down with application status FAILED.
 Diagnostics org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
 at
 org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
 at
 org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
 at
 org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
 at
 org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
 at
 org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
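The "configured hostname is not valid" failure above appeared after `$HOST:$PORT0` was put into `jobmanager.rpc.address`, and the reply in this thread points out that the port has its own parameter, `jobmanager.rpc.port`. A small hypothetical helper (not part of Flink) that turns such a combined value into the two separate `-D` options might look like this; it assumes an IPv4 address or hostname, since a bare IPv6 address contains colons.

```java
import java.util.List;

// Hypothetical helper: split "host:port" into separate Flink -D options.
final class RpcAddressFlags {

    static List<String> toFlags(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + hostAndPort);
        }
        String host = hostAndPort.substring(0, colon);
        // NumberFormatException (an IllegalArgumentException) if the port is not numeric.
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // The address carries only the host; the port goes into its own option.
        return List.of(
                "-Djobmanager.rpc.address=" + host,
                "-Djobmanager.rpc.port=" + port);
    }

    public static void main(String[] args) {
        System.out.println(toFlags("10.0.22.114:31894"));
        // [-Djobmanager.rpc.address=10.0.22.114, -Djobmanager.rpc.port=31894]
    }
}
```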

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
Could you provide the entire JobManager logs so that we can see what's going
on?

On Wed, Sep 29, 2021 at 12:42 PM Javier Vegas  wrote:

> Thanks again, Matthias!
>
> Putting -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
> as params for appmaster.sh,
> I see in the log that they resolve to the correct values
>
> -Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009
>
> but a bit later the appmaster dies with this new error. It is unclear what
> address it is trying to bind. I added explicit params
> -Drest.bind-port=8081 and
>   -Drest.port=8081 in case jobmanager.rpc.port was somehow
> interfering, but that didn't help.
>
> 2021-09-29 10:29:59.845 [main] INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting 
> MesosSessionClusterEntrypoint down with application status FAILED. 
> Diagnostics java.net.BindException: Cannot assign requested address
>   at java.base/sun.nio.ch.Net.bind0(Native Method)
>   at java.base/sun.nio.ch.Net.bind(Unknown Source)
>   at java.base/sun.nio.ch.Net.bind(Unknown Source)
>   at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
>   at 
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>   at java.base/java.lang.Thread.run(Unknown Source)
>
> On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
> wrote:
>
>> The port has its separate configuration parameter jobmanager.rpc.port [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>>
>> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>>
>>> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
>>> param from $HOSTNAME to $HOST:$PORT0 which in the log I see resolves
>>> properly to the host IP and port mapped to 8081
>>>
>>> 2021-09-29 07:58:05.452 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
>>> -Djobmanager.rpc.address=10.0.22.114:31894
>>>
>>> which is very promising. But sadly, a little later the appmaster dies
>>> with this error:
>>>
>>> 2021-09-29 07:58:05.648 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
>>> cluster services.
>>> 2021-09-29 07:58:05.674 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
>>> MesosSessionClusterEntrypoint down with application status FAILED.
>>> Diagnostics org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
>>> at
>>> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
>>> at
>>> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
>>> at
>>> 
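On the later `java.net.BindException: Cannot assign requested address`: on Linux this message typically means the process tried to listen on an IP address that is not assigned to any of its network interfaces. Whether that is what happens here (for example, `$HOST` resolving to an agent IP that is not visible inside the container) is an assumption the thread does not confirm. The plain-Java sketch below reproduces the exception type by binding to 192.0.2.1, a documentation-only address (RFC 5737) that is normally not configured locally; the `BindCheck` class is illustrative.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Shows that listening on an IP not assigned to a local interface fails with BindException.
final class BindCheck {

    // Returns null if the bind succeeds, otherwise the BindException message.
    static String tryBind(String ip) throws IOException {
        try (ServerSocket socket = new ServerSocket()) {
            // Port 0 lets the OS pick any free port; only the address matters here.
            socket.bind(new InetSocketAddress(InetAddress.getByName(ip), 0));
            return null;
        } catch (BindException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("127.0.0.1 -> " + tryBind("127.0.0.1")); // null: loopback is local
        System.out.println("192.0.2.1 -> " + tryBind("192.0.2.1")); // BindException message
    }
}
```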

Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Rion Williams
Great news all! Looking forward to it!

> On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
>  wrote:
> 
> 
> Awesome, thanks for the release.
> 
> - Original Message -
> From: "Dawid Wysakowicz" 
> To: "dev" , "user" , 
> annou...@apache.org
> Sent: Wednesday, September 29, 2021 15:59:47
> Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> 
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.14.0.
>  
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
> Please check out the release blog post for an overview of the
> improvements for this release:
> https://flink.apache.org/news/2021/09/29/release-1.14.0.html
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
>  
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>  
> Regards,
> Xintong, Joe, Dawid


Re: FlinkJobNotFoundException

2021-09-29 Thread Matthias Pohl
Hi Doug,
thanks for reaching out to the community. First of all, 1.9.2 is quite an
old Flink version. You might want to consider upgrading to a newer version.
The community only offers support for the two most recent Flink versions.
Newer versions might include fixes for your issue.

But back to your actual problem: The logs you're providing only show that
some job switched into FINISHED state. Is there some error showing up
earlier in the logs which you might have missed? It would be helpful if you
could share the complete JobManager logs to get a better understanding of
what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S  wrote:

> Hello,
>
>
>
> We are facing an issue with some of our applications that are submitting a
> high volume of jobs to Flink (we are using v1.9.2). We are observing that
> numerous jobs (in this case 44 out of 350+) fail with the same
> FlinkJobNotFoundException within a 45-second timeframe.
>
>
>
> From our client logs, this is the exception we can see:
>
>
>
> Calc Engine: Caused by: 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (d0991f0ae712a9df710aa03311a32c8c)]
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> Calc Engine:   ... 3 more
>
>
>
>
>
> This is the first job to fail with the above exception. From the
> JobManager logs, we can see that the job goes to FINISHED state, and then
> we see the following exception:
>
>
>
> 2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink
> Java Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c)
> switched from state RUNNING to FINISHED.
>
> 2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
> d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
>
> 2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping
> the JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT
> 2021(d0991f0ae712a9df710aa03311a32c8c).
>
> 2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39]
> org.apache.flink.yarn.YarnResourceManager - Disconnect
> job manager
> 0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
> for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
>
> 2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91]
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  -
> Exception occurred in REST handler:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job (d0991f0ae712a9df710aa03311a32c8c)
>
>
>
> Here are the relevant logs from the TaskManager. We can see that the 
> JobLeaderService tries to reconnect to the job. Any ideas as to why it is 
> trying to reconnect?
>
>
> 2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
> request b26c04706fd5aad03dfdca8691f1bf1c for job 
> d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
> .
>
> 2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
> d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
> registration at job manager 
> akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
> d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Establish 
> JobManager connection for job d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Offer 
> reserved slots to the leader of job d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,405 INFO  [CHAIN DataSource (settl_delivery_type_code | 
> DistCp | Sourcing Files) -> FlatMap (settl_delivery_type_code | DistCp | 

Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Theo Diefenthal


Awesome, thanks for the release.

- Original Message -
From: "Dawid Wysakowicz" 
To: "dev" , "user" , 
annou...@apache.org
Sent: Wednesday, September 29, 2021 15:59:47
Subject: [ANNOUNCE] Apache Flink 1.14.0 released

The Apache Flink community is very happy to announce the release of
Apache Flink 1.14.0.
 
Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.
 
The release is available for download at:
https://flink.apache.org/downloads.html
 
Please check out the release blog post for an overview of the
improvements for this release:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
 
The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
 
We would like to thank all contributors of the Apache Flink community
who made this release possible!
 
Regards,
Xintong, Joe, Dawid


[ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.14.0.
 
Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.
 
The release is available for download at:
https://flink.apache.org/downloads.html
 
Please check out the release blog post for an overview of the
improvements for this release:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
 
The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
 
We would like to thank all contributors of the Apache Flink community
who made this release possible!
 
Regards,
Xintong, Joe, Dawid






FlinkJobNotFoundException

2021-09-29 Thread Gusick, Doug S
Hello,

We are facing an issue with some of our applications that are submitting a high 
volume of jobs to Flink (we are using v1.9.2). We are observing that numerous 
jobs (in this case 44 out of 350+) fail with the same FlinkJobNotFoundException 
within a 45-second timeframe.

From our client logs, this is the exception we can see:


Calc Engine: Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (d0991f0ae712a9df710aa03311a32c8c)]
Calc Engine:   at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
Calc Engine:   at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
Calc Engine:   at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
Calc Engine:   at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
Calc Engine:   at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
Calc Engine:   ... 3 more


This is the first job to fail with the above exception. In the JobManager 
logs, we can see that the job goes to the FINISHED state, and then we see the 
following exception:

2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c) switched 
from state RUNNING to FINISHED.
2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT 
2021(d0991f0ae712a9df710aa03311a32c8c).
2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39] 
org.apache.flink.yarn.YarnResourceManager - Disconnect job 
manager 
0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
 for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Exception occurred in REST handler: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)


Here are the relevant logs from the TaskManager. We can see that the 
JobLeaderService tries to reconnect to the job. Any ideas as to why it is 
trying to reconnect?

2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request b26c04706fd5aad03dfdca8691f1bf1c for job 
d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
.

2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
registration at job manager 
akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Establish 
JobManager connection for job d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Offer reserved 
slots to the leader of job d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,405 INFO  [CHAIN DataSource (settl_delivery_type_code | 
DistCp | Sourcing Files) -> FlatMap (settl_delivery_type_code | DistCp | 
Copying Batch Data) (1/1)] org.apache.flink.runtime.blob.BlobClient 
 - Downloading 
d0991f0ae712a9df710aa03311a32c8c/p-54dfdc41d9a995e5b75eb9eb29bcac91725fc425-be2bc5df9ef6ce331e8019cf32eb222b
 from d43723-714.dc.gs.com/10.175.239.171:38726

2021-09-28 04:54:16,942 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
networkMemoryInMB=2147483647, managedMemoryInMB=9318}, allocationId: 
b26c04706fd5aad03dfdca8691f1bf1c, jobId: 

RE: Event is taking a lot of time between the operators

2021-09-29 Thread Sanket Agrawal
Thank you @Piotr Nowojski for helping me.

From: Piotr Nowojski 
Sent: Wednesday, September 29, 2021 12:53 PM
To: Sanket Agrawal 
Cc: Ragini Manjaiah ; user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


[**EXTERNAL EMAIL**]
Hi Sanket,

As I mentioned in the previous email, it's most likely still an issue of 
backpressure, and you can check it as I described in that message. Your 
records are stuck either in the network buffers between the two async operations 
(I) (if there is a network exchange), and/or inside the `AsyncWaitOperator`'s 
internal queue (II). If this is causing you problems:

I. For the former problem (network buffers) you can:
a) get rid of the network exchange, via removing keyBy/shuffle/rebalance (might 
not be feasible, depending on your business logic)
b) reduce the amount of in-flight data. In Flink 1.14 we are adding an 
automatic buffer debloating mechanism. You cannot use it in Flink 1.8, but you 
could manually tweak both the number and the size of the buffers. You can read 
about it here [1]; just ignore the automatic buffer debloating mechanism.
II. You can change the size of the internal queue by adjusting the `capacity` 
parameter [2]

The more buffered in-flight data you have between operators, the longer the 
delay between processing the same record by two different operators.

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api



On Wed, 29 Sep 2021 at 08:20, Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi Ragini,

To measure the time spent in an async operator, we have put a logger as the first 
and the last statement in asyncInvoke. To measure the time between the asyncs, we 
simply subtract message 1's end time from message 2's start time. Also, we 
are using a parallelism of 1.
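A minimal JDK-only sketch of timing the asynchronous work itself, assuming the external call runs inside `supplyAsync` as described further down this thread. If the last log statement sits after the `supplyAsync(...)` call rather than inside the future, it measures only how long scheduling took, because `asyncInvoke` returns before the call finishes. `AsyncTiming` and `timed` are hypothetical names; in a Flink job the executor and the logging callback would be your own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class AsyncTiming {
    /**
     * Runs a (possibly blocking) call on the given executor and reports its
     * wall-clock duration when the future completes, not when this method returns.
     */
    static <T> CompletableFuture<T> timed(Supplier<T> call, Executor executor, LongConsumer onElapsedMillis) {
        final long start = System.nanoTime();
        return CompletableFuture.supplyAsync(call, executor)
                .whenComplete((result, error) ->
                        onElapsedMillis.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)));
    }
}
```

The returned future completes only after the timing callback has run, so a caller that chains on it (or joins it in a test) always sees the measurement first.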

Please let me know if you need any other information or if you have any 
recommendations on improving the approach.

Thanks,
Sanket Agrawal

From: Ragini Manjaiah <ragini.manja...@gmail.com>
Sent: Wednesday, September 29, 2021 11:17 AM
To: Sanket Agrawal <sanket.agra...@infosys.com>
Cc: Piotr Nowojski <pnowoj...@apache.org>; user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


[**EXTERNAL EMAIL**]
Hi Sanket,
 I have a similar use case. How are you measuring the time for the `Async1` function 
to return the result, and for the external API call?

On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi @Piotr Nowojski,

Thank you for replying. Yes, the first async is taking 1300-1500 
milliseconds, but it is called via CompletableFuture.supplyAsync and the 
async capacity is set to 1000.

Async code structure: inside asyncInvoke we call 
CompletableFuture.supplyAsync, and inside supplyAsync we call an external 
API, which takes around 1005 ms to 1040 ms. The rest of the code for request 
creation and response validation is also inside supplyAsync and takes 
around 250 ms.

This way, we intended the main async thread (as the async operator does not use 
multiple threads directly) to be available for the next message as soon as it 
calls CompletableFuture.supplyAsync on the current message.
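One thing worth checking, sketched below under assumptions: `CompletableFuture.supplyAsync(supplier)` without an explicit executor runs on `ForkJoinPool.commonPool()`, whose parallelism is typically one less than the number of available cores. With a blocking call of roughly one second per record, that pool, rather than the async capacity of 1000, may cap how many calls actually run at once. The thread does not say which executor is used, so this may not apply. The sketch passes a dedicated pool instead; `DedicatedPoolAsync`, the pool size, and the `Consumer` callbacks (standing in for Flink's `ResultFuture`) are all illustrative.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Function;

public class DedicatedPoolAsync {
    private final ExecutorService ioPool;
    private final Function<String, String> blockingApiCall; // hypothetical client for the external API

    DedicatedPoolAsync(int ioThreads, Function<String, String> blockingApiCall) {
        // Sized for blocking I/O, independent of the number of CPU cores.
        this.ioPool = Executors.newFixedThreadPool(ioThreads);
        this.blockingApiCall = blockingApiCall;
    }

    /** Same shape as asyncInvoke(input, resultFuture): returns at once, completes later. */
    void asyncInvoke(String input, Consumer<Collection<String>> onResult, Consumer<Throwable> onError) {
        CompletableFuture
                .supplyAsync(() -> blockingApiCall.apply(input), ioPool) // explicit executor
                .whenComplete((result, error) -> {
                    if (error != null) {
                        onError.accept(error);
                    } else {
                        onResult.accept(Collections.singletonList(result));
                    }
                });
    }

    /** Target parallelism of the pool used by supplyAsync(...) calls that pass no executor. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    void close() {
        ioPool.shutdown();
    }
}
```

In a real `RichAsyncFunction`, the pool would usually be created in `open()` and shut down in `close()`.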

Thanks,
Sanket Agrawal

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Tuesday, September 28, 2021 8:02 PM
To: Sanket Agrawal <sanket.agra...@infosys.com>
Cc: user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


[**EXTERNAL EMAIL**]
Hi,

With Flink 1.8.0 I'm not sure how reliable the backpressure status in the 
WebUI is when it comes to async operators. If I remember correctly, until 
around Flink 1.10 (+/- 2 versions) backpressure monitoring was checking for 
thread dumps stuck in requesting Flink's network memory buffers. If in your 

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Javier Vegas
Thanks again, Matthias!

Putting -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
as params for appmaster.sh,
I see in the log that they seem to resolve to the correct values:

-Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009

but a bit later the appmaster dies with this new error. It is unclear what
address it is trying to bind to. I added the explicit params
-Drest.bind-port=8081 and
-Drest.port=8081 in case jobmanager.rpc.port was somehow interfering,
but that didn't help.

2021-09-29 10:29:59.845 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
MesosSessionClusterEntrypoint down with application status FAILED.
Diagnostics java.net.BindException: Cannot assign requested address
at java.base/sun.nio.ch.Net.bind0(Native Method)
at java.base/sun.nio.ch.Net.bind(Unknown Source)
at java.base/sun.nio.ch.Net.bind(Unknown Source)
at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)


.
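`Cannot assign requested address` at bind time typically means the process asked to bind to an IP address that no network interface in its own network namespace owns, for example a host IP used from inside a bridge-networked container. The thread does not describe the container networking, so this is only one possible cause. A quick JDK-only check of whether an address is local is sketched below; `LocalAddressCheck` and `isBindable` are hypothetical names, and port availability is not checked.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;

public class LocalAddressCheck {
    /** True if the address is assigned to a local interface (or is the wildcard address). */
    static boolean isBindable(String ip) throws UnknownHostException, SocketException {
        InetAddress addr = InetAddress.getByName(ip);
        if (addr.isAnyLocalAddress()) {
            return true; // 0.0.0.0 or :: means "all local interfaces"
        }
        // Null when no local interface owns the address; a bind to it would then
        // fail with "Cannot assign requested address".
        return NetworkInterface.getByInetAddress(addr) != null;
    }

    public static void main(String[] args) throws Exception {
        // Pass the address Flink is configured to bind, e.g. the value of $HOST seen inside the container.
        for (String ip : args) {
            System.out.println(ip + " bindable: " + isBindable(ip));
        }
    }
}
```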


On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
wrote:

> The port has its separate configuration parameter jobmanager.rpc.port [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>
> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>
>> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
>> param from $HOSTNAME to $HOST:$PORT0 which in the log I see resolves
>> properly to the host IP and port mapped to 8081
>>
>> 2021-09-29 07:58:05.452 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
>> -Djobmanager.rpc.address=10.0.22.114:31894
>>
>> which is very promising. But sadly a little bit later appmaster dies with
>> this error:
>>
>> 2021-09-29 07:58:05.648 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
>> cluster services.
>> 2021-09-29 07:58:05.674 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
>> MesosSessionClusterEntrypoint down with application status FAILED.
>> Diagnostics org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
>> at
>> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
>> at
>> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
>> at
>> 

Why are the operator names of FlinkSQL Source and Sink formatted differently?

2021-09-29 Thread Ada Luna
Source: TableSourceScan(table=[[default_catalog, default_database,
ods_k]], fields=[id, name])
Sink: Sink(table=[default_catalog.default_database.ads_k], fields=[id, name])
Sink: Sink(table=[default_catalog.default_database.ads_k2], fields=[id, name]))


Compared with Sink, TableSourceScan has an extra pair of square brackets and uses ',' to separate the parts of the namespace. Why is that?
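One plausible explanation, not confirmed in this thread: the planner renders each operator attribute in Calcite's `name=[value]` explain style. For the scan, the table is a qualified name held as a list of path segments, and `List.toString()` already adds its own brackets and ", " separators; for the sink, the table appears as a single dot-joined identifier string. The sketch below only reproduces the two string formats with plain JDK types; it does not call Flink's planner, and `OperatorNameFormats`/`term` are illustrative names.

```java
import java.util.Arrays;
import java.util.List;

public class OperatorNameFormats {
    /** Renders one explain attribute in the name=[value] style. */
    static String term(String name, Object value) {
        return name + "=[" + value + "]";
    }

    public static void main(String[] args) {
        List<String> path = Arrays.asList("default_catalog", "default_database", "ods_k");
        // Source: the table is a list of path segments; List.toString() adds "[...]" and ", ".
        System.out.println("TableSourceScan(" + term("table", path) + ")");
        // Sink: the table is one dot-joined identifier string.
        System.out.println("Sink(" + term("table", String.join(".", path)) + ")");
    }
}
```

Running it prints `TableSourceScan(table=[[default_catalog, default_database, ods_k]])` and `Sink(table=[default_catalog.default_database.ods_k])`, which matches the shape of the two names above.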


Re: flink rest endpoint creation failure

2021-09-29 Thread Matthias Pohl
Hi Curt,
could you elaborate a bit more on your setup? Maybe, provide commands you
used to deploy the jobs and the Flink/YARN logs. What's puzzling me is your
statement about "two JobManagers spinning up" and "everything's working
fine if two TaskManagers are running on different instances".
- When talking about Flink applications, you're talking about application
mode?
- I have the feeling you're mixing up JobManager and TaskManager in your
initial description. Could you clarify this?
- Actually, each of the Flink components (JobManager and TaskManager)
should run in its own YARN container. The way you describe it, it sounds
like Flink runs within one container?

Best,
Matthias



On Thu, Sep 23, 2021 at 5:14 PM Curt Buechter  wrote:

> Thanks Robert,
> But, no, the rest.bind-port is not set to 35485 in the configuration.
> Other jobs use different ports, so it is getting set dynamically.
>
>
> #==
> # Rest & web frontend
>
> #==
>
> # The port to which the REST client connects to. If rest.bind-port has
> # not been specified, then the server will bind to this port as well.
> #
> #rest.port: 8081
>
> # The address to which the REST client will connect to
> #
> #rest.address: 0.0.0.0
>
> # Port range for the REST and web server to bind to.
> #
> #rest.bind-port: 8080-8090
>
> # The address that the REST & web server binds to
> #
> #rest.bind-address: 0.0.0.0
>
> # Flag to specify whether job submission is enabled from the web-based
> # runtime monitor. Uncomment to disable.
>
> #web.submit.enable: false
>
>
>
> On Wed, Sep 22, 2021 at 11:46 AM Curt Buechter 
> wrote:
>
>> Hi,
>> I'm getting an error that happens randomly when starting a flink
>> application.
>>
>> For context, this is running in YARN on AWS. This application is one that
>> converts from the Table API to the Stream API, so two flink
>> applications/jobmanagers are trying to start up. I think what happens is
>> that the rest api port is chosen, and is the same for both of the flink
>> apps. If YARN chooses two different instances for the two task managers,
>> they each work fine and start their rest api on the same port on their own
>> respective machine. But, if YARN chooses the same instance for both job
>> managers, they both try to start up the rest api on the same port on the
>> same machine, and I get the error.
>>
>> Here is the error:
>>
>> 2021-09-22 15:47:27,724 ERROR 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
>> start cluster entrypoint YarnJobClusterEntrypoint.
>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
>>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
>>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>> Caused by: org.apache.flink.util.FlinkException: Could not create the 
>> DispatcherResourceManagerComponent.
>>  at 
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at java.security.AccessController.doPrivileged(Native Method) 
>> ~[?:1.8.0_282]
>>  at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>>  at 
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  ... 2 more
>> Caused by: java.net.BindException: Could not start rest endpoint on any port 
>> in port range 35485
>>  at 
>> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
>>  
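The commented-out `rest.bind-port: 8080-8090` line in the config above shows that the REST bind port accepts a range, and the error reports a "port range" consisting of the single port 35485. A minimal JDK-only sketch of the range idea follows: try each candidate port and keep the first one that binds. With more than one candidate, two JobManagers landing on the same host could both start. This is an illustration, not Flink's `RestServerEndpoint` code; `PortRangeBinder` is a hypothetical name, and the parser handles only `lo-hi` ranges and comma-separated entries.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

public class PortRangeBinder {
    /** Parses specs such as "8080-8090" or "50100,50105" (each entry a port or a lo-hi range). */
    static List<Integer> parsePorts(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int lo = Integer.parseInt(p.substring(0, dash).trim());
                int hi = Integer.parseInt(p.substring(dash + 1).trim());
                for (int port = lo; port <= hi; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    /** Returns a socket bound to the first free candidate port, or null if every candidate is taken. */
    static ServerSocket bindFirstFree(List<Integer> candidates) {
        for (int port : candidates) {
            try {
                return new ServerSocket(port); // caller is responsible for closing it
            } catch (IOException e) {
                // Port in use or not permitted; try the next candidate.
            }
        }
        return null;
    }
}
```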

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
The port has its separate configuration parameter jobmanager.rpc.port [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
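Applied to the attempt quoted below, this means passing the host and the port as two options instead of `jobmanager.rpc.address=10.0.22.114:31894`. A small sketch that splits a `host:port` string (for example the expansion of `$HOST:$PORT0`) into the two dynamic properties; `RpcAddressArgs` is a hypothetical helper, and IPv6 literals are not handled.

```java
import java.util.Arrays;
import java.util.List;

public class RpcAddressArgs {
    /** Splits "host:port" into separate jobmanager.rpc.address and jobmanager.rpc.port properties. */
    static List<String> toDynamicProperties(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + hostAndPort);
        }
        String host = hostAndPort.substring(0, colon);
        int port = Integer.parseInt(hostAndPort.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return Arrays.asList(
                "-Djobmanager.rpc.address=" + host,
                "-Djobmanager.rpc.port=" + port);
    }

    public static void main(String[] args) {
        // Example value taken from the log in this thread.
        System.out.println(String.join(" ", toDynamicProperties("10.0.22.114:31894")));
    }
}
```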

On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:

> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
> param from $HOSTNAME to $HOST:$PORT0 which in the log I see resolves
> properly to the host IP and port mapped to 8081
>
> 2021-09-29 07:58:05.452 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
> -Djobmanager.rpc.address=10.0.22.114:31894
>
> which is very promising. But sadly a little bit later appmaster dies with
> this error:
>
> 2021-09-29 07:58:05.648 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
> cluster services.
> 2021-09-29 07:58:05.674 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
> MesosSessionClusterEntrypoint down with application status FAILED.
> Diagnostics org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
> at
> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
> at java.base/java.security.AccessController.doPrivileged(Native Method)
> at java.base/javax.security.auth.Subject.doAs(Unknown Source)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:177)
> ... 17 more
> .
> 2021-09-29 07:58:05.685 [main] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Could not start
> cluster entrypoint MesosSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint MesosSessionClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The configured hostname is not valid
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
> at
> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
> at
> 

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Javier Vegas
Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
param from $HOSTNAME to $HOST:$PORT0 which in the log I see resolves
properly to the host IP and port mapped to 8081

2021-09-29 07:58:05.452 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
-Djobmanager.rpc.address=10.0.22.114:31894

which is very promising. But sadly a little bit later appmaster dies with
this error:

2021-09-29 07:58:05.648 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
cluster services.
2021-09-29 07:58:05.674 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
MesosSessionClusterEntrypoint down with application status FAILED.
Diagnostics org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
at
org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
at
org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
Caused by: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at
org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:177)
... 17 more
.
2021-09-29 07:58:05.685 [main] ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Could not start
cluster entrypoint MesosSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint MesosSessionClusterEntrypoint.
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
Caused by: org.apache.flink.configuration.IllegalConfigurationException:
The configured hostname is not valid
at
org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
at
org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at

Re: Event is taking a lot of time between the operators

2021-09-29 Thread Piotr Nowojski
Hi Sanket,

As I mentioned in the previous email, it's most likely still an issue of
backpressure, and you can check it as I described in that message. Your
records are stuck either in the network buffers between the two async
operations (I) (if there is a network exchange), and/or inside the
`AsyncWaitOperator`'s internal queue (II). If this is causing you problems:

I. For the former problem (network buffers) you can:
a) get rid of the network exchange, via removing keyBy/shuffle/rebalance
(might not be feasible, depending on your business logic)
b) reduce the amount of in-flight data. In Flink 1.14 we are adding an
automatic buffer debloating mechanism. You cannot use it in Flink 1.8, but
you could manually tweak both the number and the size of the buffers. You
can read about it here [1]; just ignore the automatic buffer debloating
mechanism.
II. You can change the size of the internal queue by adjusting the
`capacity` parameter [2]

The more buffered in-flight data you have between operators, the longer the
delay between processing the same record by two different operators.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
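A back-of-the-envelope sketch of the last point, using Little's law (average wait = items queued ahead / throughput). The figures in `main` come from this thread (capacity 1000, roughly 1300 ms per record) except the 500 records/s drain rate, which is a hypothetical value. `InFlightDelay` is an illustrative name, not a Flink API.

```java
public class InFlightDelay {
    /** Little's law: average wait = records queued ahead / records drained per second. */
    static double queueingDelayMillis(long bufferedRecords, double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return bufferedRecords / recordsPerSecond * 1000.0;
    }

    /**
     * Upper bound on async-operator throughput: at most `capacity` requests in flight,
     * each taking avgLatencyMillis. Only reachable if the executor really runs
     * `capacity` requests concurrently.
     */
    static double maxAsyncThroughput(int capacity, double avgLatencyMillis) {
        return capacity / (avgLatencyMillis / 1000.0);
    }

    public static void main(String[] args) {
        // Capacity 1000 and ~1300 ms per record, as reported in this thread.
        System.out.printf("max throughput ~ %.0f records/s%n", maxAsyncThroughput(1000, 1300));
        // A full queue of 1000 records drained at a hypothetical 500 records/s waits about 2 s.
        System.out.printf("delay ~ %.0f ms%n", queueingDelayMillis(1000, 500));
    }
}
```

The same arithmetic applies to whichever queue is actually full, the network buffers (I) or the `AsyncWaitOperator` queue (II): more buffered records at the same throughput means a longer gap between two operators seeing the same record.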



On Wed, 29 Sep 2021 at 08:20, Sanket Agrawal
wrote:

> Hi Ragini,
>
>
>
> To measure the time spent in an async operator, we have put a logger as the
> first and the last statement in asyncInvoke. To measure the time between the
> asyncs, we simply subtract message 1's end time from message 2's start
> time. Also, we are using a parallelism of 1.
>
>
>
> Please let me know if you need any other information or if you have any
> recommendations on improving the approach.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Ragini Manjaiah 
> *Sent:* Wednesday, September 29, 2021 11:17 AM
> *To:* Sanket Agrawal 
> *Cc:* Piotr Nowojski ; user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
> [**EXTERNAL EMAIL**]
>
> Hi Sanket,
>
>  I have a similar use case. How are you measuring the time for the `Async1`
> function to return the result, and for the external API call?
>
>
>
> On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <
> sanket.agra...@infosys.com> wrote:
>
> Hi @Piotr Nowojski,
>
>
>
> Thank you for replying. Yes, the first async is taking 1300-1500
> milliseconds, but it is called via CompletableFuture.supplyAsync and
> the async capacity is set to 1000.
>
>
>
> Async code structure: inside asyncInvoke we call
> CompletableFuture.supplyAsync, and inside supplyAsync we call
> an external API, which takes around 1005 ms to 1040 ms. The rest of the code
> for request creation and response validation is also inside supplyAsync and
> takes around 250 ms.
>
>
>
> This way, we intended the main async thread (as the async operator does not use
> multiple threads directly) to be available for the next message as soon as it
> calls CompletableFuture.supplyAsync on the current message.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, September 28, 2021 8:02 PM
> *To:* Sanket Agrawal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
> [**EXTERNAL EMAIL**]
>
> Hi,
>
>
>
> With Flink 1.8.0 I'm not sure how reliable the backpressure status in
> the WebUI is when it comes to async operators. If I remember correctly,
> until around Flink 1.10 (+/- 2 versions) backpressure monitoring was
> checking for thread dumps stuck in requesting Flink's network memory
> buffers. If in your job an AsyncFunction is the source of backpressure, it
> would be skipped and not reported. For analysing backpressure I would
> highly recommend upgrading to Flink 1.13.x, as it has greatly improved
> tooling for that [1]. And in that version AsyncFunctions are definitely
> handled correctly. Since Flink 1.10, I believe, you can use the
> `isBackPressured` metric. In previous versions you would have to rely on
> buffer usage metrics as described here [2].
>
>
>
>
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
> 
>
> [2]
> https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
> 

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
One thing that puzzled me yesterday when reading your post: have you
tried $HOST instead of $HOSTNAME in the Marathon configuration? When I
played around with Mesos, I remember using HOST to resolve the host's IP
address instead of the host's name. It could be that the hostname itself
cannot be resolved to the right IP address. However, I struggled to find
proper documentation to back that up; the only place I found HOST used was
the recipes section of the Marathon docs [1].
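One way to test the resolution theory is to print what each variable resolves to from inside the container. This is a minimal Java sketch using only `java.net.InetAddress`; `HostResolutionCheck` is a hypothetical helper name, and it assumes Marathon exports `HOST` and the container sets `HOSTNAME` in the task environment.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;

final class HostResolutionCheck {

    /** Returns the IP address that {@code name} resolves to, or empty if it does not resolve. */
    static Optional<String> resolve(String name) {
        // InetAddress.getByName treats null/empty as the loopback address, so reject them explicitly.
        if (name == null || name.isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(InetAddress.getByName(name).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // HOST is assumed to be exported by Marathon; HOSTNAME is usually set by the container.
        for (String var : new String[] {"HOST", "HOSTNAME"}) {
            String value = System.getenv(var);
            System.out.printf("%s=%s -> %s%n", var, value, resolve(value).orElse("<unresolvable>"));
        }
    }
}
```

If `HOSTNAME` prints `<unresolvable>` or a container-internal address while `HOST` resolves to the agent's IP, that would support trying `$HOST` in the Marathon configuration.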

Matthias

[1]
https://mesosphere.github.io/marathon/docs/recipes.html#command-executor-health-checks

On Wed, Sep 29, 2021 at 3:37 AM Javier Vegas  wrote:

> Another update: looking more carefully at my appmaster log, I see the
> following:
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Registering as new framework.
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - ----
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -  Mesos Info:
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Master URL: 10.0.18.246:5050
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -  Framework Info:
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - ID: (none)
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Name: flink-test
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Failover Timeout (secs): 604800.0
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Role: *
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Capabilities: (none)
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Principal: (none)
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Host: 311dcf7fd77c
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Web UI: http://311dcf7fd77c:8081
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - ----
>
>
> which is picking up the mesos.master and
> mesos.resourcemanager.framework.name params I am passing to
> mesos-appmaster.sh
>
>
> In my Mesos dashboard I can see that the framework has been created with the
> right name, but it has no agents/tasks associated with it. So at least Flink
> has been able to connect to the Mesos master to create the framework.
>
>
> Later in the mesos-appmaster log is when I see the Mesos connection errors:
>
>
> 2021-09-29 01:15:39.726 [flink-akka.actor.default-dispatcher-3] DEBUG o.a.f.r.resourcemanager.slotmanager.DeclarativeSlotManager  - Starting the slot manager.
> 2021-09-29 01:15:39.815 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.mesos.scheduler.ConnectionMonitor  - State change (StoppedState -> StoppedState) with data ()
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG o.a.f.runtime.resourcemanager.active.ActiveResourceManager  - Trigger heartbeat request.
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.mesos.scheduler.ReconciliationCoordinator  - State change (Suspended -> Suspended) with data ReconciliationData(Map(),0)
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG o.a.f.runtime.resourcemanager.active.ActiveResourceManager  - Trigger heartbeat request.
> 2021-09-29 01:15:39.824 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.mesos.scheduler.ConnectionMonitor  - Connecting to Mesos...
> 2021-09-29 01:15:39.825 [flink-akka.actor.default-dispatcher-3] DEBUG org.apache.flink.mesos.scheduler.ConnectionMonitor  - State change (StoppedState -> ConnectingState) with data ()
> 2021-09-29 01:15:39.826 [flink-akka.actor.default-dispatcher-3] INFO o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Mesos resource manager started.
>

Re: HA setup Flink version flink:1.13.2-scala_2.12-java11

2021-09-29 Thread Dhiru
Thanks, I got this working.

On Wednesday, September 29, 2021, 12:12:17 AM EDT, Dhiru wrote:

I am following this link for setting up the HA configuration: ZooKeeper HA Services



The ZooKeeper version I am using is 3.4.10.

    high-availability.storageDir: s3a://
    kubernetes.cluster-id: cluster1337
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zookeeper:2181
    high-availability.zookeeper.path.root: /flink

In the logs I see this ZooKeeper error:

    org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed

I tried searching online but didn't find any solution. Please let me know if
anyone has fixed this issue.

RE: Upgrading from 1.11.3 -> 1.13.1 - random jobs stays in "CREATED"state, then fails with Slot request bulk is not fulfillable!

2021-09-29 Thread Schwalbe Matthias
Hi Tobias,

If your number of pipelines equals the number of Flink jobs, then this is
exactly what you should observe:
each Flink job takes one slot per unit of parallelism, hence with parallelism 1
you would have to provide at least 40 slots.

… independent of the Flink version

… for Beam on Flink I'm not sure, but I assume similar rules apply
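The arithmetic above, applied to the numbers in Tobias's message below (4 TaskManagers with 8 slots each, about 40 single-parallelism pipelines), can be checked with a tiny sketch. It assumes default slot sharing, under which a job needs as many slots as its highest operator parallelism; `SlotBudget` is a hypothetical helper, not a Flink API.

```java
import java.util.Collections;
import java.util.List;

final class SlotBudget {

    /** Slots needed: one per job per unit of its max operator parallelism (default slot sharing assumed). */
    static int requiredSlots(List<Integer> maxParallelismPerJob) {
        int total = 0;
        for (int p : maxParallelismPerJob) {
            total += p;
        }
        return total;
    }

    /** Slots offered by a homogeneous set of TaskManagers. */
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // 40 jobs at parallelism 1 on 4 TaskManagers x 8 slots.
        List<Integer> jobs = Collections.nCopies(40, 1);
        int required = requiredSlots(jobs);
        int available = availableSlots(4, 8);
        System.out.println("required=" + required + ", available=" + available
                + ", shortfall=" + Math.max(0, required - available));
    }
}
```

With those inputs it reports 40 required slots against 32 available, so some jobs would wait for slots until the request times out.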


Thias


From: Kaymak, Tobias 
Sent: Friday, 24 September 2021 14:53
To: user 
Subject: Upgrading from 1.11.3 -> 1.13.1 - random jobs stays in "CREATED"state, 
then fails with Slot request bulk is not fulfillable!

Hi,

I am trying to upgrade our Flink cluster from version 1.11.3 to 1.13.1.
We use it to execute over 40 pipelines written in Apache Beam 2.32.0.

While moving the pipelines over to the new cluster one by one, I noticed at
some point that it stopped starting new pipelines after I had moved about 20.

4 TMs with 8 slots each are running, giving 32 slots in total.

When I kill the JobManager pod to make it reload the config, a random pipeline
then gets stuck in the CREATED state. No log is shown, but after some minutes
the following becomes visible:

Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout

I found this post: 
http://mail-archives.apache.org/mod_mbox/flink-issues/202106.mbox/%3cjira.13382840.162321628.576520.1623246960...@atlassian.jira%3E

However, I am running the official Flink Docker images, and the TM and JM
versions are in sync.

I checked that there is no memory pressure on the TM and JM.

Any advice on how to debug this situation?

jobmanager.memory.heap.size: 3500m
jobmanager.memory.jvm-overhead.max: 1536m
jobmanager.memory.process.size: 5gb
jobmanager.memory.off-heap.size: 512m
jobmanager.memory.jvm-metaspace.size: 512m

taskmanager.memory.process.size: 54gb
taskmanager.memory.jvm-metaspace.size: 2gb
taskmanager.memory.task.off-heap.size: 2gb

Best,
Tobi


RE: Event is taking a lot of time between the operators

2021-09-29 Thread Sanket Agrawal
Hi Ragini,

To measure the time spent in an async operator, we have put a log statement as
the first and the last statement in asyncInvoke. To measure the time between the
two async operators, we simply subtract message1's end time from message2's
start time. Also, we are using a parallelism of 1.
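If the last log statement in asyncInvoke sits after the call to supplyAsync, it likely records when the work was handed off rather than when it finished, because asyncInvoke returns before the future completes. The minimal Java sketch below contrasts the two measurements using plain `CompletableFuture` so it runs without Flink. `AsyncTiming`, `Timings` and `slowCall` are hypothetical names, and the 200 ms sleep is an assumed stand-in for the external API call; in a real `AsyncFunction`, the equivalent completion point would be the callback that calls `ResultFuture.complete`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncTiming {

    /** Holds the two measurements for one message. */
    static final class Timings {
        volatile long handOffNanos;    // what a log at the end of asyncInvoke sees
        volatile long completionNanos; // when the async work actually finished
    }

    /** Same shape as asyncInvoke: hand off to supplyAsync, then return. */
    static CompletableFuture<String> invoke(String input, ExecutorService pool, Timings timings) {
        long start = System.nanoTime();
        CompletableFuture<String> result = CompletableFuture
                .supplyAsync(() -> slowCall(input), pool)
                // Runs only once slowCall has returned (or thrown).
                .whenComplete((value, error) -> timings.completionNanos = System.nanoTime() - start);
        // Equivalent of a log line as the last statement of asyncInvoke.
        timings.handOffNanos = System.nanoTime() - start;
        return result;
    }

    /** Hypothetical stand-in for request building plus the external API call. */
    static String slowCall(String input) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return input.toUpperCase();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Timings timings = new Timings();
        String out = invoke("msg-1", pool, timings).join();
        System.out.printf("%s: hand-off %d ms, completion %d ms%n", out,
                TimeUnit.NANOSECONDS.toMillis(timings.handOffNanos),
                TimeUnit.NANOSECONDS.toMillis(timings.completionNanos));
        pool.shutdown();
    }
}
```

The hand-off figure is typically close to zero while the completion figure includes the full call, so timestamps taken in the completion callback are the ones to compare across operators.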

Please let me know if you need any other information or if you have any 
recommendations on improving the approach.

Thanks,
Sanket Agrawal

From: Ragini Manjaiah 
Sent: Wednesday, September 29, 2021 11:17 AM
To: Sanket Agrawal 
Cc: Piotr Nowojski ; user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


Hi Sanket,
 I have a similar use case. How are you measuring the time it takes for the
`Async1` function to return the result, and for the external API call?

On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi @Piotr Nowojski,

Thank you for replying. Yes, the first async is taking between 1300 and 1500
milliseconds, but that work is called on CompletableFuture.supplyAsync and the
Async capacity is set to 1000.

Async Code Structure: inside asyncInvoke we are calling
CompletableFuture.supplyAsync, and inside supplyAsync we are calling an external
API that takes around 1005ms to 1040ms. The rest of the code for request
creation/response validation is also inside supplyAsync and takes around 250ms.

This way, we tried to make sure that the main async thread (as the async
operator does not use multiple threads directly) is available for the next
message as soon as it calls CompletableFuture.supplyAsync on the current message.
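The structure described above can be sketched as follows. This is a hedged illustration, not Sanket's actual code: `ResultSink` is a hypothetical stand-in for Flink's `ResultFuture` (mirroring its `complete(Collection)` and `completeExceptionally(Throwable)` methods) so the snippet runs without Flink, `callExternalApi` is a placeholder, and the pool size of 8 is an arbitrary assumption. Passing an explicit executor to `supplyAsync` keeps the blocking API call off the default async pool (normally `ForkJoinPool.commonPool()`), which is shared across the whole JVM.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncStructure {

    /** Hypothetical stand-in for Flink's ResultFuture, so this runs without Flink. */
    interface ResultSink<T> {
        void complete(Collection<T> result);

        void completeExceptionally(Throwable error);
    }

    // Dedicated pool for the blocking external call; the size 8 is an assumption.
    private final ExecutorService apiPool = Executors.newFixedThreadPool(8);

    /** Same shape as asyncInvoke: hand the work off and return immediately. */
    void asyncInvoke(String request, ResultSink<String> sink) {
        CompletableFuture
                .supplyAsync(() -> callExternalApi(request), apiPool)
                .whenComplete((response, error) -> {
                    if (error != null) {
                        // error is typically a CompletionException wrapping the cause
                        sink.completeExceptionally(error);
                    } else {
                        sink.complete(Collections.singletonList(response));
                    }
                });
    }

    /** Placeholder for request creation, the external API call and response validation. */
    private String callExternalApi(String request) {
        return "response-for-" + request;
    }

    void close() {
        apiPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncStructure op = new AsyncStructure();
        CompletableFuture<Collection<String>> received = new CompletableFuture<>();
        op.asyncInvoke("msg-1", new ResultSink<String>() {
            @Override
            public void complete(Collection<String> result) {
                received.complete(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                received.completeExceptionally(error);
            }
        });
        System.out.println(received.get()); // [response-for-msg-1]
        op.close();
    }
}
```

The design choice here is that asyncInvoke does no blocking work itself; everything, including request building and validation, runs on the dedicated pool, and the result is only reported from the completion callback.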

Thanks,
Sanket Agrawal

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Tuesday, September 28, 2021 8:02 PM
To: Sanket Agrawal <sanket.agra...@infosys.com>
Cc: user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


Hi,

With Flink 1.8.0 I'm not sure how reliable the backpressure status in the WebUI
is when it comes to the Async operators. If I remember correctly, until around
Flink 1.10 (+/- 2 versions) backpressure monitoring was checking thread dumps for
threads stuck requesting Flink's network memory buffers. If the AsyncFunction in
your job is the source of the backpressure, it would be skipped and not
reported. For analysing backpressure I would highly recommend upgrading to
Flink 1.13.x, as it has greatly improved tooling for that [1], and in that
version AsyncFunctions are definitely handled correctly. Since Flink 1.10, I
believe you can use the `isBackPressured` metric. In previous versions you
would have to rely on buffer usage metrics as described here [2].


[1] 
https://flink.apache.org/2021/07/07/backpressure.html
[2] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics

Apart from the backpressure, part of the problem might simply be how long it
takes for the `Async1` function to return the result. Have you checked that?
Isn't it taking a couple of seconds?

Best,
Piotrek

On Tue, 28 Sep 2021 at 15:55, Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi All,

I am new to Flink. While developing a Flink application, we observed that our
messages take around 10 seconds to travel between the two Async operators.
Below are the details.


  *   Flink flow: Kinesis Source -> Process -> Async1 -> Async2 -> Process -> Kinesis Sink
  *   Environment: Amazon KDA, 1 Kinesis Processing Unit (1 vCore & 4 GB RAM), parallelism 1.
  *   Flink version: 1.8.0
  *   Backpressure: the Flink dashboard shows that backpressure is OK.
  *   Input rate: 60 messages per second.

Any kind of pointers/help will be very useful.

Thanks,
Sanket Agrawal



Re: Flink run different jars

2021-09-29 Thread Yangze Guo
You need to edit conf/workers. See [1] for an example of the config and [2] for the process.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode

Best,
Yangze Guo

On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang  wrote:
>
> Hi Yangze,
>
> Thanks a lot for your reply. References are very helpful!
> Another quick question: reference 1 lets me start a standalone cluster
> (session mode). That cluster has a JobManager, and I can submit jobs to it.
> What about TaskManagers? Do I need to start multiple TaskManagers manually?
> Is there a complete example that shows the process?
>
> Thanks,
> Qihua
>
>
> On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:
>>
>> Hi, Qihua
>>
>> IIUC, what you want might be a standalone cluster [1] or a session
>> cluster [2][3].
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
>> [3] 
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
>> >
>> > Hi,
>> >
>> > Is it possible to run a Flink app without a job? What I am trying to do
>> > is build multiple jars and switch between them to run different jobs.
>> > I am not sure if Flink supports this mode. I saw that the REST API can
>> > upload a jar, cancel a job, and run a jar.
>> > Right now I can upload a jar to Flink. But when I cancel a job, Flink
>> > restarts automatically. I checked the log, which shows the lines below.
>> > Can anyone help me out?
>> >
>> > Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
>> > at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
>> > ... 41 common frames omitted
>> > Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
>> > at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
>> > at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
>> > ... 41 common frames omitted
>> >
>> > Thanks!
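Once a session cluster is running, as suggested in the replies above, switching between jars can be driven through the JobManager's REST API. The following is a minimal Java 11 sketch that only builds the requests with `java.net.http` and does not send them. The base URL `http://localhost:8081`, the ids, and the class name `FlinkRestRequests` are assumptions for illustration; real jar and job ids come from the `/jars` and `/jobs` responses.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class FlinkRestRequests {

    /** GET /jars: list jars already uploaded to the session cluster. */
    static HttpRequest listJars(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars")).GET().build();
    }

    /** POST /jars/:jarid/run: start a job from an uploaded jar. */
    static HttpRequest runJar(String baseUrl, String jarId, int parallelism) {
        String body = "{\"parallelism\": " + parallelism + "}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** PATCH /jobs/:jobid?mode=cancel: cancel a running job. */
    static HttpRequest cancelJob(String baseUrl, String jobId) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "?mode=cancel"))
                .method("PATCH", HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8081"; // assumed JobManager REST address
        // "my-jar-id" and "my-job-id" are placeholders, not real ids.
        for (HttpRequest r : new HttpRequest[] {
                listJars(base), runJar(base, "my-jar-id", 1), cancelJob(base, "my-job-id")}) {
            System.out.println(r.method() + " " + r.uri());
        }
    }
}
```

To actually send a request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Uploading a jar (`POST /jars/upload`) needs a multipart/form-data body and is left out of this sketch.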